22 Commits

Author  SHA1  Message  Date
unknown  bf935f3f53  Workflow saved  2025-11-13 07:57:29 +00:00
unknown  681045c4f0  Workflow saved  2025-11-12 05:26:51 +00:00
unknown  331f72d1a7  Workflow saved  2025-09-12 03:24:56 +00:00
unknown  c928074d4e  Workflow saved  2025-09-11 09:43:55 +00:00
unknown  2f13af0751  Workflow saved  2025-09-11 09:20:44 +00:00
unknown  2e42932fab  Workflow saved  2025-08-29 09:14:36 +00:00
unknown  1b840c8730  Workflow saved  2025-08-29 09:12:47 +00:00
unknown  b255836d13  Workflow saved  2025-08-29 09:09:09 +00:00
unknown  ab9c61fddd  Workflow saved  2025-08-29 09:00:50 +00:00
unknown  0dcde9e5cb  Workflow saved  2025-08-29 08:56:42 +00:00
unknown  d2dadaa5f6  Workflow saved  2025-08-29 08:37:31 +00:00
unknown  0027602621  Workflow saved  2025-08-29 05:56:22 +00:00
unknown  c5a5dfb7cc  Workflow saved  2025-08-28 11:09:19 +00:00
unknown  6722686188  Workflow saved  2025-08-28 11:07:20 +00:00
unknown  0a844e08a1  Workflow saved  2025-08-28 11:00:26 +00:00
unknown  4554a42d83  Workflow saved  2025-08-28 10:09:39 +00:00
172ea4483b  Delete __documents/operation_payloads.txt  2025-08-13 12:32:44 +00:00
unknown  588386cab7  Workflow saved  2025-08-13 10:46:53 +00:00
unknown  67e81d79c6  Workflow saved  2025-08-13 10:17:26 +00:00
526a3f705b  Update payment_metrics/main.py.notebook, payment_metrics/main.py, payment_metrics/main.workflow  2025-08-13 06:00:37 +00:00
unknown  c7658869d6  Workflow saved  2025-08-13 05:57:33 +00:00
fdc83c60ab  Add __documents/operation_payloads.txt  2025-08-13 05:56:36 +00:00
9 changed files with 2850 additions and 0 deletions


@@ -0,0 +1,268 @@
__generated_with = "0.13.15"
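# Failed-payment retry workflow: read dremio.failedpayments, retry CCS payments with fewer than 3 attempts via the outbound-message publish API, upsert the results back into dremio.failedpayments, and append successful retries to dremio.payments.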
# %%
import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
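# Spark session configuration: an Iceberg SparkCatalog named "dremio" (Hadoop catalog type) whose warehouse is the S3 lakehouse bucket; the S3 credentials and bucket name come from the workspace secrets.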
conf = SparkConf()
params = {
"spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
"spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
"spark.hadoop.fs.s3a.aws.region": "us-west-1",
"spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
"spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dremio.type" : "hadoop",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}
conf.setAll(list(params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
# %%
failed_payments_reader_df = spark.read.table('dremio.failedpayments')
failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
# %%
print(failed_payments_reader_df.columns)
failed_payments_filter_df = spark.sql("select * from failed_payments_reader_df where retry_attempt_count < 3 AND gateway = \'CCS\' AND (retry_status = \'new\' OR retry_status = \'failed\')")
failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')
# %%
_payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'
_payment_api_headers = dict()
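# Resolve each request header from its literal value when one is configured, otherwise from the secrets manager (here the api-key comes from the API_KEY secret).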
for _payment_api_k,_payment_api_v in {'Content-Type': {'value': 'application/json', 'secret': None}, 'api-key': {'value': None, 'secret': 'API_KEY'}}.items() :
if(_payment_api_v.get('value') is not None and _payment_api_v.get('value') != ''):
_payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
elif(_payment_api_v.get('secret') is not None and _payment_api_v.get('secret') != ''):
_payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))
def _payment_api_call_api_udf(row):
body_dict = row.asDict(recursive=True)
template = Template('''{
"msgData": {
"canDistribute": true,
"payment": [
{
"accountId3": "{{account_id}}",
"currency2": "{{currency}}",
"paymentAmount2": "{{amount}}"
}
],
"paymentDate": "{{payment_date[:10]}}",
"paymentTender": [
{
"currency3": "{{currency}}",
"name": "NAME",
"payorAccountId2": "{{account_id}}",
"tenderAmount": "{{amount}}",
"tenderType2": "CASH"
}
],
"payorAccountId": "{{account_id}}",
"shouldAllPaymentsFreeze": true,
"tndrSrceCd": "CASH-01",
"user": ""
},
"outMsgConfigCode": "EXP_CREATE_PAYMENT"
}''')
body = json.loads(template.render(**body_dict))
print("request : "+ json.dumps(body))
response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={})
data = response.json()
print("response : " + json.dumps(data))
merged = {**body_dict, "response_body": data}
return json.dumps(merged,default=str)
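# Call the API once per input row on the executors, then parse the returned JSON strings back into a DataFrame.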
_payment_api_df_rdd=failed_payments_filter_df.rdd.map(lambda row: _payment_api_call_api_udf(row)).persist()
payment_api_df = spark.read.json(_payment_api_df_rdd)
if not "response_body" in payment_api_df.columns:
raise Exception("Dataframe is empty")
payment_api_df.createOrReplaceTempView('payment_api_df')
# %%
_failed_payments_update_mapper_select_clause=payment_api_df.columns if False else []
_failed_payments_update_mapper_select_clause.append("id AS id")
_failed_payments_update_mapper_select_clause.append("retry_attempt_count + 1 AS retry_attempt_count")
_failed_payments_update_mapper_select_clause.append("TRUE AS batch_processed")
_failed_payments_update_mapper_select_clause.append("{job_id} AS batch_job_id")
_failed_payments_update_mapper_select_clause.append("CASE WHEN response_body.success = \'true\' THEN \'success\' WHEN retry_attempt_count >= 3 THEN \'permanently_failed\' ELSE \'failed\' END AS retry_status")
_failed_payments_update_mapper_select_clause.append("account_id AS account_id")
_failed_payments_update_mapper_select_clause.append("failure_reason AS failure_reason")
_failed_payments_update_mapper_select_clause.append("failure_timestamp AS failure_timestamp")
_failed_payments_update_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
_failed_payments_update_mapper_select_clause.append("created_at AS created_at")
_failed_payments_update_mapper_select_clause.append("payment_id AS payment_id")
_failed_payments_update_mapper_select_clause.append("source_of_failure AS source_of_failure")
_failed_payments_update_mapper_select_clause.append("currency AS currency")
_failed_payments_update_mapper_select_clause.append("gateway AS gateway")
_failed_payments_update_mapper_select_clause.append("payment_method AS payment_method")
_failed_payments_update_mapper_select_clause.append("payment_date AS payment_date")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS last_retry_timestamp")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS updated_at")
failed_payments_update_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df").replace("{job_id}",f"'{job_id}'"))
failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")
# %%
print(payment_api_df.columns)
success_payment_filter_df = spark.sql("select * from payment_api_df where response_body.success=\'true\'")
success_payment_filter_df.createOrReplaceTempView('success_payment_filter_df')
# %%
_success_payments_mapper_select_clause=success_payment_filter_df.columns if False else []
_success_payments_mapper_select_clause.append("uuid() AS id")
_success_payments_mapper_select_clause.append("account_id AS account_id")
_success_payments_mapper_select_clause.append("currency AS currency")
_success_payments_mapper_select_clause.append("payment_id AS payment_id")
_success_payments_mapper_select_clause.append("gateway AS gateway")
_success_payments_mapper_select_clause.append("payment_method AS payment_method")
_success_payments_mapper_select_clause.append("payment_date AS payment_date")
_success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS created_at")
_success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS updated_at")
_success_payments_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payment_filter_df").replace("{job_id}",f"'{job_id}'"))
success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
# %%
_failed_payments_update_writer_fields_to_update = failed_payments_update_mapper_df.columns
_failed_payments_update_writer_set_clause=[]
_failed_payments_update_writer_unique_key_clause= []
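# Upsert into dremio.failedpayments: match on payment_id, update the non-key columns, insert rows that do not match.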
for _key in ['payment_id']:
    _failed_payments_update_writer_unique_key_clause.append(f't.{_key} = s.{_key}')
for _field in _failed_payments_update_writer_fields_to_update:
    if _field not in ['payment_id']:  # exclude the unique-key column itself from the UPDATE SET clause
        _failed_payments_update_writer_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.failedpayments t
USING failed_payments_update_mapper_df s
ON ''' + ' AND '.join(_failed_payments_update_writer_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_failed_payments_update_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)
# %%
success_payments_mapper_df.write.mode('append').saveAsTable('dremio.payments')


@@ -0,0 +1,308 @@
import marimo
__generated_with = "0.13.15"
app = marimo.App()
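# marimo notebook form of the failed-payment retry workflow above: each @app.cell function returns the objects that downstream cells receive as parameters.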
@app.cell
def init():
import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
conf = SparkConf()
params = {
"spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
"spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
"spark.hadoop.fs.s3a.aws.region": "us-west-1",
"spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
"spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dremio.type" : "hadoop",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}
conf.setAll(list(params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
return Template, job_id, json, requests, secrets, spark
@app.cell
def failed_payments_reader(spark):
failed_payments_reader_df = spark.read.table('dremio.failedpayments')
failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
return (failed_payments_reader_df,)
@app.cell
def failed_payments_filter(failed_payments_reader_df, spark):
print(failed_payments_reader_df.columns)
failed_payments_filter_df = spark.sql("select * from failed_payments_reader_df where retry_attempt_count < 3 AND gateway = \'CCS\' AND (retry_status = \'new\' OR retry_status = \'failed\')")
failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')
return (failed_payments_filter_df,)
@app.cell
def payment_api(
Template,
failed_payments_filter_df,
json,
requests,
secrets,
spark,
):
_payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'
_payment_api_headers = dict()
for _payment_api_k,_payment_api_v in {'Content-Type': {'value': 'application/json', 'secret': None}, 'api-key': {'value': None, 'secret': 'API_KEY'}}.items() :
if(_payment_api_v.get('value') is not None and _payment_api_v.get('value') != ''):
_payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
elif(_payment_api_v.get('secret') is not None and _payment_api_v.get('secret') != ''):
_payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))
def _payment_api_call_api_udf(row):
body_dict = row.asDict(recursive=True)
template = Template('''{
"msgData": {
"canDistribute": true,
"payment": [
{
"accountId3": "{{account_id}}",
"currency2": "{{currency}}",
"paymentAmount2": "{{amount}}"
}
],
"paymentDate": "{{payment_date[:10]}}",
"paymentTender": [
{
"currency3": "{{currency}}",
"name": "NAME",
"payorAccountId2": "{{account_id}}",
"tenderAmount": "{{amount}}",
"tenderType2": "CASH"
}
],
"payorAccountId": "{{account_id}}",
"shouldAllPaymentsFreeze": true,
"tndrSrceCd": "CASH-01",
"user": ""
},
"outMsgConfigCode": "EXP_CREATE_PAYMENT"
}''')
body = json.loads(template.render(**body_dict))
print("request : "+ json.dumps(body))
response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={})
data = response.json()
print("response : " + json.dumps(data))
merged = {**body_dict, "response_body": data}
return json.dumps(merged,default=str)
_payment_api_df_rdd=failed_payments_filter_df.rdd.map(lambda row: _payment_api_call_api_udf(row)).persist()
payment_api_df = spark.read.json(_payment_api_df_rdd)
if not "response_body" in payment_api_df.columns:
raise Exception("Dataframe is empty")
payment_api_df.createOrReplaceTempView('payment_api_df')
return (payment_api_df,)
@app.cell
def failed_payments_update_mapper(job_id, payment_api_df, spark):
_failed_payments_update_mapper_select_clause=payment_api_df.columns if False else []
_failed_payments_update_mapper_select_clause.append("id AS id")
_failed_payments_update_mapper_select_clause.append("retry_attempt_count + 1 AS retry_attempt_count")
_failed_payments_update_mapper_select_clause.append("TRUE AS batch_processed")
_failed_payments_update_mapper_select_clause.append("{job_id} AS batch_job_id")
_failed_payments_update_mapper_select_clause.append("CASE WHEN response_body.success = \'true\' THEN \'success\' WHEN retry_attempt_count >= 3 THEN \'permanently_failed\' ELSE \'failed\' END AS retry_status")
_failed_payments_update_mapper_select_clause.append("account_id AS account_id")
_failed_payments_update_mapper_select_clause.append("failure_reason AS failure_reason")
_failed_payments_update_mapper_select_clause.append("failure_timestamp AS failure_timestamp")
_failed_payments_update_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
_failed_payments_update_mapper_select_clause.append("created_at AS created_at")
_failed_payments_update_mapper_select_clause.append("payment_id AS payment_id")
_failed_payments_update_mapper_select_clause.append("source_of_failure AS source_of_failure")
_failed_payments_update_mapper_select_clause.append("currency AS currency")
_failed_payments_update_mapper_select_clause.append("gateway AS gateway")
_failed_payments_update_mapper_select_clause.append("payment_method AS payment_method")
_failed_payments_update_mapper_select_clause.append("payment_date AS payment_date")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS last_retry_timestamp")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS updated_at")
failed_payments_update_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df").replace("{job_id}",f"'{job_id}'"))
failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")
return (failed_payments_update_mapper_df,)
@app.cell
def success_payment_filter(payment_api_df, spark):
print(payment_api_df.columns)
success_payment_filter_df = spark.sql("select * from payment_api_df where response_body.success=\'true\'")
success_payment_filter_df.createOrReplaceTempView('success_payment_filter_df')
return (success_payment_filter_df,)
@app.cell
def success_payments_mapper(job_id, spark, success_payment_filter_df):
_success_payments_mapper_select_clause=success_payment_filter_df.columns if False else []
_success_payments_mapper_select_clause.append("uuid() AS id")
_success_payments_mapper_select_clause.append("account_id AS account_id")
_success_payments_mapper_select_clause.append("currency AS currency")
_success_payments_mapper_select_clause.append("payment_id AS payment_id")
_success_payments_mapper_select_clause.append("gateway AS gateway")
_success_payments_mapper_select_clause.append("payment_method AS payment_method")
_success_payments_mapper_select_clause.append("payment_date AS payment_date")
_success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS created_at")
_success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS updated_at")
_success_payments_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payment_filter_df").replace("{job_id}",f"'{job_id}'"))
success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
return (success_payments_mapper_df,)
@app.cell
def failed_payments_update_writer(failed_payments_update_mapper_df, spark):
_failed_payments_update_writer_fields_to_update = failed_payments_update_mapper_df.columns
_failed_payments_update_writer_set_clause=[]
_failed_payments_update_writer_unique_key_clause= []
    for _key in ['payment_id']:
        _failed_payments_update_writer_unique_key_clause.append(f't.{_key} = s.{_key}')
    for _field in _failed_payments_update_writer_fields_to_update:
        if _field not in ['payment_id']:  # exclude the unique-key column itself from the UPDATE SET clause
            _failed_payments_update_writer_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.failedpayments t
USING failed_payments_update_mapper_df s
ON ''' + ' AND '.join(_failed_payments_update_writer_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_failed_payments_update_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)
return
@app.cell
def success_payments_writer(success_payments_mapper_df):
success_payments_mapper_df.write.mode('append').saveAsTable('dremio.payments')
return
if __name__ == "__main__":
app.run()

File diff suppressed because one or more lines are too long

payment_metrics/main.py (new file, 842 lines)

@@ -0,0 +1,842 @@
__generated_with = "0.13.15"
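# payment_metrics workflow: build daily payment metrics (totals, value processed, most-used method, high-value and failed counts) from dremio.payments and dremio.failedpayments, then merge them into dremio.successpaymentmetrics and dremio.failedpaymentmetrics.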
# %%
import sys
import time
from pyspark.sql.utils import AnalysisException
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from util import get_logger, observe_metrics, collect_metrics, log_info, log_error, forgiving_serializer
from pyspark.sql.functions import udf
from pyspark.sql.functions import count, expr, lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
from pyspark.sql.observation import Observation
from pyspark import StorageLevel
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import approx_count_distinct, avg, collect_list, collect_set, corr, count, countDistinct, covar_pop, covar_samp, first, kurtosis, last, max, mean, min, skewness, stddev, stddev_pop, stddev_samp, sum, var_pop, var_samp, variance,expr,to_json,struct, date_format, col, lit, when, regexp_replace, ltrim
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from FilesystemManager import FilesystemManager, SupportedFilesystemType
from Materialization import Materialization
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
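# Each component below records start/end timestamps, attaches an Observation-based metrics observer, and sets an execute_status flag; the finalize step at the end collects all of these into the job's execution history.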
init_start_time=time.time()
LOGGER = get_logger()
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
workflow = 'payment_metrics'
execution_environment = os.getenv('EXECUTION_ENVIRONMENT') or 'CLUSTER'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
retry_job_id = os.getenv("RETRY_EXECUTION_ID") or ''
log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}', Retry Job Id: '{retry_job_id}'")
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
filesystemManager = FilesystemManager.create(secrets.get('LAKEHOUSE_BUCKET'), storage_options={'key': secrets.get('S3_ACCESS_KEY'), 'secret': secrets.get('S3_SECRET_KEY'), 'region': secrets.get('S3_REGION')})
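# On a retry, check the previous job's execution history: if its finalize step already succeeded, exit so processing can move on to the next workflow in the chain.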
if retry_job_id:
logs = Materialization.get_execution_history_by_job_id(filesystemManager, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, retry_job_id, selected_components=['finalize']).to_dicts()
if len(logs) == 1 and logs[0].get('metrics').get('execute_status') == 'SUCCESS':
log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}' - Retry Job Id: '{retry_job_id}' was already successful. Hence exiting to forward processing to next in chain.")
sys.exit(0)
_conf = SparkConf()
_params = {
"spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
"spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
"spark.hadoop.fs.s3a.aws.region": secrets.get("S3_REGION") or "None",
"spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
"spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
"spark.hadoop.fs.s3.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
"spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dremio.type" : "hadoop",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0,ch.cern.sparkmeasure:spark-measure_2.12:0.26"
}
_conf.setAll(list(_params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=_conf).getOrCreate()
bootstrap_udfs(spark)
materialization = Materialization(spark, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, job_id, retry_job_id, execution_environment, LOGGER)
init_dependency_key="init"
init_end_time=time.time()
# %%
success_payments_reader_start_time=time.time()
success_payments_reader_fail_on_error=""
try:
success_payments_reader_df = spark.read.table('dremio.payments')
success_payments_reader_df, success_payments_reader_observer = observe_metrics("success_payments_reader_df", success_payments_reader_df)
success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df')
success_payments_reader_dependency_key="success_payments_reader"
success_payments_reader_execute_status="SUCCESS"
except Exception as e:
success_payments_reader_error = e
log_error(LOGGER, f"Component success_payments_reader Failed", e)
success_payments_reader_execute_status="ERROR"
raise e
finally:
success_payments_reader_end_time=time.time()
# %%
success_payments_mapper_start_time=time.time()
success_payments_mapper_fail_on_error=""
try:
_success_payments_mapper_select_clause=success_payments_reader_df.columns if False else []
_success_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''')
_success_payments_mapper_select_clause.append('''amount AS amount''')
_success_payments_mapper_select_clause.append('''gateway AS gateway''')
_success_payments_mapper_select_clause.append('''payment_method AS payment_method''')
try:
success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
except Exception as e:
success_payments_mapper_df = success_payments_reader_df.limit(0)
log_info(LOGGER, f"error while mapping the data :{e} " )
success_payments_mapper_df, success_payments_mapper_observer = observe_metrics("success_payments_mapper_df", success_payments_mapper_df)
success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
success_payments_mapper_dependency_key="success_payments_mapper"
success_payments_mapper_execute_status="SUCCESS"
except Exception as e:
success_payments_mapper_error = e
log_error(LOGGER, f"Component success_payments_mapper Failed", e)
success_payments_mapper_execute_status="ERROR"
raise e
finally:
success_payments_mapper_end_time=time.time()
# %%
final_success_payments_start_time=time.time()
print(success_payments_mapper_df.columns)
final_success_payments_fail_on_error=""
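# Incremental slice: keep only CCS payments dated on or after the latest payment_date already materialized in dremio.successpaymentmetrics, falling back to the earliest source date on the first run.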
try:
try:
final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'")
except AnalysisException as e:
log_info(LOGGER, f"error while filtering data : {e}")
final_success_payments_df = success_payments_mapper_df.limit(0)
except Exception as e:
log_info(LOGGER, f"Unexpected error: {e}")
final_success_payments_df = success_payments_mapper_df.limit(0)
final_success_payments_df, final_success_payments_observer = observe_metrics("final_success_payments_df", final_success_payments_df)
final_success_payments_df.createOrReplaceTempView('final_success_payments_df')
final_success_payments_dependency_key="final_success_payments"
final_success_payments_execute_status="SUCCESS"
except Exception as e:
final_success_payments_error = e
log_error(LOGGER, f"Component final_success_payments Failed", e)
final_success_payments_execute_status="ERROR"
raise e
finally:
final_success_payments_end_time=time.time()
# %%
high_valued_payments_filter_start_time=time.time()
print(final_success_payments_df.columns)
high_valued_payments_filter_fail_on_error=""
try:
try:
high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500")
except AnalysisException as e:
log_info(LOGGER, f"error while filtering data : {e}")
high_valued_payments_filter_df = final_success_payments_df.limit(0)
except Exception as e:
log_info(LOGGER, f"Unexpected error: {e}")
high_valued_payments_filter_df = final_success_payments_df.limit(0)
high_valued_payments_filter_df, high_valued_payments_filter_observer = observe_metrics("high_valued_payments_filter_df", high_valued_payments_filter_df)
high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df')
high_valued_payments_filter_dependency_key="high_valued_payments_filter"
high_valued_payments_filter_execute_status="SUCCESS"
except Exception as e:
high_valued_payments_filter_error = e
log_error(LOGGER, f"Component high_valued_payments_filter Failed", e)
high_valued_payments_filter_execute_status="ERROR"
raise e
finally:
high_valued_payments_filter_end_time=time.time()
# %%
total_payments_and_total_value_processed_start_time=time.time()
total_payments_and_total_value_processed_fail_on_error="True"
try:
total_payments_and_total_value_processed_df = final_success_payments_df.groupBy(
"payment_date"
).agg(
count('*').alias("total_payments"),
sum('amount').alias("total_value_processed")
)
total_payments_and_total_value_processed_df, total_payments_and_total_value_processed_observer = observe_metrics("total_payments_and_total_value_processed_df", total_payments_and_total_value_processed_df)
total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df')
total_payments_and_total_value_processed_dependency_key="total_payments_and_total_value_processed"
print(final_success_payments_dependency_key)
total_payments_and_total_value_processed_execute_status="SUCCESS"
except Exception as e:
total_payments_and_total_value_processed_error = e
log_error(LOGGER, f"Component total_payments_and_total_value_processed Failed", e)
total_payments_and_total_value_processed_execute_status="ERROR"
raise e
finally:
total_payments_and_total_value_processed_end_time=time.time()
# %%
aggregate__4_start_time=time.time()
aggregate__4_fail_on_error="True"
try:
aggregate__4_df = final_success_payments_df.groupBy(
"payment_date",
"payment_method"
).agg(
count('*').alias("method_count")
)
aggregate__4_df, aggregate__4_observer = observe_metrics("aggregate__4_df", aggregate__4_df)
aggregate__4_df.createOrReplaceTempView('aggregate__4_df')
aggregate__4_dependency_key="aggregate__4"
print(final_success_payments_dependency_key)
aggregate__4_execute_status="SUCCESS"
except Exception as e:
aggregate__4_error = e
log_error(LOGGER, f"Component aggregate__4 Failed", e)
aggregate__4_execute_status="ERROR"
raise e
finally:
aggregate__4_end_time=time.time()
# %%
data_mapper__5_start_time=time.time()
data_mapper__5_fail_on_error=""
try:
_data_mapper__5_select_clause=aggregate__4_df.columns if False else []
_data_mapper__5_select_clause.append('''payment_date AS payment_date''')
_data_mapper__5_select_clause.append('''payment_method AS payment_method''')
_data_mapper__5_select_clause.append('''method_count AS method_count''')
    _data_mapper__5_select_clause.append('''RANK() OVER (PARTITION BY payment_date ORDER BY method_count DESC) AS rank_method''')  # DESC so rank 1 marks the most-used method per day
try:
data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'"))
except Exception as e:
data_mapper__5_df = aggregate__4_df.limit(0)
log_info(LOGGER, f"error while mapping the data :{e} " )
data_mapper__5_df, data_mapper__5_observer = observe_metrics("data_mapper__5_df", data_mapper__5_df)
data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df")
data_mapper__5_dependency_key="data_mapper__5"
data_mapper__5_execute_status="SUCCESS"
except Exception as e:
data_mapper__5_error = e
log_error(LOGGER, f"Component data_mapper__5 Failed", e)
data_mapper__5_execute_status="ERROR"
raise e
finally:
data_mapper__5_end_time=time.time()
# %%
filter__6_start_time=time.time()
print(data_mapper__5_df.columns)
filter__6_fail_on_error=""
try:
try:
filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1")
except AnalysisException as e:
log_info(LOGGER, f"error while filtering data : {e}")
filter__6_df = data_mapper__5_df.limit(0)
except Exception as e:
log_info(LOGGER, f"Unexpected error: {e}")
filter__6_df = data_mapper__5_df.limit(0)
filter__6_df, filter__6_observer = observe_metrics("filter__6_df", filter__6_df)
filter__6_df.createOrReplaceTempView('filter__6_df')
filter__6_dependency_key="filter__6"
filter__6_execute_status="SUCCESS"
except Exception as e:
filter__6_error = e
log_error(LOGGER, f"Component filter__6 Failed", e)
filter__6_execute_status="ERROR"
raise e
finally:
filter__6_end_time=time.time()
# %%
most_used_payment_method___start_time=time.time()
most_used_payment_method___fail_on_error=""
try:
_most_used_payment_method___select_clause=filter__6_df.columns if False else []
_most_used_payment_method___select_clause.append('''payment_date AS payment_date''')
_most_used_payment_method___select_clause.append('''payment_method AS most_used_payment_method''')
try:
most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'"))
except Exception as e:
most_used_payment_method___df = filter__6_df.limit(0)
log_info(LOGGER, f"error while mapping the data :{e} " )
most_used_payment_method___df, most_used_payment_method___observer = observe_metrics("most_used_payment_method___df", most_used_payment_method___df)
most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df")
most_used_payment_method___dependency_key="most_used_payment_method__"
most_used_payment_method___execute_status="SUCCESS"
except Exception as e:
most_used_payment_method___error = e
log_error(LOGGER, f"Component most_used_payment_method__ Failed", e)
most_used_payment_method___execute_status="ERROR"
raise e
finally:
most_used_payment_method___end_time=time.time()
# %%
high_valued_payments___start_time=time.time()
high_valued_payments___fail_on_error="True"
try:
high_valued_payments___df = high_valued_payments_filter_df.groupBy(
"payment_date"
).agg(
count('*').alias("high_valued_payments")
)
high_valued_payments___df, high_valued_payments___observer = observe_metrics("high_valued_payments___df", high_valued_payments___df)
high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df')
high_valued_payments___dependency_key="high_valued_payments__"
print(high_valued_payments_filter_dependency_key)
high_valued_payments___execute_status="SUCCESS"
except Exception as e:
high_valued_payments___error = e
log_error(LOGGER, f"Component high_valued_payments__ Failed", e)
high_valued_payments___execute_status="ERROR"
raise e
finally:
high_valued_payments___end_time=time.time()
# %%
failed_payments_reader_start_time=time.time()
failed_payments_reader_fail_on_error=""
try:
failed_payments_reader_df = spark.read.table('dremio.failedpayments')
failed_payments_reader_df, failed_payments_reader_observer = observe_metrics("failed_payments_reader_df", failed_payments_reader_df)
failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
failed_payments_reader_dependency_key="failed_payments_reader"
failed_payments_reader_execute_status="SUCCESS"
except Exception as e:
failed_payments_reader_error = e
log_error(LOGGER, f"Component failed_payments_reader Failed", e)
failed_payments_reader_execute_status="ERROR"
raise e
finally:
failed_payments_reader_end_time=time.time()
# %%
failed_payments_mapper_start_time=time.time()
failed_payments_mapper_fail_on_error=""
try:
_failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else []
_failed_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''')
_failed_payments_mapper_select_clause.append('''payment_method AS payment_method''')
_failed_payments_mapper_select_clause.append('''failure_reason AS failure_reason''')
_failed_payments_mapper_select_clause.append('''gateway AS gateway''')
try:
failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
except Exception as e:
failed_payments_mapper_df = failed_payments_reader_df.limit(0)
log_info(LOGGER, f"error while mapping the data :{e} " )
failed_payments_mapper_df, failed_payments_mapper_observer = observe_metrics("failed_payments_mapper_df", failed_payments_mapper_df)
failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df")
failed_payments_mapper_dependency_key="failed_payments_mapper"
failed_payments_mapper_execute_status="SUCCESS"
except Exception as e:
failed_payments_mapper_error = e
log_error(LOGGER, f"Component failed_payments_mapper Failed", e)
failed_payments_mapper_execute_status="ERROR"
raise e
finally:
failed_payments_mapper_end_time=time.time()
# %%
final_failed_payments_start_time=time.time()
print(failed_payments_mapper_df.columns)
final_failed_payments_df = spark.sql("select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))")
final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df')
final_failed_payments_df.persist()
final_failed_payments_end_time=time.time()
final_failed_payments_dependency_key="final_failed_payments"
print(failed_payments_mapper_dependency_key)
# %%
filter__13_start_time=time.time()
print(final_failed_payments_df.columns)
filter__13_fail_on_error=""
try:
try:
filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = \'CCS\'")
except AnalysisException as e:
log_info(LOGGER, f"error while filtering data : {e}")
filter__13_df = final_failed_payments_df.limit(0)
except Exception as e:
log_info(LOGGER, f"Unexpected error: {e}")
filter__13_df = final_failed_payments_df.limit(0)
filter__13_df, filter__13_observer = observe_metrics("filter__13_df", filter__13_df)
filter__13_df.createOrReplaceTempView('filter__13_df')
filter__13_dependency_key="filter__13"
filter__13_execute_status="SUCCESS"
except Exception as e:
filter__13_error = e
log_error(LOGGER, f"Component filter__13 Failed", e)
filter__13_execute_status="ERROR"
raise e
finally:
filter__13_end_time=time.time()
# %%
total_failed_payments___start_time=time.time()
total_failed_payments___fail_on_error="True"
try:
total_failed_payments___df = filter__13_df.groupBy(
"payment_date"
).agg(
count('*').alias("total_failed_payments")
)
total_failed_payments___df, total_failed_payments___observer = observe_metrics("total_failed_payments___df", total_failed_payments___df)
total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df')
total_failed_payments___dependency_key="total_failed_payments__"
print(filter__13_dependency_key)
total_failed_payments___execute_status="SUCCESS"
except Exception as e:
total_failed_payments___error = e
log_error(LOGGER, f"Component total_failed_payments__ Failed", e)
total_failed_payments___execute_status="ERROR"
raise e
finally:
total_failed_payments___end_time=time.time()
# %%
failed_payment_metrics_start_time=time.time()
failed_payment_metrics_fail_on_error="True"
try:
failed_payment_metrics_df = final_failed_payments_df.groupBy(
"payment_date",
"gateway",
"failure_reason"
).agg(
count('*').alias("failure_count")
)
failed_payment_metrics_df, failed_payment_metrics_observer = observe_metrics("failed_payment_metrics_df", failed_payment_metrics_df)
failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df')
failed_payment_metrics_dependency_key="failed_payment_metrics"
print(final_failed_payments_dependency_key)
failed_payment_metrics_execute_status="SUCCESS"
except Exception as e:
failed_payment_metrics_error = e
log_error(LOGGER, f"Component failed_payment_metrics Failed", e)
failed_payment_metrics_execute_status="ERROR"
raise e
finally:
failed_payment_metrics_end_time=time.time()
# %%
data_writer__15_start_time=time.time()
data_writer__15_fail_on_error=""
try:
_data_writer__15_fields_to_update = failed_payment_metrics_df.columns
_data_writer__15_set_clause=[]
_data_writer__15_unique_key_clause= []
    for _key in ['payment_date', 'gateway', 'failure_reason']:
        _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}')
    for _field in _data_writer__15_fields_to_update:
        if _field not in ['payment_date', 'gateway', 'failure_reason']:  # exclude unique-key columns from the UPDATE SET clause
            _data_writer__15_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.failedpaymentmetrics t
USING failed_payment_metrics_df s
ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)
data_writer__15_dependency_key="data_writer__15"
print(failed_payment_metrics_dependency_key)
data_writer__15_execute_status="SUCCESS"
except Exception as e:
data_writer__15_error = e
log_error(LOGGER, f"Component data_writer__15 Failed", e)
data_writer__15_execute_status="ERROR"
raise e
finally:
data_writer__15_end_time=time.time()
# %%
success_payment_metrics_start_time=time.time()
print(total_payments_and_total_value_processed_df.columns)
print(most_used_payment_method___df.columns)
print(high_valued_payments___df.columns)
print(total_failed_payments___df.columns)
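# Assemble the daily metrics row: full outer join of failed and successful payment totals on payment_date, then attach the most-used payment method and the high-value payment count.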
success_payment_metrics_df = spark.sql("""
SELECT
COALESCE(a.payment_date, d.payment_date) AS payment_date,
a.total_payments,
a.total_value_processed,
b.most_used_payment_method,
c.high_valued_payments,
d.total_failed_payments
FROM total_failed_payments___df d
FULL OUTER JOIN total_payments_and_total_value_processed_df a
ON a.payment_date = d.payment_date
LEFT JOIN most_used_payment_method___df b
ON a.payment_date = b.payment_date
LEFT JOIN high_valued_payments___df c
ON a.payment_date = c.payment_date
""")
success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df')
success_payment_metrics_end_time=time.time()
success_payment_metrics_dependency_key="success_payment_metrics"
# %%
success_payment_metrics_writer_start_time=time.time()
success_payment_metrics_writer_fail_on_error=""
try:
_success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns
_success_payment_metrics_writer_set_clause=[]
_success_payment_metrics_writer_unique_key_clause= []
    for _key in ['payment_date']:
        _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}')
    for _field in _success_payment_metrics_writer_fields_to_update:
        if _field not in ['payment_date']:  # exclude the unique-key column itself from the UPDATE SET clause
            _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.successpaymentmetrics t
USING success_payment_metrics_df s
ON ''' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_success_payment_metrics_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)
success_payment_metrics_writer_dependency_key="success_payment_metrics_writer"
success_payment_metrics_writer_execute_status="SUCCESS"
except Exception as e:
success_payment_metrics_writer_error = e
log_error(LOGGER, f"Component success_payment_metrics_writer Failed", e)
success_payment_metrics_writer_execute_status="ERROR"
raise e
finally:
success_payment_metrics_writer_end_time=time.time()
# %%
finalize_start_time=time.time()
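# Collect the per-component metrics recorded above and persist them as this job's execution history before stopping Spark.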
metrics = {
'data': collect_metrics(locals()),
}
materialization.materialized_execution_history({'finalize': {'execute_status': 'SUCCESS', 'fail_on_error': 'False'}, **metrics['data']})
log_info(LOGGER, f"Workflow Data metrics: {metrics['data']}")
finalize_end_time=time.time()
spark.stop()

File diff suppressed because it is too large

File diff suppressed because one or more lines are too long


@@ -0,0 +1,167 @@
__generated_with = "0.13.15"
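# Service-usage metrics workflow: map dremio.actionsaudit into daily action counts per service_type and merge the aggregates into dremio.servicemetrics.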
# %%
import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
conf = SparkConf()
params = {
"spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
"spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
"spark.hadoop.fs.s3a.aws.region": "us-west-1",
"spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
"spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dremio.type" : "hadoop",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}
conf.setAll(list(params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
# %%
actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
# %%
_actions_audit_mapper_select_clause=actions_audit_reader_df.columns if False else []
_actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
_actions_audit_mapper_select_clause.append("sub_category AS service_type")
_actions_audit_mapper_select_clause.append("action_count AS action_count")
actions_audit_mapper_df=spark.sql(("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df").replace("{job_id}",f"'{job_id}'"))
actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
# %%
print(actions_audit_mapper_df.columns)
actions_audit_filter_df = spark.sql("select * from actions_audit_mapper_df where action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))")
actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
# %%
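# Expand any struct/array columns, aggregate once per grouping spec, and union the partial results, filling group columns absent from a given spec with NULL.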
_params = {
"datasource": "actions_audit_filter",
"selectFunctions" : [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( actions_audit_filter_df,
group_expression="action_date, service_type",
cube="",
rollup="",
grouping_set="",
select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
for f in _rewritten_selects
]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
_gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
for _col in _all_group_cols:
if _col not in _gs:
_gdf = _gdf.withColumn(_col, lit(None))
_partials.append(_gdf)
aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)
aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
# %%
_data_writer__5_fields_to_update = aggregate__3_df.columns
_data_writer__5_set_clause=[]
_data_writer__5_unique_key_clause= []
for _key in ['action_date', 'service_type']:
    _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')
for _field in _data_writer__5_fields_to_update:
    if _field not in ['action_date', 'service_type']:  # exclude unique-key columns from the UPDATE SET clause
        _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.servicemetrics t
USING aggregate__3_df s
ON ''' + ' AND '.join(_data_writer__5_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_data_writer__5_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)


@@ -0,0 +1,197 @@
import marimo
__generated_with = "0.13.15"
app = marimo.App()
@app.cell
def init():
import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
conf = SparkConf()
params = {
"spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
"spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
"spark.hadoop.fs.s3a.aws.region": "us-west-1",
"spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
"spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dremio.type" : "hadoop",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}
conf.setAll(list(params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
return expr, job_id, lit, preprocess_then_expand, reduce, spark
@app.cell
def actions_audit_reader(spark):
actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
return (actions_audit_reader_df,)
@app.cell
def actions_audit_mapper(actions_audit_reader_df, job_id, spark):
_actions_audit_mapper_select_clause=actions_audit_reader_df.columns if False else []
_actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
_actions_audit_mapper_select_clause.append("sub_category AS service_type")
_actions_audit_mapper_select_clause.append("action_count AS action_count")
actions_audit_mapper_df=spark.sql(("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df").replace("{job_id}",f"'{job_id}'"))
actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
return (actions_audit_mapper_df,)
@app.cell
def actions_audit_filter(actions_audit_mapper_df, spark):
print(actions_audit_mapper_df.columns)
actions_audit_filter_df = spark.sql("select * from actions_audit_mapper_df where action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))")
actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
return (actions_audit_filter_df,)
@app.cell
def aggregate__3(
actions_audit_filter_df,
expr,
lit,
preprocess_then_expand,
reduce,
):
_params = {
"datasource": "actions_audit_filter",
"selectFunctions" : [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( actions_audit_filter_df,
group_expression="action_date, service_type",
cube="",
rollup="",
grouping_set="",
select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
for f in _rewritten_selects
]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
_gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
for _col in _all_group_cols:
if _col not in _gs:
_gdf = _gdf.withColumn(_col, lit(None))
_partials.append(_gdf)
aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)
aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
return (aggregate__3_df,)
@app.cell
def data_writer__5(aggregate__3_df, spark):
_data_writer__5_fields_to_update = aggregate__3_df.columns
_data_writer__5_set_clause=[]
_data_writer__5_unique_key_clause= []
    for _key in ['action_date', 'service_type']:
        _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')
    for _field in _data_writer__5_fields_to_update:
        if _field not in ['action_date', 'service_type']:  # exclude unique-key columns from the UPDATE SET clause
            _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.servicemetrics t
USING aggregate__3_df s
ON ''' + ' AND '.join(_data_writer__5_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_data_writer__5_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)
return
if __name__ == "__main__":
app.run()

File diff suppressed because one or more lines are too long