diff --git a/payment_metrics/main.py b/payment_metrics/main.py index 2c4f880..44d05f7 100644 --- a/payment_metrics/main.py +++ b/payment_metrics/main.py @@ -4,23 +4,30 @@ __generated_with = "0.13.15" # %% import sys +import time +from pyspark.sql.utils import AnalysisException sys.path.append('/opt/spark/work-dir/') from workflow_templates.spark.udf_manager import bootstrap_udfs +from util import get_logger, observe_metrics, collect_metrics, log_info, log_error, forgiving_serializer from pyspark.sql.functions import udf -from pyspark.sql.functions import lit +from pyspark.sql.functions import count, expr, lit from pyspark.sql.types import StringType, IntegerType import uuid from pathlib import Path from pyspark import SparkConf, Row from pyspark.sql import SparkSession +from pyspark.sql.observation import Observation +from pyspark import StorageLevel import os import pandas as pd import polars as pl import pyarrow as pa -from pyspark.sql.functions import expr,to_json,col,struct +from pyspark.sql.functions import approx_count_distinct, avg, collect_list, collect_set, corr, count, countDistinct, covar_pop, covar_samp, first, kurtosis, last, max, mean, min, skewness, stddev, stddev_pop, stddev_samp, sum, var_pop, var_samp, variance,expr,to_json,struct, date_format, col, lit, when, regexp_replace, ltrim from functools import reduce from handle_structs_or_arrays import preprocess_then_expand import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry from jinja2 import Template import json @@ -30,81 +37,214 @@ from secrets_manager import SecretsManager from WorkflowManager import WorkflowDSL, WorkflowManager from KnowledgebaseManager import KnowledgebaseManager from gitea_client import GiteaClient, WorkspaceVersionedContent +from FilesystemManager import FilesystemManager, SupportedFilesystemType +from Materialization import Materialization from dremio.flight.endpoint import DremioFlightEndpoint from dremio.flight.query import DremioFlightEndpointQuery +init_start_time=time.time() +LOGGER = get_logger() alias_str='abcdefghijklmnopqrstuvwxyz' workspace = os.getenv('WORKSPACE') or 'exp360cust' +workflow = 'payment_metrics' +execution_environment = os.getenv('EXECUTION_ENVIRONMENT') or 'CLUSTER' job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4()) +retry_job_id = os.getenv("RETRY_EXECUTION_ID") or '' + +log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}', Retry Job Id: '{retry_job_id}'") sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN')) secrets = sm.list_secrets(workspace) gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1') workspaceVersionedContent=WorkspaceVersionedContent(gitea_client) -conf = SparkConf() -params = { + +filesystemManager = FilesystemManager.create(secrets.get('LAKEHOUSE_BUCKET'), storage_options={'key': secrets.get('S3_ACCESS_KEY'), 'secret': secrets.get('S3_SECRET_KEY'), 'region': secrets.get('S3_REGION')}) +if retry_job_id: + logs = Materialization.get_execution_history_by_job_id(filesystemManager, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, retry_job_id, selected_components=['finalize']).to_dicts() + if len(logs) == 1 and logs[0].get('metrics').get('execute_status') == 'SUCCESS': + log_info(LOGGER, f"Workspace: '{workspace}', 
Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}' - Retry Job Id: '{retry_job_id}' was already successful. Hence exiting to forward processing to next in chain.") + sys.exit(0) + +_conf = SparkConf() +_params = { "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'), "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'), - "spark.hadoop.fs.s3a.aws.region": "us-west-1", + "spark.hadoop.fs.s3a.aws.region": secrets.get("S3_REGION") or "None", "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'), + "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", + "spark.hadoop.fs.s3.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dremio.type" : "hadoop", "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", - "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0" + "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0,ch.cern.sparkmeasure:spark-measure_2.12:0.26" } -conf.setAll(list(params.items())) +_conf.setAll(list(_params.items())) -spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate() +spark = SparkSession.builder.appName(workspace).config(conf=_conf).getOrCreate() bootstrap_udfs(spark) -# %% +materialization = Materialization(spark, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, job_id, retry_job_id, execution_environment, LOGGER) + +init_dependency_key="init" - -success_payments_reader_df = spark.read.table('dremio.payments') -success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df') +init_end_time=time.time() # %% -_success_payments_mapper_select_clause=success_payments_reader_df.columns if False else [] -_success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date") +success_payments_reader_start_time=time.time() 
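+# Each generated component below follows the same wrapper pattern: record a
+# start/end timestamp, set a *_fail_on_error flag, run the component body inside
+# try/except/finally, register a metrics observer via observe_metrics() (presumably
+# backed by pyspark's Observation, which is imported above), expose the result as a
+# temp view, and record an *_execute_status that collect_metrics(locals()) gathers
+# in the finalize step.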
-_success_payments_mapper_select_clause.append("amount AS amount") - -_success_payments_mapper_select_clause.append("gateway AS gateway") - -_success_payments_mapper_select_clause.append("payment_method AS payment_method") +success_payments_reader_fail_on_error="" +try: + success_payments_reader_df = spark.read.table('dremio.payments') + success_payments_reader_df, success_payments_reader_observer = observe_metrics("success_payments_reader_df", success_payments_reader_df) -success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'")) -success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df") + + success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df') + + success_payments_reader_dependency_key="success_payments_reader" + + success_payments_reader_execute_status="SUCCESS" +except Exception as e: + success_payments_reader_error = e + log_error(LOGGER, f"Component success_payments_reader Failed", e) + success_payments_reader_execute_status="ERROR" + + raise e + +finally: + success_payments_reader_end_time=time.time() # %% + +success_payments_mapper_start_time=time.time() + +success_payments_mapper_fail_on_error="" +try: + _success_payments_mapper_select_clause=success_payments_reader_df.columns if False else [] + + _success_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''') + + _success_payments_mapper_select_clause.append('''amount AS amount''') + + _success_payments_mapper_select_clause.append('''gateway AS gateway''') + + _success_payments_mapper_select_clause.append('''payment_method AS payment_method''') + + try: + success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'")) + except Exception as e: + success_payments_mapper_df = success_payments_reader_df.limit(0) + log_info(LOGGER, f"error while mapping the data :{e} " ) + success_payments_mapper_df, success_payments_mapper_observer = observe_metrics("success_payments_mapper_df", success_payments_mapper_df) + + + + success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df") + + success_payments_mapper_dependency_key="success_payments_mapper" + + success_payments_mapper_execute_status="SUCCESS" +except Exception as e: + success_payments_mapper_error = e + log_error(LOGGER, f"Component success_payments_mapper Failed", e) + success_payments_mapper_execute_status="ERROR" + + raise e + +finally: + success_payments_mapper_end_time=time.time() + +# %% + + +final_success_payments_start_time=time.time() + print(success_payments_mapper_df.columns) -final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'") -final_success_payments_df.createOrReplaceTempView('final_success_payments_df') +final_success_payments_fail_on_error="" +try: + try: + final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'") + except AnalysisException as e: + log_info(LOGGER, f"error while filtering data : {e}") + final_success_payments_df = 
success_payments_mapper_df.limit(0) + except Exception as e: + log_info(LOGGER, f"Unexpected error: {e}") + final_success_payments_df = success_payments_mapper_df.limit(0) + final_success_payments_df, final_success_payments_observer = observe_metrics("final_success_payments_df", final_success_payments_df) + + + + final_success_payments_df.createOrReplaceTempView('final_success_payments_df') + + final_success_payments_dependency_key="final_success_payments" + + final_success_payments_execute_status="SUCCESS" +except Exception as e: + final_success_payments_error = e + log_error(LOGGER, f"Component final_success_payments Failed", e) + final_success_payments_execute_status="ERROR" + + raise e + +finally: + final_success_payments_end_time=time.time() # %% + +high_valued_payments_filter_start_time=time.time() + print(final_success_payments_df.columns) -high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500") -high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df') +high_valued_payments_filter_fail_on_error="" +try: + try: + high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500") + except AnalysisException as e: + log_info(LOGGER, f"error while filtering data : {e}") + high_valued_payments_filter_df = final_success_payments_df.limit(0) + except Exception as e: + log_info(LOGGER, f"Unexpected error: {e}") + high_valued_payments_filter_df = final_success_payments_df.limit(0) + high_valued_payments_filter_df, high_valued_payments_filter_observer = observe_metrics("high_valued_payments_filter_df", high_valued_payments_filter_df) + + + + high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df') + + high_valued_payments_filter_dependency_key="high_valued_payments_filter" + + high_valued_payments_filter_execute_status="SUCCESS" +except Exception as e: + high_valued_payments_filter_error = e + log_error(LOGGER, f"Component high_valued_payments_filter Failed", e) + high_valued_payments_filter_execute_status="ERROR" + + raise e + +finally: + high_valued_payments_filter_end_time=time.time() # %% +total_payments_and_total_value_processed_start_time=time.time() + +total_payments_and_total_value_processed_fail_on_error="True" +try: @@ -112,45 +252,47 @@ high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_fil -_params = { - "datasource": "final_success_payments", - "selectFunctions" : [{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] -} - -_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, -group_expression="payment_date", -cube="", -rollup="", -grouping_set="", -select_functions=[{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] -) - -_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) -for f in _rewritten_selects -] - -_all_group_cols = list({c for gs in _grouping_specs for c in gs}) - -_partials = [] -for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - -total_payments_and_total_value_processed_df = reduce(lambda a, b: a.unionByName(b), _partials) - 
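+    # The preprocess_then_expand / grouping-set expansion removed here is replaced by a
+    # direct groupBy("payment_date").agg(count, sum); since cube, rollup and grouping_set
+    # were all empty in the old call, the single grouping spec appears equivalent.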
-total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df') + total_payments_and_total_value_processed_df = final_success_payments_df.groupBy( + "payment_date" + ).agg( + + count('*').alias("total_payments"), + + sum('amount').alias("total_value_processed") + + ) + total_payments_and_total_value_processed_df, total_payments_and_total_value_processed_observer = observe_metrics("total_payments_and_total_value_processed_df", total_payments_and_total_value_processed_df) + + + + total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df') + + total_payments_and_total_value_processed_dependency_key="total_payments_and_total_value_processed" + + print(final_success_payments_dependency_key) + + total_payments_and_total_value_processed_execute_status="SUCCESS" +except Exception as e: + total_payments_and_total_value_processed_error = e + log_error(LOGGER, f"Component total_payments_and_total_value_processed Failed", e) + total_payments_and_total_value_processed_execute_status="ERROR" + + raise e + +finally: + total_payments_and_total_value_processed_end_time=time.time() # %% +aggregate__4_start_time=time.time() + +aggregate__4_fail_on_error="True" +try: @@ -158,161 +300,325 @@ total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payme -_params = { - "datasource": "final_success_payments", - "selectFunctions" : [{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] -} - -_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, -group_expression="payment_date, payment_method", -cube="", -rollup="", -grouping_set="", -select_functions=[{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] -) - -_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) -for f in _rewritten_selects -] - -_all_group_cols = list({c for gs in _grouping_specs for c in gs}) - -_partials = [] -for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - -aggregate__4_df = reduce(lambda a, b: a.unionByName(b), _partials) - -aggregate__4_df.createOrReplaceTempView('aggregate__4_df') + aggregate__4_df = final_success_payments_df.groupBy( + "payment_date", + + "payment_method" + ).agg( + + count('*').alias("method_count") + + ) + aggregate__4_df, aggregate__4_observer = observe_metrics("aggregate__4_df", aggregate__4_df) + + + + aggregate__4_df.createOrReplaceTempView('aggregate__4_df') + + aggregate__4_dependency_key="aggregate__4" + + print(final_success_payments_dependency_key) + + aggregate__4_execute_status="SUCCESS" +except Exception as e: + aggregate__4_error = e + log_error(LOGGER, f"Component aggregate__4 Failed", e) + aggregate__4_execute_status="ERROR" + + raise e + +finally: + aggregate__4_end_time=time.time() # %% -_data_mapper__5_select_clause=aggregate__4_df.columns if False else [] -_data_mapper__5_select_clause.append("payment_date AS payment_date") +data_mapper__5_start_time=time.time() -_data_mapper__5_select_clause.append("payment_method AS payment_method") +data_mapper__5_fail_on_error="" +try: + _data_mapper__5_select_clause=aggregate__4_df.columns if False else [] -_data_mapper__5_select_clause.append("method_count AS method_count") + _data_mapper__5_select_clause.append('''payment_date AS payment_date''') -_data_mapper__5_select_clause.append("RANK() OVER (PARTITION 
BY payment_date ORDER BY method_count) AS rank_method") + _data_mapper__5_select_clause.append('''payment_method AS payment_method''') + + _data_mapper__5_select_clause.append('''method_count AS method_count''') + + _data_mapper__5_select_clause.append('''RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method''') + + try: + data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'")) + except Exception as e: + data_mapper__5_df = aggregate__4_df.limit(0) + log_info(LOGGER, f"error while mapping the data :{e} " ) + data_mapper__5_df, data_mapper__5_observer = observe_metrics("data_mapper__5_df", data_mapper__5_df) -data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'")) -data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df") + + data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df") + + data_mapper__5_dependency_key="data_mapper__5" + + data_mapper__5_execute_status="SUCCESS" +except Exception as e: + data_mapper__5_error = e + log_error(LOGGER, f"Component data_mapper__5 Failed", e) + data_mapper__5_execute_status="ERROR" + + raise e + +finally: + data_mapper__5_end_time=time.time() # %% + +filter__6_start_time=time.time() + print(data_mapper__5_df.columns) -filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1") -filter__6_df.createOrReplaceTempView('filter__6_df') - -# %% - -_most_used_payment_method___select_clause=filter__6_df.columns if False else [] - -_most_used_payment_method___select_clause.append("payment_date AS payment_date") - -_most_used_payment_method___select_clause.append("payment_method AS most_used_payment_method") +filter__6_fail_on_error="" +try: + try: + filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1") + except AnalysisException as e: + log_info(LOGGER, f"error while filtering data : {e}") + filter__6_df = data_mapper__5_df.limit(0) + except Exception as e: + log_info(LOGGER, f"Unexpected error: {e}") + filter__6_df = data_mapper__5_df.limit(0) + filter__6_df, filter__6_observer = observe_metrics("filter__6_df", filter__6_df) -most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'")) -most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df") + + filter__6_df.createOrReplaceTempView('filter__6_df') + + filter__6_dependency_key="filter__6" + + filter__6_execute_status="SUCCESS" +except Exception as e: + filter__6_error = e + log_error(LOGGER, f"Component filter__6 Failed", e) + filter__6_execute_status="ERROR" + + raise e + +finally: + filter__6_end_time=time.time() # %% +most_used_payment_method___start_time=time.time() + +most_used_payment_method___fail_on_error="" +try: + _most_used_payment_method___select_clause=filter__6_df.columns if False else [] + + _most_used_payment_method___select_clause.append('''payment_date AS payment_date''') + + _most_used_payment_method___select_clause.append('''payment_method AS most_used_payment_method''') + + try: + most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'")) + except Exception as e: + most_used_payment_method___df = filter__6_df.limit(0) + log_info(LOGGER, f"error while mapping the data :{e} " ) + 
most_used_payment_method___df, most_used_payment_method___observer = observe_metrics("most_used_payment_method___df", most_used_payment_method___df) + most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df") + most_used_payment_method___dependency_key="most_used_payment_method__" + most_used_payment_method___execute_status="SUCCESS" +except Exception as e: + most_used_payment_method___error = e + log_error(LOGGER, f"Component most_used_payment_method__ Failed", e) + most_used_payment_method___execute_status="ERROR" + raise e -_params = { - "datasource": "high_valued_payments_filter", - "selectFunctions" : [{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}] -} - -_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( high_valued_payments_filter_df, -group_expression="payment_date", -cube="", -rollup="", -grouping_set="", -select_functions=[{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}] -) - -_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) -for f in _rewritten_selects -] - -_all_group_cols = list({c for gs in _grouping_specs for c in gs}) - -_partials = [] -for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - -high_valued_payments___df = reduce(lambda a, b: a.unionByName(b), _partials) - -high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df') - - - - +finally: + most_used_payment_method___end_time=time.time() # %% +high_valued_payments___start_time=time.time() -failed_payments_reader_df = spark.read.table('dremio.failedpayments') -failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df') +high_valued_payments___fail_on_error="True" +try: + + + + + + + + + + + high_valued_payments___df = high_valued_payments_filter_df.groupBy( + + "payment_date" + ).agg( + + count('*').alias("high_valued_payments") + + ) + high_valued_payments___df, high_valued_payments___observer = observe_metrics("high_valued_payments___df", high_valued_payments___df) + + + + high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df') + + high_valued_payments___dependency_key="high_valued_payments__" + + print(high_valued_payments_filter_dependency_key) + + high_valued_payments___execute_status="SUCCESS" +except Exception as e: + high_valued_payments___error = e + log_error(LOGGER, f"Component high_valued_payments__ Failed", e) + high_valued_payments___execute_status="ERROR" + + raise e + +finally: + high_valued_payments___end_time=time.time() # %% -_failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else [] -_failed_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date") +failed_payments_reader_start_time=time.time() -_failed_payments_mapper_select_clause.append("payment_method AS payment_method") - -_failed_payments_mapper_select_clause.append("failure_reason AS failure_reason") - -_failed_payments_mapper_select_clause.append("gateway AS gateway") +failed_payments_reader_fail_on_error="" +try: + failed_payments_reader_df = spark.read.table('dremio.failedpayments') + failed_payments_reader_df, failed_payments_reader_observer = observe_metrics("failed_payments_reader_df", failed_payments_reader_df) -failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM 
failed_payments_reader_df").replace("{job_id}",f"'{job_id}'")) -failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df") + + failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df') + + failed_payments_reader_dependency_key="failed_payments_reader" + + failed_payments_reader_execute_status="SUCCESS" +except Exception as e: + failed_payments_reader_error = e + log_error(LOGGER, f"Component failed_payments_reader Failed", e) + failed_payments_reader_execute_status="ERROR" + + raise e + +finally: + failed_payments_reader_end_time=time.time() # %% + +failed_payments_mapper_start_time=time.time() + +failed_payments_mapper_fail_on_error="" +try: + _failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else [] + + _failed_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''') + + _failed_payments_mapper_select_clause.append('''payment_method AS payment_method''') + + _failed_payments_mapper_select_clause.append('''failure_reason AS failure_reason''') + + _failed_payments_mapper_select_clause.append('''gateway AS gateway''') + + try: + failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'")) + except Exception as e: + failed_payments_mapper_df = failed_payments_reader_df.limit(0) + log_info(LOGGER, f"error while mapping the data :{e} " ) + failed_payments_mapper_df, failed_payments_mapper_observer = observe_metrics("failed_payments_mapper_df", failed_payments_mapper_df) + + + + failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df") + + failed_payments_mapper_dependency_key="failed_payments_mapper" + + failed_payments_mapper_execute_status="SUCCESS" +except Exception as e: + failed_payments_mapper_error = e + log_error(LOGGER, f"Component failed_payments_mapper Failed", e) + failed_payments_mapper_execute_status="ERROR" + + raise e + +finally: + failed_payments_mapper_end_time=time.time() + +# %% + +final_failed_payments_start_time=time.time() + print(failed_payments_mapper_df.columns) final_failed_payments_df = spark.sql("select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))") final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df') final_failed_payments_df.persist() +final_failed_payments_end_time=time.time() + +final_failed_payments_dependency_key="final_failed_payments" + +print(failed_payments_mapper_dependency_key) + + # %% + +filter__13_start_time=time.time() + print(final_failed_payments_df.columns) -filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = \'CCS\'") -filter__13_df.createOrReplaceTempView('filter__13_df') +filter__13_fail_on_error="" +try: + try: + filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = \'CCS\'") + except AnalysisException as e: + log_info(LOGGER, f"error while filtering data : {e}") + filter__13_df = final_failed_payments_df.limit(0) + except Exception as e: + log_info(LOGGER, f"Unexpected error: {e}") + filter__13_df = final_failed_payments_df.limit(0) + filter__13_df, filter__13_observer = observe_metrics("filter__13_df", filter__13_df) + + + + filter__13_df.createOrReplaceTempView('filter__13_df') + + filter__13_dependency_key="filter__13" + + filter__13_execute_status="SUCCESS" +except Exception as e: + 
filter__13_error = e + log_error(LOGGER, f"Component filter__13 Failed", e) + filter__13_execute_status="ERROR" + + raise e + +finally: + filter__13_end_time=time.time() # %% +total_failed_payments___start_time=time.time() + +total_failed_payments___fail_on_error="True" +try: @@ -320,45 +626,45 @@ filter__13_df.createOrReplaceTempView('filter__13_df') -_params = { - "datasource": "filter__13", - "selectFunctions" : [{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}] -} - -_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( filter__13_df, -group_expression="payment_date", -cube="", -rollup="", -grouping_set="", -select_functions=[{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}] -) - -_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) -for f in _rewritten_selects -] - -_all_group_cols = list({c for gs in _grouping_specs for c in gs}) - -_partials = [] -for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - -total_failed_payments___df = reduce(lambda a, b: a.unionByName(b), _partials) - -total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df') + total_failed_payments___df = filter__13_df.groupBy( + "payment_date" + ).agg( + + count('*').alias("total_failed_payments") + + ) + total_failed_payments___df, total_failed_payments___observer = observe_metrics("total_failed_payments___df", total_failed_payments___df) + + + + total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df') + + total_failed_payments___dependency_key="total_failed_payments__" + + print(filter__13_dependency_key) + + total_failed_payments___execute_status="SUCCESS" +except Exception as e: + total_failed_payments___error = e + log_error(LOGGER, f"Component total_failed_payments__ Failed", e) + total_failed_payments___execute_status="ERROR" + + raise e + +finally: + total_failed_payments___end_time=time.time() # %% +failed_payment_metrics_start_time=time.time() + +failed_payment_metrics_fail_on_error="True" +try: @@ -366,70 +672,88 @@ total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df') -_params = { - "datasource": "final_failed_payments", - "selectFunctions" : [{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}] -} - -_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_failed_payments_df, -group_expression="payment_date, gateway, failure_reason", -cube="", -rollup="", -grouping_set="", -select_functions=[{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}] -) - -_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) -for f in _rewritten_selects -] - -_all_group_cols = list({c for gs in _grouping_specs for c in gs}) - -_partials = [] -for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - -failed_payment_metrics_df = reduce(lambda a, b: a.unionByName(b), _partials) - -failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df') + failed_payment_metrics_df = final_failed_payments_df.groupBy( + "payment_date", + + "gateway", + + "failure_reason" + ).agg( + + count('*').alias("failure_count") + + ) + failed_payment_metrics_df, failed_payment_metrics_observer = observe_metrics("failed_payment_metrics_df", 
failed_payment_metrics_df) + + + + failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df') + + failed_payment_metrics_dependency_key="failed_payment_metrics" + + print(final_failed_payments_dependency_key) + + failed_payment_metrics_execute_status="SUCCESS" +except Exception as e: + failed_payment_metrics_error = e + log_error(LOGGER, f"Component failed_payment_metrics Failed", e) + failed_payment_metrics_execute_status="ERROR" + + raise e + +finally: + failed_payment_metrics_end_time=time.time() # %% +data_writer__15_start_time=time.time() + +data_writer__15_fail_on_error="" +try: + + _data_writer__15_fields_to_update = failed_payment_metrics_df.columns + _data_writer__15_set_clause=[] + _data_writer__15_unique_key_clause= [] + + for _key in ['payment_date', 'gateway', 'failure_reason']: + _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}') + + for _field in _data_writer__15_fields_to_update: + if(_field not in _data_writer__15_unique_key_clause): + _data_writer__15_set_clause.append(f't.{_field} = s.{_field}') + + _merge_query = ''' + MERGE INTO dremio.failedpaymentmetrics t + USING failed_payment_metrics_df s + ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN + UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' + + spark.sql(_merge_query) -_data_writer__15_fields_to_update = failed_payment_metrics_df.columns -_data_writer__15_set_clause=[] -_data_writer__15_unique_key_clause= [] -for _key in ['payment_date', 'gateway', 'failure_reason']: - _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}') + data_writer__15_dependency_key="data_writer__15" -for _field in _data_writer__15_fields_to_update: - if(_field not in _data_writer__15_unique_key_clause): - _data_writer__15_set_clause.append(f't.{_field} = s.{_field}') - -_merge_query = ''' - MERGE INTO dremio.failedpaymentmetrics t - USING failed_payment_metrics_df s - ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN - UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' - -spark.sql(_merge_query) + data_writer__15_execute_status="SUCCESS" +except Exception as e: + data_writer__15_error = e + log_error(LOGGER, f"Component data_writer__15 Failed", e) + data_writer__15_execute_status="ERROR" + raise e +finally: + data_writer__15_end_time=time.time() # %% +success_payment_metrics_start_time=time.time() + print(total_payments_and_total_value_processed_df.columns) print(most_used_payment_method___df.columns) print(high_valued_payments___df.columns) @@ -454,28 +778,63 @@ LEFT JOIN high_valued_payments___df c success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df') +success_payment_metrics_end_time=time.time() + +success_payment_metrics_dependency_key="success_payment_metrics" + + # %% +success_payment_metrics_writer_start_time=time.time() + +success_payment_metrics_writer_fail_on_error="" +try: + + _success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns + _success_payment_metrics_writer_set_clause=[] + _success_payment_metrics_writer_unique_key_clause= [] + + for _key in ['payment_date']: + _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}') + + for _field in _success_payment_metrics_writer_fields_to_update: + if(_field not in _success_payment_metrics_writer_unique_key_clause): + _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}') + + _merge_query = 
''' + MERGE INTO dremio.successpaymentmetrics t + USING success_payment_metrics_df s + ON ''' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) + ''' WHEN MATCHED THEN + UPDATE SET ''' + ', '.join(_success_payment_metrics_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' + + spark.sql(_merge_query) -_success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns -_success_payment_metrics_writer_set_clause=[] -_success_payment_metrics_writer_unique_key_clause= [] -for _key in ['payment_date']: - _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}') + success_payment_metrics_writer_dependency_key="success_payment_metrics_writer" -for _field in _success_payment_metrics_writer_fields_to_update: - if(_field not in _success_payment_metrics_writer_unique_key_clause): - _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}') + success_payment_metrics_writer_execute_status="SUCCESS" +except Exception as e: + success_payment_metrics_writer_error = e + log_error(LOGGER, f"Component success_payment_metrics_writer Failed", e) + success_payment_metrics_writer_execute_status="ERROR" -_merge_query = ''' - MERGE INTO dremio.successpaymentmetrics t - USING success_payment_metrics_df s - ON ''' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) + ''' WHEN MATCHED THEN - UPDATE SET ''' + ', '.join(_success_payment_metrics_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' + raise e -spark.sql(_merge_query) +finally: + success_payment_metrics_writer_end_time=time.time() +# %% +finalize_start_time=time.time() + +metrics = { + 'data': collect_metrics(locals()), +} +materialization.materialized_execution_history({'finalize': {'execute_status': 'SUCCESS', 'fail_on_error': 'False'}, **metrics['data']}) +log_info(LOGGER, f"Workflow Data metrics: {metrics['data']}") + +finalize_end_time=time.time() + +spark.stop() \ No newline at end of file diff --git a/payment_metrics/main.py.notebook b/payment_metrics/main.py.notebook index fdbfd45..cd49927 100644 --- a/payment_metrics/main.py.notebook +++ b/payment_metrics/main.py.notebook @@ -8,23 +8,30 @@ app = marimo.App() def init(): import sys + import time + from pyspark.sql.utils import AnalysisException sys.path.append('/opt/spark/work-dir/') from workflow_templates.spark.udf_manager import bootstrap_udfs + from util import get_logger, observe_metrics, collect_metrics, log_info, log_error, forgiving_serializer from pyspark.sql.functions import udf - from pyspark.sql.functions import lit + from pyspark.sql.functions import count, expr, lit from pyspark.sql.types import StringType, IntegerType import uuid from pathlib import Path from pyspark import SparkConf, Row from pyspark.sql import SparkSession + from pyspark.sql.observation import Observation + from pyspark import StorageLevel import os import pandas as pd import polars as pl import pyarrow as pa - from pyspark.sql.functions import expr,to_json,col,struct + from pyspark.sql.functions import approx_count_distinct, avg, collect_list, collect_set, corr, count, countDistinct, covar_pop, covar_samp, first, kurtosis, last, max, mean, min, skewness, stddev, stddev_pop, stddev_samp, sum, var_pop, var_samp, variance,expr,to_json,struct, date_format, col, lit, when, regexp_replace, ltrim from functools import reduce from handle_structs_or_arrays import preprocess_then_expand import requests + from requests.adapters import HTTPAdapter + from urllib3.util.retry import Retry from jinja2 import Template import json @@ 
-34,474 +41,879 @@ def init(): from WorkflowManager import WorkflowDSL, WorkflowManager from KnowledgebaseManager import KnowledgebaseManager from gitea_client import GiteaClient, WorkspaceVersionedContent + from FilesystemManager import FilesystemManager, SupportedFilesystemType + from Materialization import Materialization from dremio.flight.endpoint import DremioFlightEndpoint from dremio.flight.query import DremioFlightEndpointQuery + init_start_time=time.time() + LOGGER = get_logger() alias_str='abcdefghijklmnopqrstuvwxyz' workspace = os.getenv('WORKSPACE') or 'exp360cust' + workflow = 'payment_metrics' + execution_environment = os.getenv('EXECUTION_ENVIRONMENT') or 'CLUSTER' job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4()) + retry_job_id = os.getenv("RETRY_EXECUTION_ID") or '' + + log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}', Retry Job Id: '{retry_job_id}'") sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN')) secrets = sm.list_secrets(workspace) gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1') workspaceVersionedContent=WorkspaceVersionedContent(gitea_client) - conf = SparkConf() - params = { + + filesystemManager = FilesystemManager.create(secrets.get('LAKEHOUSE_BUCKET'), storage_options={'key': secrets.get('S3_ACCESS_KEY'), 'secret': secrets.get('S3_SECRET_KEY'), 'region': secrets.get('S3_REGION')}) + if retry_job_id: + logs = Materialization.get_execution_history_by_job_id(filesystemManager, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, retry_job_id, selected_components=['finalize']).to_dicts() + if len(logs) == 1 and logs[0].get('metrics').get('execute_status') == 'SUCCESS': + log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}' - Retry Job Id: '{retry_job_id}' was already successful. 
Hence exiting to forward processing to next in chain.") + sys.exit(0) + + _conf = SparkConf() + _params = { "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'), "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'), - "spark.hadoop.fs.s3a.aws.region": "us-west-1", + "spark.hadoop.fs.s3a.aws.region": secrets.get("S3_REGION") or "None", "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'), + "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", + "spark.hadoop.fs.s3.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dremio.type" : "hadoop", "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", - "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0" + "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0,ch.cern.sparkmeasure:spark-measure_2.12:0.26" } - conf.setAll(list(params.items())) + _conf.setAll(list(_params.items())) - spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate() + spark = SparkSession.builder.appName(workspace).config(conf=_conf).getOrCreate() bootstrap_udfs(spark) - return expr, job_id, lit, preprocess_then_expand, reduce, spark + + materialization = Materialization(spark, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, job_id, retry_job_id, execution_environment, LOGGER) + + init_dependency_key="init" + + + init_end_time=time.time() + return ( + AnalysisException, + LOGGER, + collect_metrics, + count, + job_id, + log_error, + log_info, + materialization, + observe_metrics, + spark, + sum, + time, + ) @app.cell -def success_payments_reader(spark): +def success_payments_reader(LOGGER, log_error, observe_metrics, spark, time): + success_payments_reader_start_time=time.time() - success_payments_reader_df = spark.read.table('dremio.payments') - 
success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df') + success_payments_reader_fail_on_error="" + try: + success_payments_reader_df = spark.read.table('dremio.payments') + success_payments_reader_df, success_payments_reader_observer = observe_metrics("success_payments_reader_df", success_payments_reader_df) + + + + success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df') + + success_payments_reader_dependency_key="success_payments_reader" + + success_payments_reader_execute_status="SUCCESS" + except Exception as e: + success_payments_reader_error = e + log_error(LOGGER, f"Component success_payments_reader Failed", e) + success_payments_reader_execute_status="ERROR" + + raise e + + finally: + success_payments_reader_end_time=time.time() return (success_payments_reader_df,) @app.cell -def success_payments_mapper(job_id, spark, success_payments_reader_df): - - _success_payments_mapper_select_clause=success_payments_reader_df.columns if False else [] - - _success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date") - - _success_payments_mapper_select_clause.append("amount AS amount") - - _success_payments_mapper_select_clause.append("gateway AS gateway") - - _success_payments_mapper_select_clause.append("payment_method AS payment_method") +def success_payments_mapper( + LOGGER, + job_id, + log_error, + log_info, + observe_metrics, + spark, + success_payments_reader_df, + time, +): - success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'")) - success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df") + success_payments_mapper_start_time=time.time() + + success_payments_mapper_fail_on_error="" + try: + _success_payments_mapper_select_clause=success_payments_reader_df.columns if False else [] + + _success_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''') + + _success_payments_mapper_select_clause.append('''amount AS amount''') + + _success_payments_mapper_select_clause.append('''gateway AS gateway''') + + _success_payments_mapper_select_clause.append('''payment_method AS payment_method''') + + try: + success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'")) + except Exception as e: + success_payments_mapper_df = success_payments_reader_df.limit(0) + log_info(LOGGER, f"error while mapping the data :{e} " ) + success_payments_mapper_df, success_payments_mapper_observer = observe_metrics("success_payments_mapper_df", success_payments_mapper_df) + + + + success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df") + + success_payments_mapper_dependency_key="success_payments_mapper" + + success_payments_mapper_execute_status="SUCCESS" + except Exception as e: + success_payments_mapper_error = e + log_error(LOGGER, f"Component success_payments_mapper Failed", e) + success_payments_mapper_execute_status="ERROR" + + raise e + + finally: + success_payments_mapper_end_time=time.time() return (success_payments_mapper_df,) @app.cell -def final_success_payments(spark, success_payments_mapper_df): +def final_success_payments( + AnalysisException, + LOGGER, + log_error, + log_info, + observe_metrics, + spark, + success_payments_mapper_df, + time, +): + + + final_success_payments_start_time=time.time() print(success_payments_mapper_df.columns) - 
final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'") - final_success_payments_df.createOrReplaceTempView('final_success_payments_df') - return (final_success_payments_df,) + final_success_payments_fail_on_error="" + try: + try: + final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'") + except AnalysisException as e: + log_info(LOGGER, f"error while filtering data : {e}") + final_success_payments_df = success_payments_mapper_df.limit(0) + except Exception as e: + log_info(LOGGER, f"Unexpected error: {e}") + final_success_payments_df = success_payments_mapper_df.limit(0) + final_success_payments_df, final_success_payments_observer = observe_metrics("final_success_payments_df", final_success_payments_df) + + + + final_success_payments_df.createOrReplaceTempView('final_success_payments_df') + + final_success_payments_dependency_key="final_success_payments" + + final_success_payments_execute_status="SUCCESS" + except Exception as e: + final_success_payments_error = e + log_error(LOGGER, f"Component final_success_payments Failed", e) + final_success_payments_execute_status="ERROR" + + raise e + + finally: + final_success_payments_end_time=time.time() + return final_success_payments_dependency_key, final_success_payments_df @app.cell -def high_valued_payments_filter(final_success_payments_df, spark): +def high_valued_payments_filter( + AnalysisException, + LOGGER, + final_success_payments_df, + log_error, + log_info, + observe_metrics, + spark, + time, +): + + + high_valued_payments_filter_start_time=time.time() print(final_success_payments_df.columns) - high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500") - high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df') - return (high_valued_payments_filter_df,) + high_valued_payments_filter_fail_on_error="" + try: + try: + high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500") + except AnalysisException as e: + log_info(LOGGER, f"error while filtering data : {e}") + high_valued_payments_filter_df = final_success_payments_df.limit(0) + except Exception as e: + log_info(LOGGER, f"Unexpected error: {e}") + high_valued_payments_filter_df = final_success_payments_df.limit(0) + high_valued_payments_filter_df, high_valued_payments_filter_observer = observe_metrics("high_valued_payments_filter_df", high_valued_payments_filter_df) + + + + high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df') + + high_valued_payments_filter_dependency_key="high_valued_payments_filter" + + high_valued_payments_filter_execute_status="SUCCESS" + except Exception as e: + high_valued_payments_filter_error = e + log_error(LOGGER, f"Component high_valued_payments_filter Failed", e) + high_valued_payments_filter_execute_status="ERROR" + + raise e + + finally: + high_valued_payments_filter_end_time=time.time() + return ( + high_valued_payments_filter_dependency_key, + high_valued_payments_filter_df, + ) @app.cell def total_payments_and_total_value_processed( - expr, + LOGGER, + count, + 
final_success_payments_dependency_key, final_success_payments_df, - lit, - preprocess_then_expand, - reduce, + log_error, + observe_metrics, + sum, + time, ): + total_payments_and_total_value_processed_start_time=time.time() + + total_payments_and_total_value_processed_fail_on_error="True" + try: + + + + + + + total_payments_and_total_value_processed_df = final_success_payments_df.groupBy( + + "payment_date" + ).agg( + + count('*').alias("total_payments"), + + sum('amount').alias("total_value_processed") + + ) + total_payments_and_total_value_processed_df, total_payments_and_total_value_processed_observer = observe_metrics("total_payments_and_total_value_processed_df", total_payments_and_total_value_processed_df) + + total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df') - _params = { - "datasource": "final_success_payments", - "selectFunctions" : [{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] - } - - _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, - group_expression="payment_date", - cube="", - rollup="", - grouping_set="", - select_functions=[{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] - ) - - _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) - for f in _rewritten_selects - ] - - _all_group_cols = list({c for gs in _grouping_specs for c in gs}) - - _partials = [] - for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - - total_payments_and_total_value_processed_df = reduce(lambda a, b: a.unionByName(b), _partials) - - total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df') - - - - + total_payments_and_total_value_processed_dependency_key="total_payments_and_total_value_processed" + + print(final_success_payments_dependency_key) + + total_payments_and_total_value_processed_execute_status="SUCCESS" + except Exception as e: + total_payments_and_total_value_processed_error = e + log_error(LOGGER, f"Component total_payments_and_total_value_processed Failed", e) + total_payments_and_total_value_processed_execute_status="ERROR" + + raise e + + finally: + total_payments_and_total_value_processed_end_time=time.time() return (total_payments_and_total_value_processed_df,) @app.cell def aggregate__4( - expr, + LOGGER, + count, + final_success_payments_dependency_key, final_success_payments_df, - lit, - preprocess_then_expand, - reduce, + log_error, + observe_metrics, + time, ): + aggregate__4_start_time=time.time() + + aggregate__4_fail_on_error="True" + try: + + + + + + + aggregate__4_df = final_success_payments_df.groupBy( + + "payment_date", + + "payment_method" + ).agg( + + count('*').alias("method_count") + + ) + aggregate__4_df, aggregate__4_observer = observe_metrics("aggregate__4_df", aggregate__4_df) + + aggregate__4_df.createOrReplaceTempView('aggregate__4_df') - _params = { - "datasource": "final_success_payments", - "selectFunctions" : [{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] - } - - _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, - group_expression="payment_date, payment_method", - 
cube="", - rollup="", - grouping_set="", - select_functions=[{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] - ) - - _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) - for f in _rewritten_selects - ] - - _all_group_cols = list({c for gs in _grouping_specs for c in gs}) - - _partials = [] - for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - - aggregate__4_df = reduce(lambda a, b: a.unionByName(b), _partials) - - aggregate__4_df.createOrReplaceTempView('aggregate__4_df') - - - - + aggregate__4_dependency_key="aggregate__4" + + print(final_success_payments_dependency_key) + + aggregate__4_execute_status="SUCCESS" + except Exception as e: + aggregate__4_error = e + log_error(LOGGER, f"Component aggregate__4 Failed", e) + aggregate__4_execute_status="ERROR" + + raise e + + finally: + aggregate__4_end_time=time.time() return (aggregate__4_df,) @app.cell -def data_mapper__5(aggregate__4_df, job_id, spark): - - _data_mapper__5_select_clause=aggregate__4_df.columns if False else [] - - _data_mapper__5_select_clause.append("payment_date AS payment_date") - - _data_mapper__5_select_clause.append("payment_method AS payment_method") - - _data_mapper__5_select_clause.append("method_count AS method_count") - - _data_mapper__5_select_clause.append("RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method") +def data_mapper__5( + LOGGER, + aggregate__4_df, + job_id, + log_error, + log_info, + observe_metrics, + spark, + time, +): - data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'")) - data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df") + data_mapper__5_start_time=time.time() + + data_mapper__5_fail_on_error="" + try: + _data_mapper__5_select_clause=aggregate__4_df.columns if False else [] + + _data_mapper__5_select_clause.append('''payment_date AS payment_date''') + + _data_mapper__5_select_clause.append('''payment_method AS payment_method''') + + _data_mapper__5_select_clause.append('''method_count AS method_count''') + + _data_mapper__5_select_clause.append('''RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method''') + + try: + data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'")) + except Exception as e: + data_mapper__5_df = aggregate__4_df.limit(0) + log_info(LOGGER, f"error while mapping the data :{e} " ) + data_mapper__5_df, data_mapper__5_observer = observe_metrics("data_mapper__5_df", data_mapper__5_df) + + + + data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df") + + data_mapper__5_dependency_key="data_mapper__5" + + data_mapper__5_execute_status="SUCCESS" + except Exception as e: + data_mapper__5_error = e + log_error(LOGGER, f"Component data_mapper__5 Failed", e) + data_mapper__5_execute_status="ERROR" + + raise e + + finally: + data_mapper__5_end_time=time.time() return (data_mapper__5_df,) @app.cell -def filter__6(data_mapper__5_df, spark): +def filter__6( + AnalysisException, + LOGGER, + data_mapper__5_df, + log_error, + log_info, + observe_metrics, + spark, + time, +): + + + filter__6_start_time=time.time() print(data_mapper__5_df.columns) - filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1") - 
filter__6_df.createOrReplaceTempView('filter__6_df') + filter__6_fail_on_error="" + try: + try: + filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1") + except AnalysisException as e: + log_info(LOGGER, f"error while filtering data : {e}") + filter__6_df = data_mapper__5_df.limit(0) + except Exception as e: + log_info(LOGGER, f"Unexpected error: {e}") + filter__6_df = data_mapper__5_df.limit(0) + filter__6_df, filter__6_observer = observe_metrics("filter__6_df", filter__6_df) + + + + filter__6_df.createOrReplaceTempView('filter__6_df') + + filter__6_dependency_key="filter__6" + + filter__6_execute_status="SUCCESS" + except Exception as e: + filter__6_error = e + log_error(LOGGER, f"Component filter__6 Failed", e) + filter__6_execute_status="ERROR" + + raise e + + finally: + filter__6_end_time=time.time() return (filter__6_df,) @app.cell -def most_used_payment_method__(filter__6_df, job_id, spark): - - _most_used_payment_method___select_clause=filter__6_df.columns if False else [] - - _most_used_payment_method___select_clause.append("payment_date AS payment_date") - - _most_used_payment_method___select_clause.append("payment_method AS most_used_payment_method") +def most_used_payment_method__( + LOGGER, + filter__6_df, + job_id, + log_error, + log_info, + observe_metrics, + spark, + time, +): - most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'")) - most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df") + most_used_payment_method___start_time=time.time() + + most_used_payment_method___fail_on_error="" + try: + _most_used_payment_method___select_clause=filter__6_df.columns if False else [] + + _most_used_payment_method___select_clause.append('''payment_date AS payment_date''') + + _most_used_payment_method___select_clause.append('''payment_method AS most_used_payment_method''') + + try: + most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'")) + except Exception as e: + most_used_payment_method___df = filter__6_df.limit(0) + log_info(LOGGER, f"error while mapping the data :{e} " ) + most_used_payment_method___df, most_used_payment_method___observer = observe_metrics("most_used_payment_method___df", most_used_payment_method___df) + + + + most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df") + + most_used_payment_method___dependency_key="most_used_payment_method__" + + most_used_payment_method___execute_status="SUCCESS" + except Exception as e: + most_used_payment_method___error = e + log_error(LOGGER, f"Component most_used_payment_method__ Failed", e) + most_used_payment_method___execute_status="ERROR" + + raise e + + finally: + most_used_payment_method___end_time=time.time() return (most_used_payment_method___df,) @app.cell def high_valued_payments__( - expr, + LOGGER, + count, + high_valued_payments_filter_dependency_key, high_valued_payments_filter_df, - lit, - preprocess_then_expand, - reduce, + log_error, + observe_metrics, + time, ): + high_valued_payments___start_time=time.time() + + high_valued_payments___fail_on_error="True" + try: + + + + + + + high_valued_payments___df = high_valued_payments_filter_df.groupBy( + + "payment_date" + ).agg( + + count('*').alias("high_valued_payments") + + ) + high_valued_payments___df, high_valued_payments___observer = 
observe_metrics("high_valued_payments___df", high_valued_payments___df) + + high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df') - _params = { - "datasource": "high_valued_payments_filter", - "selectFunctions" : [{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}] - } - - _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( high_valued_payments_filter_df, - group_expression="payment_date", - cube="", - rollup="", - grouping_set="", - select_functions=[{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}] - ) - - _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) - for f in _rewritten_selects - ] - - _all_group_cols = list({c for gs in _grouping_specs for c in gs}) - - _partials = [] - for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - - high_valued_payments___df = reduce(lambda a, b: a.unionByName(b), _partials) - - high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df') - - - - + high_valued_payments___dependency_key="high_valued_payments__" + + print(high_valued_payments_filter_dependency_key) + + high_valued_payments___execute_status="SUCCESS" + except Exception as e: + high_valued_payments___error = e + log_error(LOGGER, f"Component high_valued_payments__ Failed", e) + high_valued_payments___execute_status="ERROR" + + raise e + + finally: + high_valued_payments___end_time=time.time() return (high_valued_payments___df,) @app.cell -def failed_payments_reader(spark): +def failed_payments_reader(LOGGER, log_error, observe_metrics, spark, time): + failed_payments_reader_start_time=time.time() - failed_payments_reader_df = spark.read.table('dremio.failedpayments') - failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df') + failed_payments_reader_fail_on_error="" + try: + failed_payments_reader_df = spark.read.table('dremio.failedpayments') + failed_payments_reader_df, failed_payments_reader_observer = observe_metrics("failed_payments_reader_df", failed_payments_reader_df) + + + + failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df') + + failed_payments_reader_dependency_key="failed_payments_reader" + + failed_payments_reader_execute_status="SUCCESS" + except Exception as e: + failed_payments_reader_error = e + log_error(LOGGER, f"Component failed_payments_reader Failed", e) + failed_payments_reader_execute_status="ERROR" + + raise e + + finally: + failed_payments_reader_end_time=time.time() return (failed_payments_reader_df,) @app.cell -def failed_payments_mapper(failed_payments_reader_df, job_id, spark): - - _failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else [] - - _failed_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date") - - _failed_payments_mapper_select_clause.append("payment_method AS payment_method") - - _failed_payments_mapper_select_clause.append("failure_reason AS failure_reason") - - _failed_payments_mapper_select_clause.append("gateway AS gateway") +def failed_payments_mapper( + LOGGER, + failed_payments_reader_df, + job_id, + log_error, + log_info, + observe_metrics, + spark, + time, +): - failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'")) - 
failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df") - return (failed_payments_mapper_df,) + failed_payments_mapper_start_time=time.time() + + failed_payments_mapper_fail_on_error="" + try: + _failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else [] + + _failed_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''') + + _failed_payments_mapper_select_clause.append('''payment_method AS payment_method''') + + _failed_payments_mapper_select_clause.append('''failure_reason AS failure_reason''') + + _failed_payments_mapper_select_clause.append('''gateway AS gateway''') + + try: + failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'")) + except Exception as e: + failed_payments_mapper_df = failed_payments_reader_df.limit(0) + log_info(LOGGER, f"error while mapping the data :{e} " ) + failed_payments_mapper_df, failed_payments_mapper_observer = observe_metrics("failed_payments_mapper_df", failed_payments_mapper_df) + + + + failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df") + + failed_payments_mapper_dependency_key="failed_payments_mapper" + + failed_payments_mapper_execute_status="SUCCESS" + except Exception as e: + failed_payments_mapper_error = e + log_error(LOGGER, f"Component failed_payments_mapper Failed", e) + failed_payments_mapper_execute_status="ERROR" + + raise e + + finally: + failed_payments_mapper_end_time=time.time() + return failed_payments_mapper_dependency_key, failed_payments_mapper_df @app.cell -def filter__13(final_failed_payments_df, spark): +def filter__13( + AnalysisException, + LOGGER, + final_failed_payments_df, + log_error, + log_info, + observe_metrics, + spark, + time, +): + + + filter__13_start_time=time.time() print(final_failed_payments_df.columns) - filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = \'CCS\'") - filter__13_df.createOrReplaceTempView('filter__13_df') - return (filter__13_df,) + filter__13_fail_on_error="" + try: + try: + filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = \'CCS\'") + except AnalysisException as e: + log_info(LOGGER, f"error while filtering data : {e}") + filter__13_df = final_failed_payments_df.limit(0) + except Exception as e: + log_info(LOGGER, f"Unexpected error: {e}") + filter__13_df = final_failed_payments_df.limit(0) + filter__13_df, filter__13_observer = observe_metrics("filter__13_df", filter__13_df) + + + + filter__13_df.createOrReplaceTempView('filter__13_df') + + filter__13_dependency_key="filter__13" + + filter__13_execute_status="SUCCESS" + except Exception as e: + filter__13_error = e + log_error(LOGGER, f"Component filter__13 Failed", e) + filter__13_execute_status="ERROR" + + raise e + + finally: + filter__13_end_time=time.time() + return filter__13_dependency_key, filter__13_df @app.cell def total_failed_payments__( - expr, + LOGGER, + count, + filter__13_dependency_key, filter__13_df, - lit, - preprocess_then_expand, - reduce, + log_error, + observe_metrics, + time, ): + total_failed_payments___start_time=time.time() + + total_failed_payments___fail_on_error="True" + try: + + + + + + + total_failed_payments___df = filter__13_df.groupBy( + + "payment_date" + ).agg( + + count('*').alias("total_failed_payments") + + ) + total_failed_payments___df, total_failed_payments___observer = observe_metrics("total_failed_payments___df", 
total_failed_payments___df) + + total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df') - _params = { - "datasource": "filter__13", - "selectFunctions" : [{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}] - } - - _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( filter__13_df, - group_expression="payment_date", - cube="", - rollup="", - grouping_set="", - select_functions=[{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}] - ) - - _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) - for f in _rewritten_selects - ] - - _all_group_cols = list({c for gs in _grouping_specs for c in gs}) - - _partials = [] - for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - - total_failed_payments___df = reduce(lambda a, b: a.unionByName(b), _partials) - - total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df') - - - - + total_failed_payments___dependency_key="total_failed_payments__" + + print(filter__13_dependency_key) + + total_failed_payments___execute_status="SUCCESS" + except Exception as e: + total_failed_payments___error = e + log_error(LOGGER, f"Component total_failed_payments__ Failed", e) + total_failed_payments___execute_status="ERROR" + + raise e + + finally: + total_failed_payments___end_time=time.time() return (total_failed_payments___df,) @app.cell def failed_payment_metrics( - expr, + LOGGER, + count, + final_failed_payments_dependency_key, final_failed_payments_df, - lit, - preprocess_then_expand, - reduce, + log_error, + observe_metrics, + time, ): + failed_payment_metrics_start_time=time.time() + + failed_payment_metrics_fail_on_error="True" + try: + + + + + + + failed_payment_metrics_df = final_failed_payments_df.groupBy( + + "payment_date", + + "gateway", + + "failure_reason" + ).agg( + + count('*').alias("failure_count") + + ) + failed_payment_metrics_df, failed_payment_metrics_observer = observe_metrics("failed_payment_metrics_df", failed_payment_metrics_df) + + failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df') - _params = { - "datasource": "final_failed_payments", - "selectFunctions" : [{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}] - } - - _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_failed_payments_df, - group_expression="payment_date, gateway, failure_reason", - cube="", - rollup="", - grouping_set="", - select_functions=[{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}] - ) - - _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) - for f in _rewritten_selects - ] - - _all_group_cols = list({c for gs in _grouping_specs for c in gs}) - - _partials = [] - for _gs in _grouping_specs: - _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) - for _col in _all_group_cols: - if _col not in _gs: - _gdf = _gdf.withColumn(_col, lit(None)) - _partials.append(_gdf) - - - failed_payment_metrics_df = reduce(lambda a, b: a.unionByName(b), _partials) - - failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df') - - - - + failed_payment_metrics_dependency_key="failed_payment_metrics" + + print(final_failed_payments_dependency_key) + + failed_payment_metrics_execute_status="SUCCESS" + except Exception as e: + failed_payment_metrics_error = e + log_error(LOGGER, f"Component 
failed_payment_metrics Failed", e) + failed_payment_metrics_execute_status="ERROR" + + raise e + + finally: + failed_payment_metrics_end_time=time.time() return (failed_payment_metrics_df,) @app.cell -def data_writer__15(failed_payment_metrics_df, spark): +def data_writer__15(LOGGER, failed_payment_metrics_df, log_error, spark, time): + data_writer__15_start_time=time.time() + data_writer__15_fail_on_error="" + try: + + _data_writer__15_fields_to_update = failed_payment_metrics_df.columns + _data_writer__15_set_clause=[] + _data_writer__15_unique_key_clause= [] - _data_writer__15_fields_to_update = failed_payment_metrics_df.columns - _data_writer__15_set_clause=[] - _data_writer__15_unique_key_clause= [] + for _key in ['payment_date', 'gateway', 'failure_reason']: + _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}') - for _key in ['payment_date', 'gateway', 'failure_reason']: - _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}') + for _field in _data_writer__15_fields_to_update: + if(_field not in _data_writer__15_unique_key_clause): + _data_writer__15_set_clause.append(f't.{_field} = s.{_field}') - for _field in _data_writer__15_fields_to_update: - if(_field not in _data_writer__15_unique_key_clause): - _data_writer__15_set_clause.append(f't.{_field} = s.{_field}') + _merge_query = ''' + MERGE INTO dremio.failedpaymentmetrics t + USING failed_payment_metrics_df s + ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN + UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' - _merge_query = ''' - MERGE INTO dremio.failedpaymentmetrics t - USING failed_payment_metrics_df s - ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN - UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' - - spark.sql(_merge_query) + spark.sql(_merge_query) + + data_writer__15_dependency_key="data_writer__15" + + data_writer__15_execute_status="SUCCESS" + except Exception as e: + data_writer__15_error = e + log_error(LOGGER, f"Component data_writer__15 Failed", e) + data_writer__15_execute_status="ERROR" + + raise e + + finally: + data_writer__15_end_time=time.time() return @@ -510,10 +922,13 @@ def success_payment_metrics( high_valued_payments___df, most_used_payment_method___df, spark, + time, total_failed_payments___df, total_payments_and_total_value_processed_df, ): + success_payment_metrics_start_time=time.time() + print(total_payments_and_total_value_processed_df.columns) print(most_used_payment_method___df.columns) print(high_valued_payments___df.columns) @@ -537,46 +952,104 @@ def success_payment_metrics( """) success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df') + + success_payment_metrics_end_time=time.time() + + success_payment_metrics_dependency_key="success_payment_metrics" + return (success_payment_metrics_df,) @app.cell -def success_payment_metrics_writer(spark, success_payment_metrics_df): +def success_payment_metrics_writer( + LOGGER, + log_error, + spark, + success_payment_metrics_df, + time, +): + success_payment_metrics_writer_start_time=time.time() + success_payment_metrics_writer_fail_on_error="" + try: + + _success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns + _success_payment_metrics_writer_set_clause=[] + _success_payment_metrics_writer_unique_key_clause= [] - _success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns - 
_success_payment_metrics_writer_set_clause=[] - _success_payment_metrics_writer_unique_key_clause= [] + for _key in ['payment_date']: + _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}') - for _key in ['payment_date']: - _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}') + for _field in _success_payment_metrics_writer_fields_to_update: + if(_field not in _success_payment_metrics_writer_unique_key_clause): + _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}') - for _field in _success_payment_metrics_writer_fields_to_update: - if(_field not in _success_payment_metrics_writer_unique_key_clause): - _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}') + _merge_query = ''' + MERGE INTO dremio.successpaymentmetrics t + USING success_payment_metrics_df s + ON ''' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) + ''' WHEN MATCHED THEN + UPDATE SET ''' + ', '.join(_success_payment_metrics_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' - _merge_query = ''' - MERGE INTO dremio.successpaymentmetrics t - USING success_payment_metrics_df s - ON ''' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) + ''' WHEN MATCHED THEN - UPDATE SET ''' + ', '.join(_success_payment_metrics_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' - - spark.sql(_merge_query) + spark.sql(_merge_query) + + success_payment_metrics_writer_dependency_key="success_payment_metrics_writer" + + success_payment_metrics_writer_execute_status="SUCCESS" + except Exception as e: + success_payment_metrics_writer_error = e + log_error(LOGGER, f"Component success_payment_metrics_writer Failed", e) + success_payment_metrics_writer_execute_status="ERROR" + + raise e + + finally: + success_payment_metrics_writer_end_time=time.time() return @app.cell -def final_failed_payments(failed_payments_mapper_df, spark): +def final_failed_payments( + failed_payments_mapper_dependency_key, + failed_payments_mapper_df, + spark, + time, +): + + final_failed_payments_start_time=time.time() print(failed_payments_mapper_df.columns) final_failed_payments_df = spark.sql("select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))") final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df') final_failed_payments_df.persist() - return (final_failed_payments_df,) + + final_failed_payments_end_time=time.time() + + final_failed_payments_dependency_key="final_failed_payments" + + print(failed_payments_mapper_dependency_key) + + return final_failed_payments_dependency_key, final_failed_payments_df + + +@app.cell +def finalize(LOGGER, collect_metrics, log_info, materialization, spark, time): + + finalize_start_time=time.time() + + metrics = { + 'data': collect_metrics(locals()), + } + materialization.materialized_execution_history({'finalize': {'execute_status': 'SUCCESS', 'fail_on_error': 'False'}, **metrics['data']}) + log_info(LOGGER, f"Workflow Data metrics: {metrics['data']}") + + finalize_end_time=time.time() + + spark.stop() + return if __name__ == "__main__": diff --git a/payment_metrics/main.workflow b/payment_metrics/main.workflow index 91c0667..6af2617 100644 --- a/payment_metrics/main.workflow +++ b/payment_metrics/main.workflow @@ -1 +1 @@ -{"version":"v1alpha","kind":"VisualBuilder","metadata":{"name":"payment_metrics","description":"Workflow to generate 
payment metrics.","runtime":"spark"},"spec":{"ui":{"edges":[{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-reader__1","target":"data-mapper__0","id":"xy-edge__data-reader__1-data-mapper__0"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__0","target":"filter__1","id":"xy-edge__data-mapper__0-filter__1"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"filter__3","id":"xy-edge__filter__1-filter__3"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"aggregate__4","id":"xy-edge__filter__1-aggregate__4"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"aggregate__7","id":"xy-edge__filter__1-aggregate__7"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__4","target":"data-mapper__5","id":"xy-edge__aggregate__4-data-mapper__5"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__5","target":"filter__6","id":"xy-edge__data-mapper__5-filter__6"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__6","target":"data-mapper__8","id":"xy-edge__filter__6-data-mapper__8"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__3","target":"aggregate__9","id":"xy-edge__filter__3-aggregate__9"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-reader__10","target":"data-mapper__11","id":"xy-edge__data-reader__10-data-mapper__11"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__13","target":"aggregate__16","id":"xy-edge__filter__13-aggregate__16"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__14","target":"data-writer__15","id":"xy-edge__aggregate__14-data-writer__15"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__8","target":"code-transform__17","id":"xy-edge__data-mapper__8-code-transform__17"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__7","target":"code-transform__17","id":"xy-edge__aggregate__7-code-transform__17"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__9","target":"code-transform__17","id":"xy-edge__aggregate__9-code-transform__17"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__16","target":"code-transform__17","id":"xy-edge__aggregate__16-code-transform__17"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"code-transform__17","target":"data-writer__18","id":"xy-edge__code-transform__17-data-writer__18"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__11","target":"code-transform__0","id":"xy-edge__data-mapper__11-code-transform__0"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"code-transform__0","target":"filter__13","id":"xy-edge__code-transform__0-filter__13"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"code-transform__0","target":"aggregate__14","id":"xy-edge__code-transform__0-aggregate__14"}],"nodes":[{"id":"data-reader__1","type":"workflowNode","position":{"x":-1762.5340546476618,"y":-168.4753522546897},"data":{"nodeType":"data-reader","id":"data-reader__1"},"measured":{"width":320,"height":110},"selected":false,"dragging
":false},{"id":"data-mapper__0","type":"workflowNode","position":{"x":-1353.8681752505147,"y":-169.04851494308113},"data":{"nodeType":"data-mapper","id":"data-mapper__0"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__1","type":"workflowNode","position":{"x":-921.2468878928117,"y":-170.40923466009258},"data":{"nodeType":"filter","id":"filter__1"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__3","type":"workflowNode","position":{"x":-429.06026348805125,"y":120.4585301507131},"data":{"nodeType":"filter","id":"filter__3"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__4","type":"workflowNode","position":{"x":-408.1850633096815,"y":-543.9835681697932},"data":{"nodeType":"aggregate","id":"aggregate__4"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__5","type":"workflowNode","position":{"x":175.81493669031852,"y":-549.9835681697932},"data":{"nodeType":"data-mapper","id":"data-mapper__5"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__6","type":"workflowNode","position":{"x":767.8149366903185,"y":-547.9835681697932},"data":{"nodeType":"filter","id":"filter__6"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__7","type":"workflowNode","position":{"x":1393.8149366903185,"y":-177.98356816979322},"data":{"nodeType":"aggregate","id":"aggregate__7"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__8","type":"workflowNode","position":{"x":1383.8149366903185,"y":-549.9835681697932},"data":{"nodeType":"data-mapper","id":"data-mapper__8"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__9","type":"workflowNode","position":{"x":1393.8149366903185,"y":120.01643183020678},"data":{"nodeType":"aggregate","id":"aggregate__9"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-reader__10","type":"workflowNode","position":{"x":-1798.1850633096815,"y":580.0164318302068},"data":{"nodeType":"data-reader","id":"data-reader__10"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__11","type":"workflowNode","position":{"x":-1362.1850633096815,"y":580.0164318302068},"data":{"nodeType":"data-mapper","id":"data-mapper__11"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__13","type":"workflowNode","position":{"x":-416.1850633096815,"y":390.0164318302068},"data":{"nodeType":"filter","id":"filter__13"},"measured":{"width":320,"height":110},"selected":true,"dragging":false},{"id":"aggregate__14","type":"workflowNode","position":{"x":-410.1850633096815,"y":756.0164318302068},"data":{"nodeType":"aggregate","id":"aggregate__14"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-writer__15","type":"workflowNode","position":{"x":181.81493669031852,"y":736.0164318302068},"data":{"nodeType":"data-writer","id":"data-writer__15"},"measured":{"width":320,"height":138},"selected":false,"dragging":false},{"id":"aggregate__16","type":"workflowNode","position":{"x":1381.8149366903185,"y":378.0164318302068},"data":{"nodeType":"aggregate","id":"aggregate__16"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"code-transform__17","type":"workflowNode","position":{"x":2045.8149366903185,"y":-57.983568169793216},"data":{"nodeType":"co
de-transform","id":"code-transform__17"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-writer__18","type":"workflowNode","position":{"x":2611.8149366903185,"y":-81.98356816979322},"data":{"nodeType":"data-writer","id":"data-writer__18"},"measured":{"width":320,"height":138},"selected":false,"dragging":false},{"id":"code-transform__0","type":"workflowNode","position":{"x":-928.2552865705557,"y":564.8811573007712},"data":{"nodeType":"code-transform","id":"code-transform__0"},"measured":{"width":320,"height":110},"selected":false,"dragging":false}],"nodesData":{"data-reader__1":{"isDefault":false,"iceberg_catalog":"dremio","typeLabel":"Spark","name":"success_payments_reader","type":"SparkIcebergReader","table_name":"payments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{}},"data-mapper__0":{"name":"success_payments_mapper","type":"DataMapping","fromDataReader":"success_payments_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"amount","valueExpression":"amount"},{"fieldName":"gateway","valueExpression":"gateway"},{"fieldName":"payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756374444833","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756374453565","newFieldName":"amount","mappingType":"sourceColumn","value":"amount"},{"id":"mapping-1756374468084","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"},{"id":"mapping-1756374477932","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__1":{"name":"final_success_payments","type":"Filter","datasource":"success_payments_mapper","condition":" payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \\'CCS\\'","isDefault":false},"filter__3":{"name":"high_valued_payments_filter","type":"Filter","datasource":"final_success_payments","condition":"amount >= 500","isDefault":false},"aggregate__7":{"name":"total_payments_and_total_value_processed","type":"SQLAggregation","datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_payments","aggregationFunction":"COUNT(*)"},{"fieldName":"total_value_processed","aggregationFunction":"SUM(amount)"}],"isDefault":false},"aggregate__4":{"name":"aggregate__4","type":"SQLAggregation","datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date, payment_method"},"selectFunctions":[{"fieldName":"method_count","aggregationFunction":"COUNT(*)"}],"isDefault":false},"data-mapper__5":{"name":"data_mapper__5","type":"DataMapping","fromDataReader":"aggregate__4","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"method_count","valueExpression":"method_count"},{"fieldName":"rank_method","valueExpression":"RANK() OVER (PARTITION BY payment_date ORDER BY 
method_count)"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375393396","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375408077","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756375415696","newFieldName":"method_count","mappingType":"sourceColumn","value":"method_count"},{"id":"mapping-1756375432743","newFieldName":"rank_method","mappingType":"expression","value":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__6":{"name":"filter__6","type":"Filter","datasource":"data_mapper__5","condition":"rank_method = 1","isDefault":false},"data-mapper__8":{"name":"most_used_payment_method__","type":"DataMapping","fromDataReader":"filter__6","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"most_used_payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375574506","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375594556","newFieldName":"most_used_payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"aggregate__9":{"name":"high_valued_payments__","type":"SQLAggregation","datasource":"high_valued_payments_filter","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"high_valued_payments","aggregationFunction":"COUNT(*)"}],"isDefault":false},"data-reader__10":{"iceberg_catalog":"dremio","typeLabel":"Spark","isDefault":false,"name":"failed_payments_reader","type":"SparkIcebergReader","table_name":"failedpayments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{}},"data-mapper__11":{"name":"failed_payments_mapper","type":"DataMapping","fromDataReader":"failed_payments_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"failure_reason","valueExpression":"failure_reason"},{"fieldName":"gateway","valueExpression":"gateway"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756377601359","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756377617744","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756377625433","newFieldName":"failure_reason","mappingType":"sourceColumn","value":"failure_reason"},{"id":"mapping-1756377634290","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__13":{"name":"filter__13","type":"Filter","datasource":"final_failed_payments","condition":"gateway = 
\\'CCS\\'","isDefault":false},"aggregate__16":{"name":"total_failed_payments__","type":"SQLAggregation","datasource":"filter__13","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_failed_payments","aggregationFunction":"COUNT(*)"}],"isDefault":false},"aggregate__14":{"name":"failed_payment_metrics","type":"SQLAggregation","datasource":"final_failed_payments","groupByParams":{"group_expression":"payment_date, gateway, failure_reason"},"selectFunctions":[{"fieldName":"failure_count","aggregationFunction":"COUNT(*)"}],"isDefault":false},"data-writer__15":{"name":"data_writer__15","type":"IcebergWriter","iceberg_catalog":"dremio","warehouse_directory":"failedpaymentmetrics","datasource":"failed_payment_metrics","mode":"merge","typeLabel":"Iceberg","unique_id":["payment_date","gateway","failure_reason"],"isDefault":false},"code-transform__17":{"name":"success_payment_metrics","type":"CodeTransform","code":"print(total_payments_and_total_value_processed_df.columns)\nprint(most_used_payment_method___df.columns)\nprint(high_valued_payments___df.columns)\nprint(total_failed_payments___df.columns)\n\n{{name}}_df = spark.sql(\"\"\"\nSELECT \n COALESCE(a.payment_date, d.payment_date) AS payment_date,\n a.total_payments,\n a.total_value_processed,\n b.most_used_payment_method,\n c.high_valued_payments,\n d.total_failed_payments\nFROM total_failed_payments___df d\nFULL OUTER JOIN total_payments_and_total_value_processed_df a \n ON a.payment_date = d.payment_date\nLEFT JOIN most_used_payment_method___df b \n ON a.payment_date = b.payment_date\nLEFT JOIN high_valued_payments___df c \n ON a.payment_date = c.payment_date\n\"\"\")\n\n{{name}}_df.createOrReplaceTempView('{{name}}_df')","isDefault":false},"data-writer__18":{"name":"success_payment_metrics_writer","type":"IcebergWriter","iceberg_catalog":"dremio","warehouse_directory":"successpaymentmetrics","datasource":"success_payment_metrics","mode":"merge","typeLabel":"Iceberg (Legacy)","unique_id":["payment_date"],"isDefault":false},"code-transform__0":{"name":"final_failed_payments","type":"CodeTransform","language":"python","datasource":"failed_payments_mapper","code":"print(failed_payments_mapper_df.columns)\nfinal_failed_payments_df = spark.sql(\"select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM 
failed_payments_mapper_df))\")\n{{name}}_df.createOrReplaceTempView('final_failed_payments_df')\n{{name}}_df.persist()","isDefault":false,"connectedComponents":["failed_payments_mapper"]}}},"blocks":[{"name":"success_payments_reader","type":"SparkIcebergReader","options":{},"isDefault":false,"iceberg_catalog":"dremio","typeLabel":"Spark","table_name":"payments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"}},{"name":"success_payments_mapper","type":"DataMapping","options":{},"fromDataReader":"success_payments_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"amount","valueExpression":"amount"},{"fieldName":"gateway","valueExpression":"gateway"},{"fieldName":"payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756374444833","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756374453565","newFieldName":"amount","mappingType":"sourceColumn","value":"amount"},{"id":"mapping-1756374468084","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"},{"id":"mapping-1756374477932","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"final_success_payments","type":"Filter","options":{},"datasource":"success_payments_mapper","condition":" payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \\'CCS\\'","isDefault":false},{"name":"high_valued_payments_filter","type":"Filter","options":{},"datasource":"final_success_payments","condition":"amount >= 500","isDefault":false},{"name":"total_payments_and_total_value_processed","type":"SQLAggregation","options":{},"datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_payments","aggregationFunction":"COUNT(*)"},{"fieldName":"total_value_processed","aggregationFunction":"SUM(amount)"}],"isDefault":false},{"name":"aggregate__4","type":"SQLAggregation","options":{},"datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date, payment_method"},"selectFunctions":[{"fieldName":"method_count","aggregationFunction":"COUNT(*)"}],"isDefault":false},{"name":"data_mapper__5","type":"DataMapping","options":{},"fromDataReader":"aggregate__4","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"method_count","valueExpression":"method_count"},{"fieldName":"rank_method","valueExpression":"RANK() OVER (PARTITION BY payment_date ORDER BY 
method_count)"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375393396","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375408077","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756375415696","newFieldName":"method_count","mappingType":"sourceColumn","value":"method_count"},{"id":"mapping-1756375432743","newFieldName":"rank_method","mappingType":"expression","value":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"filter__6","type":"Filter","options":{},"datasource":"data_mapper__5","condition":"rank_method = 1","isDefault":false},{"name":"most_used_payment_method__","type":"DataMapping","options":{},"fromDataReader":"filter__6","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"most_used_payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375574506","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375594556","newFieldName":"most_used_payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"high_valued_payments__","type":"SQLAggregation","options":{},"datasource":"high_valued_payments_filter","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"high_valued_payments","aggregationFunction":"COUNT(*)"}],"isDefault":false},{"name":"failed_payments_reader","type":"SparkIcebergReader","options":{},"iceberg_catalog":"dremio","typeLabel":"Spark","isDefault":false,"table_name":"failedpayments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"}},{"name":"failed_payments_mapper","type":"DataMapping","options":{},"fromDataReader":"failed_payments_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"failure_reason","valueExpression":"failure_reason"},{"fieldName":"gateway","valueExpression":"gateway"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756377601359","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756377617744","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756377625433","newFieldName":"failure_reason","mappingType":"sourceColumn","value":"failure_reason"},{"id":"mapping-1756377634290","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"filter__13","type":"Filter","options":{},"datasource":"final_failed_payments","condition":"gateway = 
\\'CCS\\'","isDefault":false},{"name":"total_failed_payments__","type":"SQLAggregation","options":{},"datasource":"filter__13","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_failed_payments","aggregationFunction":"COUNT(*)"}],"isDefault":false},{"name":"failed_payment_metrics","type":"SQLAggregation","options":{},"datasource":"final_failed_payments","groupByParams":{"group_expression":"payment_date, gateway, failure_reason"},"selectFunctions":[{"fieldName":"failure_count","aggregationFunction":"COUNT(*)"}],"isDefault":false},{"name":"data_writer__15","type":"IcebergWriter","options":{},"iceberg_catalog":"dremio","warehouse_directory":"failedpaymentmetrics","datasource":"failed_payment_metrics","mode":"merge","typeLabel":"Iceberg","unique_id":["payment_date","gateway","failure_reason"],"isDefault":false},{"name":"success_payment_metrics","type":"CodeTransform","options":{},"code":"print(total_payments_and_total_value_processed_df.columns)\nprint(most_used_payment_method___df.columns)\nprint(high_valued_payments___df.columns)\nprint(total_failed_payments___df.columns)\n\n{{name}}_df = spark.sql(\"\"\"\nSELECT \n COALESCE(a.payment_date, d.payment_date) AS payment_date,\n a.total_payments,\n a.total_value_processed,\n b.most_used_payment_method,\n c.high_valued_payments,\n d.total_failed_payments\nFROM total_failed_payments___df d\nFULL OUTER JOIN total_payments_and_total_value_processed_df a \n ON a.payment_date = d.payment_date\nLEFT JOIN most_used_payment_method___df b \n ON a.payment_date = b.payment_date\nLEFT JOIN high_valued_payments___df c \n ON a.payment_date = c.payment_date\n\"\"\")\n\n{{name}}_df.createOrReplaceTempView('{{name}}_df')","isDefault":false},{"name":"success_payment_metrics_writer","type":"IcebergWriter","options":{},"iceberg_catalog":"dremio","warehouse_directory":"successpaymentmetrics","datasource":"success_payment_metrics","mode":"merge","typeLabel":"Iceberg (Legacy)","unique_id":["payment_date"],"isDefault":false},{"name":"final_failed_payments","type":"CodeTransform","options":{},"language":"python","datasource":"failed_payments_mapper","code":"print(failed_payments_mapper_df.columns)\nfinal_failed_payments_df = spark.sql(\"select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))\")\n{{name}}_df.createOrReplaceTempView('final_failed_payments_df')\n{{name}}_df.persist()","isDefault":false,"connectedComponents":["failed_payments_mapper"]}]}} \ No newline at end of file +{"version":"v1alpha","kind":"VisualBuilder","metadata":{"name":"payment_metrics","description":"Workflow to generate payment 
metrics.","runtime":"spark"},"spec":{"ui":{"edges":[{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-reader__1","target":"data-mapper__0","id":"xy-edge__data-reader__1-data-mapper__0"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__0","target":"filter__1","id":"xy-edge__data-mapper__0-filter__1"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"filter__3","id":"xy-edge__filter__1-filter__3"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"aggregate__4","id":"xy-edge__filter__1-aggregate__4"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"aggregate__7","id":"xy-edge__filter__1-aggregate__7"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__4","target":"data-mapper__5","id":"xy-edge__aggregate__4-data-mapper__5"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__5","target":"filter__6","id":"xy-edge__data-mapper__5-filter__6"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__6","target":"data-mapper__8","id":"xy-edge__filter__6-data-mapper__8"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__3","target":"aggregate__9","id":"xy-edge__filter__3-aggregate__9"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-reader__10","target":"data-mapper__11","id":"xy-edge__data-reader__10-data-mapper__11"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__13","target":"aggregate__16","id":"xy-edge__filter__13-aggregate__16"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__14","target":"data-writer__15","id":"xy-edge__aggregate__14-data-writer__15"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__8","target":"code-transform__17","id":"xy-edge__data-mapper__8-code-transform__17"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__7","target":"code-transform__17","id":"xy-edge__aggregate__7-code-transform__17"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__9","target":"code-transform__17","id":"xy-edge__aggregate__9-code-transform__17"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__16","target":"code-transform__17","id":"xy-edge__aggregate__16-code-transform__17"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"code-transform__17","target":"data-writer__18","id":"xy-edge__code-transform__17-data-writer__18"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__11","target":"code-transform__0","id":"xy-edge__data-mapper__11-code-transform__0"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"code-transform__0","target":"filter__13","id":"xy-edge__code-transform__0-filter__13"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"code-transform__0","target":"aggregate__14","id":"xy-edge__code-transform__0-aggregate__14"}],"nodes":[{"id":"data-reader__1","type":"workflowNode","position":{"x":-1762.5340546476618,"y":-168.4753522546897},"data":{"nodeType":"data-reader","id":"data-reader__1"},"measured":{"width":320,"height":110},"selected":false,"dragging":false}
,{"id":"data-mapper__0","type":"workflowNode","position":{"x":-1353.8681752505147,"y":-169.04851494308113},"data":{"nodeType":"data-mapper","id":"data-mapper__0"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__1","type":"workflowNode","position":{"x":-921.2468878928117,"y":-170.40923466009258},"data":{"nodeType":"filter","id":"filter__1"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__3","type":"workflowNode","position":{"x":-429.06026348805125,"y":120.4585301507131},"data":{"nodeType":"filter","id":"filter__3"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__4","type":"workflowNode","position":{"x":-408.1850633096815,"y":-543.9835681697932},"data":{"nodeType":"aggregate","id":"aggregate__4"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__5","type":"workflowNode","position":{"x":175.81493669031852,"y":-549.9835681697932},"data":{"nodeType":"data-mapper","id":"data-mapper__5"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__6","type":"workflowNode","position":{"x":767.8149366903185,"y":-547.9835681697932},"data":{"nodeType":"filter","id":"filter__6"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__7","type":"workflowNode","position":{"x":1393.8149366903185,"y":-177.98356816979322},"data":{"nodeType":"aggregate","id":"aggregate__7"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__8","type":"workflowNode","position":{"x":1383.8149366903185,"y":-549.9835681697932},"data":{"nodeType":"data-mapper","id":"data-mapper__8"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__9","type":"workflowNode","position":{"x":1393.8149366903185,"y":120.01643183020678},"data":{"nodeType":"aggregate","id":"aggregate__9"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-reader__10","type":"workflowNode","position":{"x":-1798.1850633096815,"y":580.0164318302068},"data":{"nodeType":"data-reader","id":"data-reader__10"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__11","type":"workflowNode","position":{"x":-1362.1850633096815,"y":580.0164318302068},"data":{"nodeType":"data-mapper","id":"data-mapper__11"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__13","type":"workflowNode","position":{"x":-416.1850633096815,"y":390.0164318302068},"data":{"nodeType":"filter","id":"filter__13"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__14","type":"workflowNode","position":{"x":-410.1850633096815,"y":756.0164318302068},"data":{"nodeType":"aggregate","id":"aggregate__14"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-writer__15","type":"workflowNode","position":{"x":181.81493669031852,"y":736.0164318302068},"data":{"nodeType":"data-writer","id":"data-writer__15"},"measured":{"width":320,"height":138},"selected":false,"dragging":false},{"id":"aggregate__16","type":"workflowNode","position":{"x":1381.8149366903185,"y":378.0164318302068},"data":{"nodeType":"aggregate","id":"aggregate__16"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"code-transform__17","type":"workflowNode","position":{"x":2045.8149366903185,"y":-57.983568169793216},"data":{"nodeType":"code-tran
sform","id":"code-transform__17"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-writer__18","type":"workflowNode","position":{"x":2611.8149366903185,"y":-81.98356816979322},"data":{"nodeType":"data-writer","id":"data-writer__18"},"measured":{"width":320,"height":138},"selected":false,"dragging":false},{"id":"code-transform__0","type":"workflowNode","position":{"x":-928.2552865705557,"y":564.8811573007712},"data":{"nodeType":"code-transform","id":"code-transform__0"},"measured":{"width":320,"height":110},"selected":false,"dragging":false}],"nodesData":{"data-reader__1":{"isDefault":false,"iceberg_catalog":"dremio","typeLabel":"Spark","name":"success_payments_reader","type":"SparkIcebergReader","table_name":"payments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{}},"data-mapper__0":{"name":"success_payments_mapper","type":"DataMapping","fromDataReader":"success_payments_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"amount","valueExpression":"amount"},{"fieldName":"gateway","valueExpression":"gateway"},{"fieldName":"payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756374444833","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756374453565","newFieldName":"amount","mappingType":"sourceColumn","value":"amount"},{"id":"mapping-1756374468084","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"},{"id":"mapping-1756374477932","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__1":{"name":"final_success_payments","type":"Filter","datasource":"success_payments_mapper","condition":" payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \\'CCS\\'","isDefault":false},"filter__3":{"name":"high_valued_payments_filter","type":"Filter","datasource":"final_success_payments","condition":"amount >= 500","isDefault":false},"aggregate__7":{"name":"total_payments_and_total_value_processed","type":"SQLAggregation","datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_payments","aggregationFunction":"count('*')"},{"fieldName":"total_value_processed","aggregationFunction":"sum('amount')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["final_success_payments"]},"aggregate__4":{"name":"aggregate__4","type":"SQLAggregation","datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date, 
payment_method"},"selectFunctions":[{"fieldName":"method_count","aggregationFunction":"count('*')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["final_success_payments"]},"data-mapper__5":{"name":"data_mapper__5","type":"DataMapping","fromDataReader":"aggregate__4","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"method_count","valueExpression":"method_count"},{"fieldName":"rank_method","valueExpression":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375393396","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375408077","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756375415696","newFieldName":"method_count","mappingType":"sourceColumn","value":"method_count"},{"id":"mapping-1756375432743","newFieldName":"rank_method","mappingType":"expression","value":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__6":{"name":"filter__6","type":"Filter","datasource":"data_mapper__5","condition":"rank_method = 1","isDefault":false},"data-mapper__8":{"name":"most_used_payment_method__","type":"DataMapping","fromDataReader":"filter__6","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"most_used_payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375574506","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375594556","newFieldName":"most_used_payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"aggregate__9":{"name":"high_valued_payments__","type":"SQLAggregation","datasource":"high_valued_payments_filter","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"high_valued_payments","aggregationFunction":"count('*')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["high_valued_payments_filter"]},"data-reader__10":{"iceberg_catalog":"dremio","typeLabel":"Spark","isDefault":false,"name":"failed_payments_reader","type":"SparkIcebergReader","table_name":"failedpayments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{}},"data-mapper__11":{"name":"failed_payments_mapper","type":"DataMapping","fromDataReader":"failed_payments_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"failure_reason","valueExpression":"failure_reason"},{"fieldName":"gateway","valueExpression":"gateway"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756377601359","newFieldName":"payment_date","mappingType":"expressio
n","value":"DATE(payment_date)"},{"id":"mapping-1756377617744","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756377625433","newFieldName":"failure_reason","mappingType":"sourceColumn","value":"failure_reason"},{"id":"mapping-1756377634290","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__13":{"name":"filter__13","type":"Filter","datasource":"final_failed_payments","condition":"gateway = \\'CCS\\'","isDefault":false},"aggregate__16":{"name":"total_failed_payments__","type":"SQLAggregation","datasource":"filter__13","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_failed_payments","aggregationFunction":"count('*')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["filter__13"]},"aggregate__14":{"name":"failed_payment_metrics","type":"SQLAggregation","datasource":"final_failed_payments","groupByParams":{"group_expression":"payment_date, gateway, failure_reason"},"selectFunctions":[{"fieldName":"failure_count","aggregationFunction":"count('*')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["final_failed_payments"]},"data-writer__15":{"name":"data_writer__15","type":"IcebergWriter","iceberg_catalog":"dremio","warehouse_directory":"failedpaymentmetrics","datasource":"failed_payment_metrics","mode":"merge","typeLabel":"Iceberg","unique_id":["payment_date","gateway","failure_reason"],"isDefault":false},"code-transform__17":{"name":"success_payment_metrics","type":"CodeTransform","code":"print(total_payments_and_total_value_processed_df.columns)\nprint(most_used_payment_method___df.columns)\nprint(high_valued_payments___df.columns)\nprint(total_failed_payments___df.columns)\n\n{{name}}_df = spark.sql(\"\"\"\nSELECT \n COALESCE(a.payment_date, d.payment_date) AS payment_date,\n a.total_payments,\n a.total_value_processed,\n b.most_used_payment_method,\n c.high_valued_payments,\n d.total_failed_payments\nFROM total_failed_payments___df d\nFULL OUTER JOIN total_payments_and_total_value_processed_df a \n ON a.payment_date = d.payment_date\nLEFT JOIN most_used_payment_method___df b \n ON a.payment_date = b.payment_date\nLEFT JOIN high_valued_payments___df c \n ON a.payment_date = c.payment_date\n\"\"\")\n\n{{name}}_df.createOrReplaceTempView('{{name}}_df')","isDefault":false},"data-writer__18":{"name":"success_payment_metrics_writer","type":"IcebergWriter","iceberg_catalog":"dremio","warehouse_directory":"successpaymentmetrics","datasource":"success_payment_metrics","mode":"merge","typeLabel":"Iceberg (Legacy)","unique_id":["payment_date"],"isDefault":false},"code-transform__0":{"name":"final_failed_payments","type":"CodeTransform","language":"python","datasource":"failed_payments_mapper","code":"print(failed_payments_mapper_df.columns)\nfinal_failed_payments_df = spark.sql(\"select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM 
failed_payments_mapper_df))\")\n{{name}}_df.createOrReplaceTempView('final_failed_payments_df')\n{{name}}_df.persist()","isDefault":false,"connectedComponents":["failed_payments_mapper"]}}},"blocks":[{"name":"success_payments_reader","type":"SparkIcebergReader","options":{},"isDefault":false,"iceberg_catalog":"dremio","typeLabel":"Spark","table_name":"payments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"}},{"name":"success_payments_mapper","type":"DataMapping","options":{},"fromDataReader":"success_payments_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"amount","valueExpression":"amount"},{"fieldName":"gateway","valueExpression":"gateway"},{"fieldName":"payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756374444833","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756374453565","newFieldName":"amount","mappingType":"sourceColumn","value":"amount"},{"id":"mapping-1756374468084","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"},{"id":"mapping-1756374477932","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"final_success_payments","type":"Filter","options":{},"datasource":"success_payments_mapper","condition":" payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \\'CCS\\'","isDefault":false},{"name":"high_valued_payments_filter","type":"Filter","options":{},"datasource":"final_success_payments","condition":"amount >= 500","isDefault":false},{"name":"total_payments_and_total_value_processed","type":"SQLAggregation","options":{},"datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_payments","aggregationFunction":"count('*')"},{"fieldName":"total_value_processed","aggregationFunction":"sum('amount')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["final_success_payments"]},{"name":"aggregate__4","type":"SQLAggregation","options":{},"datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date, payment_method"},"selectFunctions":[{"fieldName":"method_count","aggregationFunction":"count('*')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["final_success_payments"]},{"name":"data_mapper__5","type":"DataMapping","options":{},"fromDataReader":"aggregate__4","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"method_count","valueExpression":"method_count"},{"fieldName":"rank_method","valueExpression":"RANK() OVER (PARTITION BY payment_date ORDER BY 
method_count)"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375393396","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375408077","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756375415696","newFieldName":"method_count","mappingType":"sourceColumn","value":"method_count"},{"id":"mapping-1756375432743","newFieldName":"rank_method","mappingType":"expression","value":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"filter__6","type":"Filter","options":{},"datasource":"data_mapper__5","condition":"rank_method = 1","isDefault":false},{"name":"most_used_payment_method__","type":"DataMapping","options":{},"fromDataReader":"filter__6","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"most_used_payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375574506","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375594556","newFieldName":"most_used_payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"high_valued_payments__","type":"SQLAggregation","options":{},"datasource":"high_valued_payments_filter","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"high_valued_payments","aggregationFunction":"count('*')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["high_valued_payments_filter"]},{"name":"failed_payments_reader","type":"SparkIcebergReader","options":{},"iceberg_catalog":"dremio","typeLabel":"Spark","isDefault":false,"table_name":"failedpayments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"}},{"name":"failed_payments_mapper","type":"DataMapping","options":{},"fromDataReader":"failed_payments_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"failure_reason","valueExpression":"failure_reason"},{"fieldName":"gateway","valueExpression":"gateway"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756377601359","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756377617744","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756377625433","newFieldName":"failure_reason","mappingType":"sourceColumn","value":"failure_reason"},{"id":"mapping-1756377634290","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"filter__13","type":"Filter","options":{},"datasource":"final_failed_payments","condition":"gateway = 
\\'CCS\\'","isDefault":false},{"name":"total_failed_payments__","type":"SQLAggregation","options":{},"datasource":"filter__13","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_failed_payments","aggregationFunction":"count('*')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["filter__13"]},{"name":"failed_payment_metrics","type":"SQLAggregation","options":{},"datasource":"final_failed_payments","groupByParams":{"group_expression":"payment_date, gateway, failure_reason"},"selectFunctions":[{"fieldName":"failure_count","aggregationFunction":"count('*')"}],"isDefault":false,"materialization_strategy":"NONE","fail_on_error":true,"connectedComponents":["final_failed_payments"]},{"name":"data_writer__15","type":"IcebergWriter","options":{},"iceberg_catalog":"dremio","warehouse_directory":"failedpaymentmetrics","datasource":"failed_payment_metrics","mode":"merge","typeLabel":"Iceberg","unique_id":["payment_date","gateway","failure_reason"],"isDefault":false},{"name":"success_payment_metrics","type":"CodeTransform","options":{},"code":"print(total_payments_and_total_value_processed_df.columns)\nprint(most_used_payment_method___df.columns)\nprint(high_valued_payments___df.columns)\nprint(total_failed_payments___df.columns)\n\n{{name}}_df = spark.sql(\"\"\"\nSELECT \n COALESCE(a.payment_date, d.payment_date) AS payment_date,\n a.total_payments,\n a.total_value_processed,\n b.most_used_payment_method,\n c.high_valued_payments,\n d.total_failed_payments\nFROM total_failed_payments___df d\nFULL OUTER JOIN total_payments_and_total_value_processed_df a \n ON a.payment_date = d.payment_date\nLEFT JOIN most_used_payment_method___df b \n ON a.payment_date = b.payment_date\nLEFT JOIN high_valued_payments___df c \n ON a.payment_date = c.payment_date\n\"\"\")\n\n{{name}}_df.createOrReplaceTempView('{{name}}_df')","isDefault":false},{"name":"success_payment_metrics_writer","type":"IcebergWriter","options":{},"iceberg_catalog":"dremio","warehouse_directory":"successpaymentmetrics","datasource":"success_payment_metrics","mode":"merge","typeLabel":"Iceberg (Legacy)","unique_id":["payment_date"],"isDefault":false},{"name":"final_failed_payments","type":"CodeTransform","options":{},"language":"python","datasource":"failed_payments_mapper","code":"print(failed_payments_mapper_df.columns)\nfinal_failed_payments_df = spark.sql(\"select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))\")\n{{name}}_df.createOrReplaceTempView('final_failed_payments_df')\n{{name}}_df.persist()","isDefault":false,"connectedComponents":["failed_payments_mapper"]}]}} \ No newline at end of file