import marimo

__generated_with = "0.13.15"

app = marimo.App()


@app.cell
def init():
    import sys

    sys.path.append('/opt/spark/work-dir/')

    from workflow_templates.spark.udf_manager import bootstrap_udfs
    from pyspark.sql.functions import udf
    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType, IntegerType
    import uuid
    from pathlib import Path
    from pyspark import SparkConf, Row
    from pyspark.sql import SparkSession
    import os
    import pandas as pd
    import polars as pl
    import pyarrow as pa
    from pyspark.sql.functions import expr, to_json, col, struct
    from functools import reduce
    from handle_structs_or_arrays import preprocess_then_expand
    import requests
    from jinja2 import Template
    import json

    from secrets_manager import SecretsManager
    from WorkflowManager import WorkflowDSL, WorkflowManager
    from KnowledgebaseManager import KnowledgebaseManager
    from gitea_client import GiteaClient, WorkspaceVersionedContent

    from dremio.flight.endpoint import DremioFlightEndpoint
    from dremio.flight.query import DremioFlightEndpointQuery

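    # Runtime wiring for the cells below: workspace and job identifiers, secret
    # resolution, Gitea-versioned workspace content, and the Spark/Iceberg session
    # configuration.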
    alias_str = 'abcdefghijklmnopqrstuvwxyz'

    workspace = os.getenv('WORKSPACE') or 'exp360cust'
    job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())

    sm = SecretsManager(
        os.getenv('SECRET_MANAGER_URL'),
        os.getenv('SECRET_MANAGER_NAMESPACE'),
        os.getenv('SECRET_MANAGER_ENV'),
        os.getenv('SECRET_MANAGER_TOKEN'),
    )
    secrets = sm.list_secrets(workspace)

    gitea_client = GiteaClient(
        os.getenv('GITEA_HOST'),
        os.getenv('GITEA_TOKEN'),
        os.getenv('GITEA_OWNER') or 'gitea_admin',
        os.getenv('GITEA_REPO') or 'tenant1',
    )
    workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)

    conf = SparkConf()
    params = {
        "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
        "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
        "spark.hadoop.fs.s3a.aws.region": "us-west-1",
        "spark.sql.catalog.dremio.warehouse": secrets.get('LAKEHOUSE_BUCKET'),
        "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dremio.type": "hadoop",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0",
    }
    conf.setAll(list(params.items()))

    spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
    # Register the workspace's user-defined functions on the new session.
    bootstrap_udfs(spark)
    return expr, job_id, lit, preprocess_then_expand, reduce, spark


@app.cell
def success_payments_reader(spark):
    success_payments_reader_df = spark.read.table('dremio.payments')
    success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df')
    return (success_payments_reader_df,)


@app.cell
def success_payments_mapper(job_id, spark, success_payments_reader_df):
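    # Mapper pattern: build a list of "<expression> AS <alias>" fragments, join them
    # into a single SELECT against the upstream temp view, and substitute any literal
    # {job_id} placeholder with the quoted execution id before running the query.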
    _success_payments_mapper_select_clause = success_payments_reader_df.columns if False else []

    _success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date")
    _success_payments_mapper_select_clause.append("amount AS amount")
    _success_payments_mapper_select_clause.append("gateway AS gateway")
    _success_payments_mapper_select_clause.append("payment_method AS payment_method")

    success_payments_mapper_df = spark.sql(
        ("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df")
        .replace("{job_id}", f"'{job_id}'")
    )
    success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
    return (success_payments_mapper_df,)


@app.cell
def final_success_payments(spark, success_payments_mapper_df):
    print(success_payments_mapper_df.columns)
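    # Incremental filter: keep rows on or after the latest payment_date already in
    # dremio.successpaymentmetrics (falling back to the earliest mapped date when the
    # target table is empty), restricted to the CCS gateway. Rows at the boundary date
    # are reprocessed and later reconciled by the MERGE writer.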
    final_success_payments_df = spark.sql(
        "select * from success_payments_mapper_df "
        "where payment_date >= COALESCE("
        "(SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), "
        "(SELECT MIN(payment_date) FROM success_payments_mapper_df)) "
        "AND gateway = 'CCS'"
    )
    final_success_payments_df.createOrReplaceTempView('final_success_payments_df')
    return (final_success_payments_df,)


@app.cell
def high_valued_payments_filter(final_success_payments_df, spark):
    print(final_success_payments_df.columns)
    high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500")
    high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df')
    return (high_valued_payments_filter_df,)


@app.cell
def total_payments_and_total_value_processed(
    expr,
    final_success_payments_df,
    lit,
    preprocess_then_expand,
    reduce,
):
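    # Grouping-set expansion pattern (shared by the aggregate cells below):
    #   1. preprocess_then_expand (from handle_structs_or_arrays) flattens the input
    #      and turns the group expression plus the cube/rollup/grouping-set options
    #      (unused here) into a list of grouping specs and rewritten select functions.
    #   2. Each grouping spec is aggregated separately with groupBy().agg(...).
    #   3. Group columns missing from a spec are back-filled with NULL so the partial
    #      results are union-compatible, then all partials are unioned by name.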
    _params = {
        "datasource": "final_success_payments",
        "selectFunctions": [
            {'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'},
            {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'},
        ],
    }

    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
        final_success_payments_df,
        group_expression="payment_date",
        cube="",
        rollup="",
        grouping_set="",
        select_functions=[
            {'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'},
            {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'},
        ],
    )

    _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]

    _all_group_cols = list({c for gs in _grouping_specs for c in gs})

    _partials = []
    for _gs in _grouping_specs:
        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
        for _col in _all_group_cols:
            if _col not in _gs:
                _gdf = _gdf.withColumn(_col, lit(None))
        _partials.append(_gdf)

    total_payments_and_total_value_processed_df = reduce(lambda a, b: a.unionByName(b), _partials)
    total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df')
    return (total_payments_and_total_value_processed_df,)


@app.cell
def aggregate__4(
    expr,
    final_success_payments_df,
    lit,
    preprocess_then_expand,
    reduce,
):
    # Same grouping-set expansion as above, counting payments per
    # (payment_date, payment_method).
    _params = {
        "datasource": "final_success_payments",
        "selectFunctions": [{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}],
    }

    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
        final_success_payments_df,
        group_expression="payment_date, payment_method",
        cube="",
        rollup="",
        grouping_set="",
        select_functions=[{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}],
    )

    _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]

    _all_group_cols = list({c for gs in _grouping_specs for c in gs})

    _partials = []
    for _gs in _grouping_specs:
        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
        for _col in _all_group_cols:
            if _col not in _gs:
                _gdf = _gdf.withColumn(_col, lit(None))
        _partials.append(_gdf)

    aggregate__4_df = reduce(lambda a, b: a.unionByName(b), _partials)
    aggregate__4_df.createOrReplaceTempView('aggregate__4_df')
    return (aggregate__4_df,)


@app.cell
def data_mapper__5(aggregate__4_df, job_id, spark):
    _data_mapper__5_select_clause = aggregate__4_df.columns if False else []

    _data_mapper__5_select_clause.append("payment_date AS payment_date")
    _data_mapper__5_select_clause.append("payment_method AS payment_method")
    _data_mapper__5_select_clause.append("method_count AS method_count")
    # Rank methods within each day by usage count, highest first, so that rank 1 is
    # the most-used method picked up by the downstream filter.
    _data_mapper__5_select_clause.append("RANK() OVER (PARTITION BY payment_date ORDER BY method_count DESC) AS rank_method")

    data_mapper__5_df = spark.sql(
        ("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df")
        .replace("{job_id}", f"'{job_id}'")
    )
    data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df")
    return (data_mapper__5_df,)


@app.cell
def filter__6(data_mapper__5_df, spark):
    print(data_mapper__5_df.columns)
    filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1")
    filter__6_df.createOrReplaceTempView('filter__6_df')
    return (filter__6_df,)


@app.cell
def most_used_payment_method__(filter__6_df, job_id, spark):
    _most_used_payment_method___select_clause = filter__6_df.columns if False else []

    _most_used_payment_method___select_clause.append("payment_date AS payment_date")
    _most_used_payment_method___select_clause.append("payment_method AS most_used_payment_method")

    most_used_payment_method___df = spark.sql(
        ("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df")
        .replace("{job_id}", f"'{job_id}'")
    )
    most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df")
    return (most_used_payment_method___df,)


@app.cell
def high_valued_payments__(
    expr,
    high_valued_payments_filter_df,
    lit,
    preprocess_then_expand,
    reduce,
):
    # Daily count of high-valued (amount >= 500) successful payments, using the same
    # grouping-set expansion pattern.
    _params = {
        "datasource": "high_valued_payments_filter",
        "selectFunctions": [{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}],
    }

    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
        high_valued_payments_filter_df,
        group_expression="payment_date",
        cube="",
        rollup="",
        grouping_set="",
        select_functions=[{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}],
    )

    _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]

    _all_group_cols = list({c for gs in _grouping_specs for c in gs})

    _partials = []
    for _gs in _grouping_specs:
        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
        for _col in _all_group_cols:
            if _col not in _gs:
                _gdf = _gdf.withColumn(_col, lit(None))
        _partials.append(_gdf)

    high_valued_payments___df = reduce(lambda a, b: a.unionByName(b), _partials)
    high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df')
    return (high_valued_payments___df,)


@app.cell
def failed_payments_reader(spark):
    failed_payments_reader_df = spark.read.table('dremio.failedpayments')
    failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
    return (failed_payments_reader_df,)


@app.cell
def failed_payments_mapper(failed_payments_reader_df, job_id, spark):
    _failed_payments_mapper_select_clause = failed_payments_reader_df.columns if False else []

    _failed_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date")
    _failed_payments_mapper_select_clause.append("payment_method AS payment_method")
    _failed_payments_mapper_select_clause.append("failure_reason AS failure_reason")
    _failed_payments_mapper_select_clause.append("gateway AS gateway")

    failed_payments_mapper_df = spark.sql(
        ("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df")
        .replace("{job_id}", f"'{job_id}'")
    )
    failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df")
    return (failed_payments_mapper_df,)


@app.cell
def final_failed_payments(failed_payments_mapper_df, spark):
    print(failed_payments_mapper_df.columns)
    # Incremental filter against dremio.failedpaymentmetrics, mirroring the watermark
    # logic used for successful payments.
    final_failed_payments_df = spark.sql(
        "select * from failed_payments_mapper_df "
        "where payment_date >= COALESCE("
        "(SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), "
        "(SELECT MIN(payment_date) FROM failed_payments_mapper_df))"
    )
    final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df')
    return (final_failed_payments_df,)


@app.cell
def filter__13(final_failed_payments_df, spark):
    print(final_failed_payments_df.columns)
    filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = 'CCS'")
    filter__13_df.createOrReplaceTempView('filter__13_df')
    return (filter__13_df,)


@app.cell
def total_failed_payments__(
    expr,
    filter__13_df,
    lit,
    preprocess_then_expand,
    reduce,
):
    _params = {
        "datasource": "filter__13",
        "selectFunctions": [{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}],
    }

    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
        filter__13_df,
        group_expression="payment_date",
        cube="",
        rollup="",
        grouping_set="",
        select_functions=[{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}],
    )

    _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]

    _all_group_cols = list({c for gs in _grouping_specs for c in gs})

    _partials = []
    for _gs in _grouping_specs:
        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
        for _col in _all_group_cols:
            if _col not in _gs:
                _gdf = _gdf.withColumn(_col, lit(None))
        _partials.append(_gdf)

    total_failed_payments___df = reduce(lambda a, b: a.unionByName(b), _partials)
    total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df')
    return (total_failed_payments___df,)


@app.cell
def failed_payment_metrics(
    expr,
    final_failed_payments_df,
    lit,
    preprocess_then_expand,
    reduce,
):
    _params = {
        "datasource": "final_failed_payments",
        "selectFunctions": [{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}],
    }

    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
        final_failed_payments_df,
        group_expression="payment_date, gateway, failure_reason",
        cube="",
        rollup="",
        grouping_set="",
        select_functions=[{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}],
    )

    _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]

    _all_group_cols = list({c for gs in _grouping_specs for c in gs})

    _partials = []
    for _gs in _grouping_specs:
        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
        for _col in _all_group_cols:
            if _col not in _gs:
                _gdf = _gdf.withColumn(_col, lit(None))
        _partials.append(_gdf)

    failed_payment_metrics_df = reduce(lambda a, b: a.unionByName(b), _partials)
    failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df')
    return (failed_payment_metrics_df,)


@app.cell
def data_writer__15(failed_payment_metrics_df, spark):
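    # Upsert writer: build a MERGE keyed on (payment_date, gateway, failure_reason);
    # matched rows have their non-key columns updated, unmatched rows are inserted
    # into dremio.failedpaymentmetrics.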
    _data_writer__15_fields_to_update = failed_payment_metrics_df.columns
    _data_writer__15_set_clause = []
    _data_writer__15_unique_keys = ['payment_date', 'gateway', 'failure_reason']
    _data_writer__15_unique_key_clause = []

    for _key in _data_writer__15_unique_keys:
        _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}')

    for _field in _data_writer__15_fields_to_update:
        # Key columns are only used for matching; keep them out of the SET clause.
        if _field not in _data_writer__15_unique_keys:
            _data_writer__15_set_clause.append(f't.{_field} = s.{_field}')

    _merge_query = (
        'MERGE INTO dremio.failedpaymentmetrics t '
        'USING failed_payment_metrics_df s '
        'ON ' + ' AND '.join(_data_writer__15_unique_key_clause) +
        ' WHEN MATCHED THEN UPDATE SET ' + ', '.join(_data_writer__15_set_clause) +
        ' WHEN NOT MATCHED THEN INSERT *'
    )

    spark.sql(_merge_query)
    return


@app.cell
def success_payment_metrics(
    high_valued_payments___df,
    most_used_payment_method___df,
    spark,
    total_failed_payments___df,
    total_payments_and_total_value_processed_df,
):
    print(total_payments_and_total_value_processed_df.columns)
    print(most_used_payment_method___df.columns)
    print(high_valued_payments___df.columns)
    print(total_failed_payments___df.columns)
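    # Combine the per-day metric frames: full outer join the success totals with the
    # failed totals on payment_date, then attach the most-used payment method and the
    # high-valued payment count for each day.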
    success_payment_metrics_df = spark.sql("""
        SELECT
            COALESCE(a.payment_date, d.payment_date) AS payment_date,
            a.total_payments,
            a.total_value_processed,
            b.most_used_payment_method,
            c.high_valued_payments,
            d.total_failed_payments
        FROM total_failed_payments___df d
        FULL OUTER JOIN total_payments_and_total_value_processed_df a
            ON a.payment_date = d.payment_date
        LEFT JOIN most_used_payment_method___df b
            ON a.payment_date = b.payment_date
        LEFT JOIN high_valued_payments___df c
            ON a.payment_date = c.payment_date
    """)

    success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df')
    return (success_payment_metrics_df,)


@app.cell
def success_payment_metrics_writer(spark, success_payment_metrics_df):
    # Upsert the combined daily metrics into dremio.successpaymentmetrics, keyed on
    # payment_date.
    _success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns
    _success_payment_metrics_writer_set_clause = []
    _success_payment_metrics_writer_unique_keys = ['payment_date']
    _success_payment_metrics_writer_unique_key_clause = []

    for _key in _success_payment_metrics_writer_unique_keys:
        _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}')

    for _field in _success_payment_metrics_writer_fields_to_update:
        # Key columns are only used for matching; keep them out of the SET clause.
        if _field not in _success_payment_metrics_writer_unique_keys:
            _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}')

    _merge_query = (
        'MERGE INTO dremio.successpaymentmetrics t '
        'USING success_payment_metrics_df s '
        'ON ' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) +
        ' WHEN MATCHED THEN UPDATE SET ' + ', '.join(_success_payment_metrics_writer_set_clause) +
        ' WHEN NOT MATCHED THEN INSERT *'
    )

    spark.sql(_merge_query)
    return


if __name__ == "__main__":
    app.run()