# failed_payments_retry/main.py
# Generated pipeline (marimo export, see __generated_with): reads failed
# payments from an Iceberg table, filters rows that are eligible for a retry,
# and re-submits each one to the payment REST API, collecting the responses.

__generated_with = "0.13.15"

# %%

import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr, to_json, col, struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json


from secrets_manager import SecretsManager

from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent

from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery


alias_str = 'abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'

# Correlates this run with the workflow execution that launched it.
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())

sm = SecretsManager(
    os.getenv('SECRET_MANAGER_URL'),
    os.getenv('SECRET_MANAGER_NAMESPACE'),
    os.getenv('SECRET_MANAGER_ENV'),
    os.getenv('SECRET_MANAGER_TOKEN'),
)
secrets = sm.list_secrets(workspace)

gitea_client = GiteaClient(
    os.getenv('GITEA_HOST'),
    os.getenv('GITEA_TOKEN'),
    os.getenv('GITEA_OWNER') or 'gitea_admin',
    os.getenv('GITEA_REPO') or 'tenant1',
)
workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)
conf = SparkConf()
# Spark + Iceberg (catalog "dremio") + S3A configuration.
# NOTE(review): spark.jars.packages lists com.amazonaws:aws-java-sdk-bundle
# twice with conflicting versions (1.12.262 and 1.11.901) — confirm which one
# is intended and drop the other.
params = {
    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
    "spark.hadoop.fs.s3a.aws.region": "us-west-1",
    "spark.sql.catalog.dremio.warehouse": secrets.get('LAKEHOUSE_BUCKET'),
    "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dremio.type": "hadoop",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}


conf.setAll(list(params.items()))

spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)

# %%


failed_payments_reader_df = spark.read.table('dremio.failedpayments')
failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')

# %%

print(failed_payments_reader_df.columns)
# NOTE(review): the predicate pins one hard-coded payment_id — this looks like
# a debugging leftover; confirm before running against production data.
failed_payments_filter_df = spark.sql("select * from failed_payments_reader_df where retry_attempt_count < 3 AND gateway = 'CCS' AND (retry_status = 'new' OR retry_status = 'failed') and payment_id = 'pi_3RvDQqP0JqbrujP90uuhk13h'")
failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')

# %%


_payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'

# Resolve request headers: an inline 'value' wins; otherwise the header is
# looked up in the secrets store by its 'secret' key.
_payment_api_headers = {}

for _payment_api_k, _payment_api_v in {'Content-Type': {'value': 'application/json', 'secret': None}, 'api-key': {'value': None, 'secret': 'API_KEY'}}.items():
    if _payment_api_v.get('value'):
        _payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
    elif _payment_api_v.get('secret'):
        _payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))


def _payment_api_call_api_udf(row):
    """Render the payment payload for one row, POST it to the payment API,
    and return the row merged with the API response as a JSON string."""
    body_dict = row.asDict(recursive=True)
    template = Template('''{
  "msgData": {
    "canDistribute": true,
    "payment": [
      {
        "accountId3": "{{account_id}}",
        "currency2": "{{currency}}",
        "paymentAmount2": "{{amount}}"
      }
    ],
    "paymentDate": "{{payment_date[:10]}}",
    "paymentTender": [
      {
        "currency3": "{{currency}}",
        "name": "NAME",
        "payorAccountId2": "{{account_id}}",
        "tenderAmount": "{{amount}}",
        "tenderType2": "CASH"
      }
    ],
    "payorAccountId": "{{account_id}}",
    "shouldAllPaymentsFreeze": true,
    "tndrSrceCd": "CASH-01",
    "user": ""
  },
  "outMsgConfigCode": "EXP_CREATE_PAYMENT"
}''')

    body = json.loads(template.render(**body_dict))
    print("request : " + json.dumps(body))

    # NOTE(review): this runs inside an RDD map, so a failed/re-scheduled
    # Spark task will re-POST the same payment — confirm the endpoint is
    # idempotent. A timeout is set so a stuck request cannot hang the task.
    response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={}, timeout=60)

    data = response.json()
    print("response : " + json.dumps(data))

    merged = {**body_dict, "response_body": data}
    return json.dumps(merged, default=str)


_payment_api_df_rdd = failed_payments_filter_df.rdd.map(_payment_api_call_api_udf).persist()
payment_api_df = spark.read.json(_payment_api_df_rdd)
if "response_body" not in payment_api_df.columns:
    # No row produced a response: the filter matched nothing upstream.
    raise RuntimeError("Dataframe is empty")
payment_api_df.createOrReplaceTempView('payment_api_df')


# %%
+_failed_payments_update_mapper_select_clause=payment_api_df.columns if False else [] + + +failed_payments_update_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df").replace("{job_id}",f"'{job_id}'")) +failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df") diff --git a/failed_payments_retry/main.py.notebook b/failed_payments_retry/main.py.notebook new file mode 100644 index 0000000..dd3c097 --- /dev/null +++ b/failed_payments_retry/main.py.notebook @@ -0,0 +1,192 @@ +import marimo + +__generated_with = "0.13.15" +app = marimo.App() + + +@app.cell +def init(): + + import sys + sys.path.append('/opt/spark/work-dir/') + from workflow_templates.spark.udf_manager import bootstrap_udfs + from pyspark.sql.functions import udf + from pyspark.sql.functions import lit + from pyspark.sql.types import StringType, IntegerType + import uuid + from pathlib import Path + from pyspark import SparkConf, Row + from pyspark.sql import SparkSession + import os + import pandas as pd + import polars as pl + import pyarrow as pa + from pyspark.sql.functions import expr,to_json,col,struct + from functools import reduce + from handle_structs_or_arrays import preprocess_then_expand + import requests + from jinja2 import Template + import json + + + from secrets_manager import SecretsManager + + from WorkflowManager import WorkflowDSL, WorkflowManager + from KnowledgebaseManager import KnowledgebaseManager + from gitea_client import GiteaClient, WorkspaceVersionedContent + + from dremio.flight.endpoint import DremioFlightEndpoint + from dremio.flight.query import DremioFlightEndpointQuery + + + alias_str='abcdefghijklmnopqrstuvwxyz' + workspace = os.getenv('WORKSPACE') or 'exp360cust' + + job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4()) + + sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), 
os.getenv('SECRET_MANAGER_TOKEN')) + secrets = sm.list_secrets(workspace) + + gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1') + workspaceVersionedContent=WorkspaceVersionedContent(gitea_client) + conf = SparkConf() + params = { + "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'), + "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'), + "spark.hadoop.fs.s3a.aws.region": "us-west-1", + "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'), + "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.dremio.type" : "hadoop", + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", + "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0" + } + + + + conf.setAll(list(params.items())) + + spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate() + bootstrap_udfs(spark) + return Template, job_id, json, requests, secrets, spark + + +@app.cell +def 
failed_payments_reader(spark): + + + + failed_payments_reader_df = spark.read.table('dremio.failedpayments') + failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df') + return (failed_payments_reader_df,) + + +@app.cell +def failed_payments_filter(failed_payments_reader_df, spark): + + print(failed_payments_reader_df.columns) + failed_payments_filter_df = spark.sql("select * from failed_payments_reader_df where retry_attempt_count < 3 AND gateway = \'CCS\' AND (retry_status = \'new\' OR retry_status = \'failed\') and payment_id = \'pi_3RvDQqP0JqbrujP90uuhk13h\'") + failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df') + return (failed_payments_filter_df,) + + +@app.cell +def payment_api( + Template, + failed_payments_filter_df, + json, + requests, + secrets, + spark, +): + + + + + _payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish' + + _payment_api_headers = dict() + + for _payment_api_k,_payment_api_v in {'Content-Type': {'value': 'application/json', 'secret': None}, 'api-key': {'value': None, 'secret': 'API_KEY'}}.items() : + + if(_payment_api_v.get('value') is not None and _payment_api_v.get('value') != ''): + _payment_api_headers[_payment_api_k] = _payment_api_v.get('value') + elif(_payment_api_v.get('secret') is not None and _payment_api_v.get('secret') != ''): + _payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret')) + + + + def _payment_api_call_api_udf(row): + body_dict = row.asDict(recursive=True) + template = Template('''{ + "msgData": { + "canDistribute": true, + "payment": [ + { + "accountId3": "{{account_id}}", + "currency2": "{{currency}}", + "paymentAmount2": "{{amount}}" + } + ], + "paymentDate": "{{payment_date[:10]}}", + "paymentTender": [ + { + "currency3": "{{currency}}", + "name": "NAME", + "payorAccountId2": "{{account_id}}", + "tenderAmount": "{{amount}}", + "tenderType2": "CASH" + } + ], + "payorAccountId": 
"{{account_id}}", + "shouldAllPaymentsFreeze": true, + "tndrSrceCd": "CASH-01", + "user": "" + }, + "outMsgConfigCode": "EXP_CREATE_PAYMENT" + }''') + + body = json.loads(template.render(**body_dict)) + print("request : "+ json.dumps(body)) + + + response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={}) + + + data = response.json() + print("response : " + json.dumps(data)) + + merged = {**body_dict, "response_body": data} + return json.dumps(merged,default=str) + + _payment_api_df_rdd=failed_payments_filter_df.rdd.map(lambda row: _payment_api_call_api_udf(row)).persist() + payment_api_df = spark.read.json(_payment_api_df_rdd) + if not "response_body" in payment_api_df.columns: + raise Exception("Dataframe is empty") + payment_api_df.createOrReplaceTempView('payment_api_df') + + + + + + + + + return (payment_api_df,) + + +@app.cell +def failed_payments_update_mapper(job_id, payment_api_df, spark): + + _failed_payments_update_mapper_select_clause=payment_api_df.columns if False else [] + + + failed_payments_update_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df").replace("{job_id}",f"'{job_id}'")) + failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df") + return + + +if __name__ == "__main__": + app.run() diff --git a/failed_payments_retry/main.workflow b/failed_payments_retry/main.workflow new file mode 100644 index 0000000..bdda2c8 --- /dev/null +++ b/failed_payments_retry/main.workflow @@ -0,0 +1 @@ +{"version":"v1alpha","kind":"VisualBuilder","metadata":{"name":"failed_payments_retry","description":"Failed payments retry 
workflow.","runtime":"spark"},"spec":{"ui":{"edges":[{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-reader__0","target":"filter__1","id":"xy-edge__data-reader__0-filter__1"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"rest-api-invoke__2","id":"xy-edge__filter__1-rest-api-invoke__2"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"rest-api-invoke__2","target":"data-mapper__3","id":"xy-edge__rest-api-invoke__2-data-mapper__3"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"rest-api-invoke__2","target":"filter__4","id":"xy-edge__rest-api-invoke__2-filter__4"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__4","target":"data-mapper__7","id":"xy-edge__filter__4-data-mapper__7"}],"nodes":[{"id":"data-reader__0","type":"workflowNode","position":{"x":562,"y":64},"data":{"nodeType":"data-reader","id":"data-reader__0"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__1","type":"workflowNode","position":{"x":992.5,"y":63.5},"data":{"nodeType":"filter","id":"filter__1"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"rest-api-invoke__2","type":"workflowNode","position":{"x":1398.5,"y":59.5},"data":{"nodeType":"rest-api-invoke","id":"rest-api-invoke__2"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__3","type":"workflowNode","position":{"x":1938.5,"y":-206.5},"data":{"nodeType":"data-mapper","id":"data-mapper__3"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__4","type":"workflowNode","position":{"x":1928.5,"y":307.5},"data":{"nodeType":"filter","id":"filter__4"},"measured":{"width":320,"height":97},"selected":false,"dragging":false},{"id":"data-writer__5","type":"workflowNode","position":{"x":2548.5,"y":-230.5},"data":{"nodeType":"data-write
r","id":"data-writer__5"},"measured":{"width":320,"height":142},"selected":false,"dragging":false},{"id":"data-writer__6","type":"workflowNode","position":{"x":2742.5,"y":275.5},"data":{"nodeType":"data-writer","id":"data-writer__6"},"measured":{"width":320,"height":142},"selected":false,"dragging":false},{"id":"data-mapper__7","type":"workflowNode","position":{"x":2334.5,"y":283.5},"data":{"nodeType":"data-mapper","id":"data-mapper__7"},"measured":{"width":320,"height":142},"selected":true,"dragging":false}],"nodesData":{"data-reader__0":{"name":"failed_payments_reader","type":"SparkIcebergReader","table_name":"failedpayments","iceberg_catalog":"dremio","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{},"typeLabel":"Spark","isDefault":false},"filter__1":{"name":"failed_payments_filter","type":"Filter","datasource":"failed_payments_reader","condition":"retry_attempt_count < 3 AND gateway = \\'CCS\\' AND (retry_status = \\'new\\' OR retry_status = \\'failed\\') and payment_id = \\'pi_3RvDQqP0JqbrujP90uuhk13h\\'","isDefault":false},"rest-api-invoke__2":{"name":"payment_api","type":"RESTInvoke","datasource":"failed_payments_filter","url":"https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish","method":"POST","headers":{"Content-Type":{"value":"application/json","secret":null},"api-key":{"value":null,"secret":"API_KEY"}},"bodyTemplate":"{\n \"msgData\": {\n \"canDistribute\": true,\n \"payment\": [\n {\n \"accountId3\": \"{{account_id}}\",\n \"currency2\": \"{{currency}}\",\n \"paymentAmount2\": \"{{amount}}\"\n }\n ],\n \"paymentDate\": \"{{payment_date[:10]}}\",\n \"paymentTender\": [\n {\n \"currency3\": \"{{currency}}\",\n \"name\": \"NAME\",\n \"payorAccountId2\": \"{{account_id}}\",\n \"tenderAmount\": \"{{amount}}\",\n \"tenderType2\": \"CASH\"\n }\n ],\n \"payorAccountId\": \"{{account_id}}\",\n \"shouldAllPaymentsFreeze\": true,\n \"tndrSrceCd\": \"CASH-01\",\n \"user\": \"\"\n 
},\n \"outMsgConfigCode\": \"EXP_CREATE_PAYMENT\"\n}","isDefault":false},"data-mapper__3":{"name":"failed_payments_update_mapper","type":"DataMapping","fromDataReader":"payment_api","includeExistingColumns":false,"toSchema":[],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false}}},"blocks":[{"name":"failed_payments_reader","type":"SparkIcebergReader","options":{},"table_name":"failedpayments","iceberg_catalog":"dremio","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"typeLabel":"Spark","isDefault":false},{"name":"failed_payments_filter","type":"Filter","options":{},"datasource":"failed_payments_reader","condition":"retry_attempt_count < 3 AND gateway = \\'CCS\\' AND (retry_status = \\'new\\' OR retry_status = \\'failed\\') and payment_id = \\'pi_3RvDQqP0JqbrujP90uuhk13h\\'","isDefault":false},{"name":"payment_api","type":"RESTInvoke","options":{},"datasource":"failed_payments_filter","url":"https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish","method":"POST","headers":{"Content-Type":{"value":"application/json","secret":null},"api-key":{"value":null,"secret":"API_KEY"}},"bodyTemplate":"{\n \"msgData\": {\n \"canDistribute\": true,\n \"payment\": [\n {\n \"accountId3\": \"{{account_id}}\",\n \"currency2\": \"{{currency}}\",\n \"paymentAmount2\": \"{{amount}}\"\n }\n ],\n \"paymentDate\": \"{{payment_date[:10]}}\",\n \"paymentTender\": [\n {\n \"currency3\": \"{{currency}}\",\n \"name\": \"NAME\",\n \"payorAccountId2\": \"{{account_id}}\",\n \"tenderAmount\": \"{{amount}}\",\n \"tenderType2\": \"CASH\"\n }\n ],\n \"payorAccountId\": \"{{account_id}}\",\n \"shouldAllPaymentsFreeze\": true,\n \"tndrSrceCd\": \"CASH-01\",\n \"user\": \"\"\n },\n \"outMsgConfigCode\": 
\"EXP_CREATE_PAYMENT\"\n}","isDefault":false},{"name":"failed_payments_update_mapper","type":"DataMapping","options":{},"fromDataReader":"payment_api","includeExistingColumns":false,"toSchema":[],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false}]}} \ No newline at end of file