# tenant1/failed_payments_retry/main.py
__generated_with = "0.13.15"
# %%
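# Failed-payment retry job: reads dremio.failedpayments, re-submits eligible payments through
# the outbound-message payment API, writes the outcome back to dremio.failedpayments, and
# appends successfully retried payments to dremio.payments.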
import sys

# Local workflow modules (workflow_templates, secrets_manager, gitea_client, etc.) are loaded
# from the Spark work dir, so it must be on sys.path before they are imported below.
sys.path.append('/opt/spark/work-dir/')

import json
import os
import uuid
from functools import reduce
from pathlib import Path

import pandas as pd
import polars as pl
import pyarrow as pa
import requests
from jinja2 import Template
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, lit, struct, to_json, udf
from pyspark.sql.types import IntegerType, StringType

from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
from gitea_client import GiteaClient, WorkspaceVersionedContent
from handle_structs_or_arrays import preprocess_then_expand
from KnowledgebaseManager import KnowledgebaseManager
from secrets_manager import SecretsManager
from workflow_templates.spark.udf_manager import bootstrap_udfs
from WorkflowManager import WorkflowDSL, WorkflowManager
alias_str = 'abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'),
                    os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client = GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'),
                           os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)
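# Spark configuration: S3A credentials from the secrets manager, an Iceberg catalog named
# 'dremio' (Hadoop catalog type) pointing at the lakehouse bucket, and the jar packages the
# job needs at runtime.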
conf = SparkConf()
params = {
    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
    "spark.hadoop.fs.s3a.aws.region": "us-west-1",
    "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dremio.type": "hadoop",
    "spark.sql.catalog.dremio.warehouse": secrets.get('LAKEHOUSE_BUCKET'),
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0",
}
conf.setAll(list(params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
# %%
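# Read the failed payments Iceberg table and expose it to Spark SQL as a temp view.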
failed_payments_reader_df = spark.read.table('dremio.failedpayments')
failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
# %%
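# Keep only rows that are still eligible for a retry: fewer than 3 attempts, CCS gateway,
# and a retry_status of 'new' or 'failed'.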
print(failed_payments_reader_df.columns)
failed_payments_filter_df = spark.sql("SELECT * FROM failed_payments_reader_df WHERE retry_attempt_count < 3 AND gateway = 'CCS' AND (retry_status = 'new' OR retry_status = 'failed')")
failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')
# %%
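# Call the payment publish API once per eligible row. Headers come from literal values or
# the secrets manager, the request body is rendered from a Jinja template, and each row is
# returned as a JSON string carrying the original columns plus the API response.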
_payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'
_payment_api_headers = dict()
for _payment_api_k, _payment_api_v in {
    'Content-Type': {'value': 'application/json', 'secret': None},
    'api-key': {'value': None, 'secret': 'API_KEY'},
}.items():
    if _payment_api_v.get('value') is not None and _payment_api_v.get('value') != '':
        _payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
    elif _payment_api_v.get('secret') is not None and _payment_api_v.get('secret') != '':
        _payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))
def _payment_api_call_api_udf(row):
    body_dict = row.asDict(recursive=True)
    template = Template('''{
"msgData": {
"canDistribute": true,
"payment": [
{
"accountId3": "{{account_id}}",
"currency2": "{{currency}}",
"paymentAmount2": "{{amount}}"
}
],
"paymentDate": "{{payment_date[:10]}}",
"paymentTender": [
{
"currency3": "{{currency}}",
"name": "NAME",
"payorAccountId2": "{{account_id}}",
"tenderAmount": "{{amount}}",
"tenderType2": "CASH"
}
],
"payorAccountId": "{{account_id}}",
"shouldAllPaymentsFreeze": true,
"tndrSrceCd": "CASH-01",
"user": ""
},
"outMsgConfigCode": "EXP_CREATE_PAYMENT"
}''')
    body = json.loads(template.render(**body_dict))
    print("request : " + json.dumps(body))
    response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={})
    data = response.json()
    print("response : " + json.dumps(data))
    merged = {**body_dict, "response_body": data}
    return json.dumps(merged, default=str)
_payment_api_df_rdd = failed_payments_filter_df.rdd.map(_payment_api_call_api_udf).persist()
payment_api_df = spark.read.json(_payment_api_df_rdd)
if "response_body" not in payment_api_df.columns:
    raise Exception("Payment API call returned no rows ('response_body' column is missing)")
payment_api_df.createOrReplaceTempView('payment_api_df')
# %%
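# Map API results back onto the failedpayments schema: bump the attempt count, stamp the
# batch job id, and derive the new retry_status from the API response.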
_failed_payments_update_mapper_select_clause = [
    "id AS id",
    "retry_attempt_count + 1 AS retry_attempt_count",
    "TRUE AS batch_processed",
    "{job_id} AS batch_job_id",
    # The attempt recorded here is retry_attempt_count + 1, so compare the incremented value
    # against the retry limit when deciding whether the payment is permanently failed.
    "CASE WHEN response_body.success = 'true' THEN 'success' "
    "WHEN retry_attempt_count + 1 >= 3 THEN 'permanently_failed' "
    "ELSE 'failed' END AS retry_status",
    "account_id AS account_id",
    "failure_reason AS failure_reason",
    "failure_timestamp AS failure_timestamp",
    "CAST(amount AS DECIMAL(10,2)) AS amount",
    "created_at AS created_at",
    "payment_id AS payment_id",
    "source_of_failure AS source_of_failure",
    "currency AS currency",
    "gateway AS gateway",
    "payment_method AS payment_method",
    "payment_date AS payment_date",
    "date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS last_retry_timestamp",
    "date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS updated_at",
]
failed_payments_update_mapper_df = spark.sql(
    ("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df")
    .replace("{job_id}", f"'{job_id}'")
)
failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")
# %%
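# Keep only the retries the API reported as successful.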
print(payment_api_df.columns)
success_payment_filter_df = spark.sql("SELECT * FROM payment_api_df WHERE response_body.success = 'true'")
success_payment_filter_df.createOrReplaceTempView('success_payment_filter_df')
# %%
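# Shape successful retries into the payments schema with a freshly generated id and
# current timestamps.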
_success_payments_mapper_select_clause = [
    "uuid() AS id",
    "account_id AS account_id",
    "currency AS currency",
    "payment_id AS payment_id",
    "gateway AS gateway",
    "payment_method AS payment_method",
    "payment_date AS payment_date",
    "date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS created_at",
    "date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS updated_at",
    "CAST(amount AS DECIMAL(10,2)) AS amount",
]
success_payments_mapper_df = spark.sql(
    "SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payment_filter_df"
)
success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
# %%
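# Upsert the retry results into dremio.failedpayments, matching on payment_id.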
_failed_payments_update_writer_fields_to_update = failed_payments_update_mapper_df.columns
_failed_payments_update_writer_unique_keys = ['payment_id']
_failed_payments_update_writer_unique_key_clause = [
    f't.{_key} = s.{_key}' for _key in _failed_payments_update_writer_unique_keys
]
# Exclude the unique-key columns themselves from the UPDATE SET clause.
_failed_payments_update_writer_set_clause = [
    f't.{_field} = s.{_field}'
    for _field in _failed_payments_update_writer_fields_to_update
    if _field not in _failed_payments_update_writer_unique_keys
]
_merge_query = (
    'MERGE INTO dremio.failedpayments t '
    'USING failed_payments_update_mapper_df s '
    'ON ' + ' AND '.join(_failed_payments_update_writer_unique_key_clause) +
    ' WHEN MATCHED THEN UPDATE SET ' + ', '.join(_failed_payments_update_writer_set_clause) +
    ' WHEN NOT MATCHED THEN INSERT *'
)
spark.sql(_merge_query)
# %%
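# Append the successfully retried payments to the dremio.payments table.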
success_payments_mapper_df.write.mode('append').saveAsTable('dremio.payments')