# tenant1/payment_metrics/main.py
__generated_with = "0.13.15"
# %%
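# Environment bootstrap: shared Spark UDFs, tenant secrets, and Gitea-backed
# workspace content for this workflow run.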
import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
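# Spark session configured with an Iceberg 'dremio' Hadoop catalog whose
# warehouse is the lakehouse S3 bucket.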
conf = SparkConf()
params = {
    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
    "spark.hadoop.fs.s3a.aws.region": "us-west-1",
    "spark.sql.catalog.dremio.warehouse": secrets.get('LAKEHOUSE_BUCKET'),
    "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dremio.type": "hadoop",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}
conf.setAll(list(params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
# %%
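# Read the source payments table from the Iceberg 'dremio' catalog and register it for Spark SQL.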
success_payments_reader_df = spark.read.table('dremio.payments')
success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df')
# %%
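# Project only the fields needed downstream; payment_date is truncated to a DATE.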
_success_payments_mapper_select_clause = []
_success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date")
_success_payments_mapper_select_clause.append("amount AS amount")
_success_payments_mapper_select_clause.append("gateway AS gateway")
_success_payments_mapper_select_clause.append("payment_method AS payment_method")
success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
# %%
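# Incremental window: keep only CCS payments dated on or after the latest date already
# written to dremio.successpaymentmetrics (falling back to the earliest mapped date on the first run).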
print(success_payments_mapper_df.columns)
final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'")
final_success_payments_df.createOrReplaceTempView('final_success_payments_df')
# %%
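# High-value payments: amount >= 500.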
print(final_success_payments_df.columns)
high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500")
high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df')
# %%
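# Daily totals for CCS payments. preprocess_then_expand returns a flattened frame plus the
# grouping specs and rewritten aggregate expressions; each spec is aggregated separately,
# absent grouping columns are padded with NULLs, and the partial results are unioned.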
_params = {
    "datasource": "final_success_payments",
    "selectFunctions": [{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
    final_success_payments_df,
    group_expression="payment_date",
    cube="",
    rollup="",
    grouping_set="",
    select_functions=[{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)
total_payments_and_total_value_processed_df = reduce(lambda a, b: a.unionByName(b), _partials)
total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df')
# %%
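# Payment-method usage counts per day (same expand/aggregate/union pattern as above).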
_params = {
    "datasource": "final_success_payments",
    "selectFunctions": [{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
    final_success_payments_df,
    group_expression="payment_date, payment_method",
    cube="",
    rollup="",
    grouping_set="",
    select_functions=[{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)
aggregate__4_df = reduce(lambda a, b: a.unionByName(b), _partials)
aggregate__4_df.createOrReplaceTempView('aggregate__4_df')
# %%
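# Rank payment methods within each day by usage count (rank 1 = most used).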
_data_mapper__5_select_clause = []
_data_mapper__5_select_clause.append("payment_date AS payment_date")
_data_mapper__5_select_clause.append("payment_method AS payment_method")
_data_mapper__5_select_clause.append("method_count AS method_count")
_data_mapper__5_select_clause.append("RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method")
data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'"))
data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df")
# %%
print(data_mapper__5_df.columns)
filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1")
filter__6_df.createOrReplaceTempView('filter__6_df')
# %%
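# Keep the top-ranked method per day and expose it as most_used_payment_method.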
_most_used_payment_method___select_clause = []
_most_used_payment_method___select_clause.append("payment_date AS payment_date")
_most_used_payment_method___select_clause.append("payment_method AS most_used_payment_method")
most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'"))
most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df")
# %%
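# Daily counts of high-value payments.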
_params = {
    "datasource": "high_valued_payments_filter",
    "selectFunctions": [{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
    high_valued_payments_filter_df,
    group_expression="payment_date",
    cube="",
    rollup="",
    grouping_set="",
    select_functions=[{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)
high_valued_payments___df = reduce(lambda a, b: a.unionByName(b), _partials)
high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df')
# %%
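# Failed-payments branch: read the source table from the Iceberg catalog.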
failed_payments_reader_df = spark.read.table('dremio.failedpayments')
failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
# %%
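# Project the failed-payment fields; the following cells mirror the success branch with an
# incremental filter against dremio.failedpaymentmetrics and a CCS gateway filter.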
_failed_payments_mapper_select_clause = []
_failed_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date")
_failed_payments_mapper_select_clause.append("payment_method AS payment_method")
_failed_payments_mapper_select_clause.append("failure_reason AS failure_reason")
_failed_payments_mapper_select_clause.append("gateway AS gateway")
failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df")
# %%
print(failed_payments_mapper_df.columns)
final_failed_payments_df = spark.sql("select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))")
final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df')
# %%
print(final_failed_payments_df.columns)
filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = 'CCS'")
filter__13_df.createOrReplaceTempView('filter__13_df')
# %%
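# Daily counts of failed CCS payments.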
_params = {
    "datasource": "filter__13",
    "selectFunctions": [{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
    filter__13_df,
    group_expression="payment_date",
    cube="",
    rollup="",
    grouping_set="",
    select_functions=[{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)
total_failed_payments___df = reduce(lambda a, b: a.unionByName(b), _partials)
total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df')
# %%
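# Failure counts per day, gateway, and failure reason (all gateways, not just CCS).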
_params = {
    "datasource": "final_failed_payments",
    "selectFunctions": [{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
    final_failed_payments_df,
    group_expression="payment_date, gateway, failure_reason",
    cube="",
    rollup="",
    grouping_set="",
    select_functions=[{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)
failed_payment_metrics_df = reduce(lambda a, b: a.unionByName(b), _partials)
failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df')
# %%
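# Upsert failed-payment metrics into dremio.failedpaymentmetrics, keyed on
# (payment_date, gateway, failure_reason).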
_data_writer__15_fields_to_update = failed_payment_metrics_df.columns
_data_writer__15_set_clause=[]
_data_writer__15_unique_key_clause= []
_data_writer__15_unique_keys = ['payment_date', 'gateway', 'failure_reason']
for _key in _data_writer__15_unique_keys:
    _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}')
for _field in _data_writer__15_fields_to_update:
    if _field not in _data_writer__15_unique_keys:
        _data_writer__15_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.failedpaymentmetrics t
USING failed_payment_metrics_df s
ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)
# %%
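# Assemble the combined daily metrics; the full outer join keeps dates that appear only
# in the failed-payment totals.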
print(total_payments_and_total_value_processed_df.columns)
print(most_used_payment_method___df.columns)
print(high_valued_payments___df.columns)
print(total_failed_payments___df.columns)
success_payment_metrics_df = spark.sql("""
SELECT
COALESCE(a.payment_date, d.payment_date) AS payment_date,
a.total_payments,
a.total_value_processed,
b.most_used_payment_method,
c.high_valued_payments,
d.total_failed_payments
FROM total_failed_payments___df d
FULL OUTER JOIN total_payments_and_total_value_processed_df a
ON a.payment_date = d.payment_date
LEFT JOIN most_used_payment_method___df b
ON a.payment_date = b.payment_date
LEFT JOIN high_valued_payments___df c
ON a.payment_date = c.payment_date
""")
success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df')
# %%
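# Upsert the combined daily metrics into dremio.successpaymentmetrics, keyed on payment_date.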
_success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns
_success_payment_metrics_writer_set_clause=[]
_success_payment_metrics_writer_unique_key_clause= []
_success_payment_metrics_writer_unique_keys = ['payment_date']
for _key in _success_payment_metrics_writer_unique_keys:
    _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}')
for _field in _success_payment_metrics_writer_fields_to_update:
    if _field not in _success_payment_metrics_writer_unique_keys:
        _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.successpaymentmetrics t
USING success_payment_metrics_df s
ON ''' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_success_payment_metrics_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)