Workflow saved

This commit is contained in:
unknown
2025-08-29 08:37:31 +00:00
parent 0027602621
commit d2dadaa5f6
3 changed files with 357 additions and 0 deletions

View File

@@ -0,0 +1,164 @@
__generated_with = "0.13.15"
# %%
import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
# Lowercase alphabet — presumably used to generate short SQL aliases elsewhere;
# not referenced in this view.
alias_str='abcdefghijklmnopqrstuvwxyz'
# Workspace name and execution id, with local-dev fallbacks when the env vars
# are unset (a fresh UUID stands in for the orchestrator-provided EXECUTION_ID).
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
# Fetch every workspace-scoped secret once; consumed below for the S3/lakehouse
# credentials and later for the payment API key.
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
# Gitea-backed versioned workflow content (owner/repo default for local dev).
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
# Spark configuration: S3A credentials, an Iceberg 'dremio' hadoop catalog
# rooted at the lakehouse bucket, and the runtime jar packages.
conf = SparkConf()
params = {
    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
    "spark.hadoop.fs.s3a.aws.region": "us-west-1",
    "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
    "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dremio.type" : "hadoop",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}
conf.setAll(list(params.items()))
# One Spark session per workflow run, named after the workspace.
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
# Presumably registers workspace-defined UDFs on the session — defined in
# workflow_templates.spark.udf_manager, not visible here.
bootstrap_udfs(spark)
# %%
# Load the failed-payments table from the Iceberg 'dremio' catalog and expose
# it to Spark SQL under the same name used by the downstream filter step.
failed_payments_reader_df = spark.read.table('dremio.failedpayments')
failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
# %%
# Log the available columns, then keep only CCS payments that are still
# retry-eligible (fewer than 3 attempts, status new/failed) for one specific
# payment id.
print(failed_payments_reader_df.columns)
_filter_query = (
    "select * from failed_payments_reader_df "
    "where retry_attempt_count < 3 "
    "AND gateway = 'CCS' "
    "AND (retry_status = 'new' OR retry_status = 'failed') "
    "and payment_id = 'pi_3RvDQqP0JqbrujP90uuhk13h'"
)
failed_payments_filter_df = spark.sql(_filter_query)
failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')
# %%
# Publish each retry-eligible failed payment to the outbound notification API
# and capture the API response alongside the original row data.
_payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'
# Resolve request headers: a literal 'value' wins; otherwise the header is
# looked up in the workspace secrets store (e.g. the API key).
_payment_api_headers = dict()
for _payment_api_k, _payment_api_v in {'Content-Type': {'value': 'application/json', 'secret': None}, 'api-key': {'value': None, 'secret': 'API_KEY'}}.items():
    if _payment_api_v.get('value'):
        _payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
    elif _payment_api_v.get('secret'):
        _payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))
def _payment_api_call_api_udf(row):
    """Render the payment payload for one row, POST it to the payment API,
    and return the row merged with the API response as a JSON string."""
    body_dict = row.asDict(recursive=True)
    # Built inside the function so the closure shipped to Spark executors does
    # not capture a (potentially unpicklable) Jinja2 Template object.
    template = Template('''{
        "msgData": {
            "canDistribute": true,
            "payment": [
                {
                    "accountId3": "{{account_id}}",
                    "currency2": "{{currency}}",
                    "paymentAmount2": "{{amount}}"
                }
            ],
            "paymentDate": "{{payment_date[:10]}}",
            "paymentTender": [
                {
                    "currency3": "{{currency}}",
                    "name": "NAME",
                    "payorAccountId2": "{{account_id}}",
                    "tenderAmount": "{{amount}}",
                    "tenderType2": "CASH"
                }
            ],
            "payorAccountId": "{{account_id}}",
            "shouldAllPaymentsFreeze": true,
            "tndrSrceCd": "CASH-01",
            "user": ""
        },
        "outMsgConfigCode": "EXP_CREATE_PAYMENT"
    }''')
    body = json.loads(template.render(**body_dict))
    print("request : " + json.dumps(body))
    # Explicit timeout: without one a stuck HTTP call hangs the Spark task forever.
    response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={}, timeout=60)
    data = response.json()
    print("response : " + json.dumps(data))
    merged = {**body_dict, "response_body": data}
    return json.dumps(merged, default=str)
# One API call per row on the executors; persisted so re-evaluation of the RDD
# by the JSON read below does not repeat the HTTP calls.
_payment_api_df_rdd = failed_payments_filter_df.rdd.map(_payment_api_call_api_udf).persist()
payment_api_df = spark.read.json(_payment_api_df_rdd)
# spark.read.json infers no 'response_body' column when the RDD was empty.
if "response_body" not in payment_api_df.columns:
    raise Exception("Dataframe is empty")
payment_api_df.createOrReplaceTempView('payment_api_df')
# %%
# NOTE(review): the generated toggle `payment_api_df.columns if False else []`
# always produced an empty column list, yielding invalid SQL
# ("SELECT  FROM payment_api_df") that fails at parse time. Fall back to '*'
# when no explicit mapping columns are configured.
_failed_payments_update_mapper_select_clause = []
_failed_payments_update_mapper_select_sql = ', '.join(_failed_payments_update_mapper_select_clause) or '*'
# Any {job_id} placeholder in configured column expressions is substituted
# with the quoted execution id before running the query.
failed_payments_update_mapper_df = spark.sql(
    ("SELECT " + _failed_payments_update_mapper_select_sql + " FROM payment_api_df").replace("{job_id}", f"'{job_id}'")
)
failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")

View File

@@ -0,0 +1,192 @@
import marimo

# Version of marimo that generated this notebook file.
__generated_with = "0.13.15"

# Top-level marimo application; the cells below register themselves via @app.cell.
app = marimo.App()
@app.cell
def init():
    # Make shared workflow helpers importable from the Spark work dir.
    import sys
    sys.path.append('/opt/spark/work-dir/')
    from workflow_templates.spark.udf_manager import bootstrap_udfs
    from pyspark.sql.functions import udf
    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType, IntegerType
    import uuid
    from pathlib import Path
    from pyspark import SparkConf, Row
    from pyspark.sql import SparkSession
    import os
    import pandas as pd
    import polars as pl
    import pyarrow as pa
    from pyspark.sql.functions import expr,to_json,col,struct
    from functools import reduce
    from handle_structs_or_arrays import preprocess_then_expand
    import requests
    from jinja2 import Template
    import json
    from secrets_manager import SecretsManager
    from WorkflowManager import WorkflowDSL, WorkflowManager
    from KnowledgebaseManager import KnowledgebaseManager
    from gitea_client import GiteaClient, WorkspaceVersionedContent
    from dremio.flight.endpoint import DremioFlightEndpoint
    from dremio.flight.query import DremioFlightEndpointQuery
    # Lowercase alphabet — presumably used to generate short SQL aliases
    # elsewhere; not referenced in this view.
    alias_str='abcdefghijklmnopqrstuvwxyz'
    # Workspace name and execution id, with local-dev fallbacks when the env
    # vars are unset.
    workspace = os.getenv('WORKSPACE') or 'exp360cust'
    job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
    # Fetch every workspace-scoped secret once; consumed below for the
    # S3/lakehouse credentials and later for the payment API key.
    sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
    secrets = sm.list_secrets(workspace)
    # Gitea-backed versioned workflow content (owner/repo default for local dev).
    gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
    workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
    # Spark configuration: S3A credentials, an Iceberg 'dremio' hadoop catalog
    # rooted at the lakehouse bucket, and the runtime jar packages.
    conf = SparkConf()
    params = {
        "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
        "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
        "spark.hadoop.fs.s3a.aws.region": "us-west-1",
        "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
        "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dremio.type" : "hadoop",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
    }
    conf.setAll(list(params.items()))
    # One Spark session per workflow run, named after the workspace.
    spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
    # Presumably registers workspace-defined UDFs on the session — defined in
    # workflow_templates.spark.udf_manager, not visible here.
    bootstrap_udfs(spark)
    # Only the names needed by the downstream cells are exported.
    return Template, job_id, json, requests, secrets, spark
@app.cell
def failed_payments_reader(spark):
    # Load the failed-payments table from the Iceberg 'dremio' catalog and
    # expose it to Spark SQL under the same name used by the filter cell.
    failed_payments_reader_df = spark.read.table('dremio.failedpayments')
    failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
    return (failed_payments_reader_df,)
@app.cell
def failed_payments_filter(failed_payments_reader_df, spark):
    # Log the available columns, then keep only CCS payments that are still
    # retry-eligible (fewer than 3 attempts, status new/failed) for one
    # specific payment id.
    print(failed_payments_reader_df.columns)
    _filter_query = (
        "select * from failed_payments_reader_df "
        "where retry_attempt_count < 3 "
        "AND gateway = 'CCS' "
        "AND (retry_status = 'new' OR retry_status = 'failed') "
        "and payment_id = 'pi_3RvDQqP0JqbrujP90uuhk13h'"
    )
    failed_payments_filter_df = spark.sql(_filter_query)
    failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')
    return (failed_payments_filter_df,)
@app.cell
def payment_api(
    Template,
    failed_payments_filter_df,
    json,
    requests,
    secrets,
    spark,
):
    # Publish each retry-eligible failed payment to the outbound notification
    # API and capture the API response alongside the original row data.
    _payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'
    # Resolve request headers: a literal 'value' wins; otherwise the header is
    # looked up in the workspace secrets store (e.g. the API key).
    _payment_api_headers = dict()
    for _payment_api_k, _payment_api_v in {'Content-Type': {'value': 'application/json', 'secret': None}, 'api-key': {'value': None, 'secret': 'API_KEY'}}.items():
        if _payment_api_v.get('value'):
            _payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
        elif _payment_api_v.get('secret'):
            _payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))
    def _payment_api_call_api_udf(row):
        """Render the payment payload for one row, POST it to the payment API,
        and return the row merged with the API response as a JSON string."""
        body_dict = row.asDict(recursive=True)
        # Built inside the function so the closure shipped to Spark executors
        # does not capture a (potentially unpicklable) Jinja2 Template object.
        template = Template('''{
            "msgData": {
                "canDistribute": true,
                "payment": [
                    {
                        "accountId3": "{{account_id}}",
                        "currency2": "{{currency}}",
                        "paymentAmount2": "{{amount}}"
                    }
                ],
                "paymentDate": "{{payment_date[:10]}}",
                "paymentTender": [
                    {
                        "currency3": "{{currency}}",
                        "name": "NAME",
                        "payorAccountId2": "{{account_id}}",
                        "tenderAmount": "{{amount}}",
                        "tenderType2": "CASH"
                    }
                ],
                "payorAccountId": "{{account_id}}",
                "shouldAllPaymentsFreeze": true,
                "tndrSrceCd": "CASH-01",
                "user": ""
            },
            "outMsgConfigCode": "EXP_CREATE_PAYMENT"
        }''')
        body = json.loads(template.render(**body_dict))
        print("request : " + json.dumps(body))
        # Explicit timeout: without one a stuck HTTP call hangs the Spark task forever.
        response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={}, timeout=60)
        data = response.json()
        print("response : " + json.dumps(data))
        merged = {**body_dict, "response_body": data}
        return json.dumps(merged, default=str)
    # One API call per row on the executors; persisted so re-evaluation of the
    # RDD by the JSON read below does not repeat the HTTP calls.
    _payment_api_df_rdd = failed_payments_filter_df.rdd.map(_payment_api_call_api_udf).persist()
    payment_api_df = spark.read.json(_payment_api_df_rdd)
    # spark.read.json infers no 'response_body' column when the RDD was empty.
    if "response_body" not in payment_api_df.columns:
        raise Exception("Dataframe is empty")
    payment_api_df.createOrReplaceTempView('payment_api_df')
    return (payment_api_df,)
@app.cell
def failed_payments_update_mapper(job_id, payment_api_df, spark):
    # NOTE(review): the generated toggle `payment_api_df.columns if False else []`
    # always produced an empty column list, yielding invalid SQL
    # ("SELECT  FROM payment_api_df") that fails at parse time. Fall back to
    # '*' when no explicit mapping columns are configured.
    _failed_payments_update_mapper_select_clause = []
    _select_sql = ', '.join(_failed_payments_update_mapper_select_clause) or '*'
    # Any {job_id} placeholder in configured column expressions is substituted
    # with the quoted execution id before running the query.
    failed_payments_update_mapper_df = spark.sql(
        ("SELECT " + _select_sql + " FROM payment_api_df").replace("{job_id}", f"'{job_id}'")
    )
    failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")
    return
# Standard marimo entry point: run the notebook when executed as a script.
if __name__ == "__main__":
    app.run()

File diff suppressed because one or more lines are too long