import marimo

__generated_with = "0.13.15"
app = marimo.App()


@app.cell
def init():
    import sys
    sys.path.append('/opt/spark/work-dir/')
    from workflow_templates.spark.udf_manager import bootstrap_udfs
    from pyspark.sql.functions import udf
    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType, IntegerType
    import uuid
    from pathlib import Path
    from pyspark import SparkConf, Row
    from pyspark.sql import SparkSession
    import os
    import pandas as pd
    import polars as pl
    import pyarrow as pa
    from pyspark.sql.functions import expr, to_json, col, struct
    from functools import reduce
    from handle_structs_or_arrays import preprocess_then_expand
    import requests
    from jinja2 import Template
    import json
    from secrets_manager import SecretsManager
    from WorkflowManager import WorkflowDSL, WorkflowManager
    from KnowledgebaseManager import KnowledgebaseManager
    from gitea_client import GiteaClient, WorkspaceVersionedContent
    from dremio.flight.endpoint import DremioFlightEndpoint
    from dremio.flight.query import DremioFlightEndpointQuery

    alias_str = 'abcdefghijklmnopqrstuvwxyz'
    workspace = os.getenv('WORKSPACE') or 'exp360cust'
    job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())

    # Resolve workspace secrets from the external secrets manager.
    sm = SecretsManager(
        os.getenv('SECRET_MANAGER_URL'),
        os.getenv('SECRET_MANAGER_NAMESPACE'),
        os.getenv('SECRET_MANAGER_ENV'),
        os.getenv('SECRET_MANAGER_TOKEN'),
    )
    secrets = sm.list_secrets(workspace)

    gitea_client = GiteaClient(
        os.getenv('GITEA_HOST'),
        os.getenv('GITEA_TOKEN'),
        os.getenv('GITEA_OWNER') or 'gitea_admin',
        os.getenv('GITEA_REPO') or 'tenant1',
    )
    workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)

    # Spark session configured for S3A access and the Iceberg catalog named "dremio".
    conf = SparkConf()
    params = {
        "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
        "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
        "spark.hadoop.fs.s3a.aws.region": "us-west-1",
        "spark.sql.catalog.dremio.warehouse": secrets.get('LAKEHOUSE_BUCKET'),
        "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dremio.type": "hadoop",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0",
    }
    conf.setAll(list(params.items()))
    spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
    bootstrap_udfs(spark)
    return Template, job_id, json, requests, secrets, spark


@app.cell
def failed_payments_reader(spark):
    # Read the failed payments Iceberg table and expose it as a temp view for downstream cells.
    failed_payments_reader_df = spark.read.table('dremio.failedpayments')
    failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
    return (failed_payments_reader_df,)


@app.cell
def failed_payments_filter(failed_payments_reader_df, spark):
    print(failed_payments_reader_df.columns)
    # Keep only CCS payments still eligible for retry (fewer than 3 attempts) for the targeted payment id.
    failed_payments_filter_df = spark.sql(
        "SELECT * FROM failed_payments_reader_df "
        "WHERE retry_attempt_count < 3 AND gateway = 'CCS' "
        "AND (retry_status = 'new' OR retry_status = 'failed') "
        "AND payment_id = 'pi_3RvDQqP0JqbrujP90uuhk13h'"
    )
    failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')
    return (failed_payments_filter_df,)


@app.cell
def payment_api(
    Template,
    failed_payments_filter_df,
    json,
    requests,
    secrets,
    spark,
):
    _payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'

    # Build request headers, resolving secret-backed values from the secrets manager.
    _payment_api_headers = dict()
    for _payment_api_k, _payment_api_v in {
        'Content-Type': {'value': 'application/json', 'secret': None},
        'api-key': {'value': None, 'secret': 'API_KEY'},
    }.items():
        if _payment_api_v.get('value') is not None and _payment_api_v.get('value') != '':
            _payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
        elif _payment_api_v.get('secret') is not None and _payment_api_v.get('secret') != '':
            _payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))

    def _payment_api_call_api_udf(row):
        # Render the outbound payment message from the row and POST it to the payment API.
        body_dict = row.asDict(recursive=True)
        template = Template('''{
            "msgData": {
                "canDistribute": true,
                "payment": [
                    {
                        "accountId3": "{{account_id}}",
                        "currency2": "{{currency}}",
                        "paymentAmount2": "{{amount}}"
                    }
                ],
                "paymentDate": "{{payment_date[:10]}}",
                "paymentTender": [
                    {
                        "currency3": "{{currency}}",
                        "name": "NAME",
                        "payorAccountId2": "{{account_id}}",
                        "tenderAmount": "{{amount}}",
                        "tenderType2": "CASH"
                    }
                ],
                "payorAccountId": "{{account_id}}",
                "shouldAllPaymentsFreeze": true,
                "tndrSrceCd": "CASH-01",
                "user": ""
            },
            "outMsgConfigCode": "EXP_CREATE_PAYMENT"
        }''')
        body = json.loads(template.render(**body_dict))
        print("request : " + json.dumps(body))
        response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={})
        data = response.json()
        print("response : " + json.dumps(data))
        # Merge the API response into the original row so downstream cells can inspect it.
        merged = {**body_dict, "response_body": data}
        return json.dumps(merged, default=str)

    _payment_api_df_rdd = failed_payments_filter_df.rdd.map(lambda row: _payment_api_call_api_udf(row)).persist()
    payment_api_df = spark.read.json(_payment_api_df_rdd)
    if "response_body" not in payment_api_df.columns:
        raise Exception("Dataframe is empty")
    payment_api_df.createOrReplaceTempView('payment_api_df')
    return (payment_api_df,)


@app.cell
def failed_payments_update_mapper(job_id, payment_api_df, spark):
    # Project API results back into the failed-payments schema, bumping the retry counters.
    # The payment_api_df reference (dead branch) also keeps this cell ordered after payment_api.
    _failed_payments_update_mapper_select_clause = payment_api_df.columns if False else []
    _failed_payments_update_mapper_select_clause.append("id AS id")
    _failed_payments_update_mapper_select_clause.append("retry_attempt_count + 1 AS retry_attempt_count")
    _failed_payments_update_mapper_select_clause.append("TRUE AS batch_processed")
    _failed_payments_update_mapper_select_clause.append("{job_id} AS batch_job_id")
    _failed_payments_update_mapper_select_clause.append("CASE WHEN response_body.success = 'true' THEN 'success' WHEN retry_attempt_count >= 3 THEN 'permanently_failed' ELSE 'failed' END AS retry_status")
    _failed_payments_update_mapper_select_clause.append("account_id AS account_id")
    _failed_payments_update_mapper_select_clause.append("failure_reason AS failure_reason")
    _failed_payments_update_mapper_select_clause.append("failure_timestamp AS failure_timestamp")
    _failed_payments_update_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
    _failed_payments_update_mapper_select_clause.append("created_at AS created_at")
    _failed_payments_update_mapper_select_clause.append("payment_id AS payment_id")
    _failed_payments_update_mapper_select_clause.append("source_of_failure AS source_of_failure")
    _failed_payments_update_mapper_select_clause.append("currency AS currency")
    _failed_payments_update_mapper_select_clause.append("gateway AS gateway")
    _failed_payments_update_mapper_select_clause.append("payment_method AS payment_method")
    _failed_payments_update_mapper_select_clause.append("payment_date AS payment_date")
    _failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS last_retry_timestamp")
    _failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS updated_at")
    # Substitute the batch job id into the projection before running it.
    failed_payments_update_mapper_df = spark.sql(
        ("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df")
        .replace("{job_id}", f"'{job_id}'")
    )
    failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")
    return


@app.cell
def success_payment_filter(payment_api_df, spark):
    print(payment_api_df.columns)
    # Keep only the rows whose API call reported success.
    success_payment_filter_df = spark.sql("SELECT * FROM payment_api_df WHERE response_body.success = 'true'")
    success_payment_filter_df.createOrReplaceTempView('success_payment_filter_df')
    return (success_payment_filter_df,)


@app.cell
def success_payments_mapper(job_id, spark, success_payment_filter_df):
    # Shape successful retries into the successful-payments schema with fresh ids and timestamps.
    _success_payments_mapper_select_clause = success_payment_filter_df.columns if False else []
    _success_payments_mapper_select_clause.append("uuid() AS id")
    _success_payments_mapper_select_clause.append("account_id AS account_id")
    _success_payments_mapper_select_clause.append("currency AS currency")
    _success_payments_mapper_select_clause.append("payment_id AS payment_id")
    _success_payments_mapper_select_clause.append("gateway AS gateway")
    _success_payments_mapper_select_clause.append("payment_method AS payment_method")
    _success_payments_mapper_select_clause.append("payment_date AS payment_date")
    _success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS created_at")
    _success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS updated_at")
    _success_payments_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
    success_payments_mapper_df = spark.sql(
        ("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payment_filter_df")
        .replace("{job_id}", f"'{job_id}'")
    )
    success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
    return


if __name__ == "__main__":
    app.run()