__generated_with = "0.13.15" # %% import sys import time from pyspark.sql.utils import AnalysisException sys.path.append('/opt/spark/work-dir/') from workflow_templates.spark.udf_manager import bootstrap_udfs from util import get_logger, observe_metrics, collect_metrics, log_info, log_error, forgiving_serializer from pyspark.sql.functions import udf from pyspark.sql.functions import count, expr, lit from pyspark.sql.types import StringType, IntegerType import uuid from pathlib import Path from pyspark import SparkConf, Row from pyspark.sql import SparkSession from pyspark.sql.observation import Observation from pyspark import StorageLevel import os import pandas as pd import polars as pl import pyarrow as pa from pyspark.sql.functions import approx_count_distinct, avg, collect_list, collect_set, corr, count, countDistinct, covar_pop, covar_samp, first, kurtosis, last, max, mean, min, skewness, stddev, stddev_pop, stddev_samp, sum, var_pop, var_samp, variance,expr,to_json,struct, date_format, col, lit, when, regexp_replace, ltrim from functools import reduce from handle_structs_or_arrays import preprocess_then_expand import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from jinja2 import Template import json from secrets_manager import SecretsManager from WorkflowManager import WorkflowDSL, WorkflowManager from KnowledgebaseManager import KnowledgebaseManager from gitea_client import GiteaClient, WorkspaceVersionedContent from FilesystemManager import FilesystemManager, SupportedFilesystemType from Materialization import Materialization from dremio.flight.endpoint import DremioFlightEndpoint from dremio.flight.query import DremioFlightEndpointQuery init_start_time=time.time() LOGGER = get_logger() alias_str='abcdefghijklmnopqrstuvwxyz' workspace = os.getenv('WORKSPACE') or 'exp360cust' workflow = 'payment_metrics' execution_environment = os.getenv('EXECUTION_ENVIRONMENT') or 'CLUSTER' job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4()) retry_job_id = os.getenv("RETRY_EXECUTION_ID") or '' log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}', Retry Job Id: '{retry_job_id}'") sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN')) secrets = sm.list_secrets(workspace) gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1') workspaceVersionedContent=WorkspaceVersionedContent(gitea_client) filesystemManager = FilesystemManager.create(secrets.get('LAKEHOUSE_BUCKET'), storage_options={'key': secrets.get('S3_ACCESS_KEY'), 'secret': secrets.get('S3_SECRET_KEY'), 'region': secrets.get('S3_REGION')}) if retry_job_id: logs = Materialization.get_execution_history_by_job_id(filesystemManager, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, retry_job_id, selected_components=['finalize']).to_dicts() if len(logs) == 1 and logs[0].get('metrics').get('execute_status') == 'SUCCESS': log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}' - Retry Job Id: '{retry_job_id}' was already successful. Hence exiting to forward processing to next in chain.") sys.exit(0) _conf = SparkConf() _params = { "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'), "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'), "spark.hadoop.fs.s3a.aws.region": secrets.get("S3_REGION") or "None", "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'), "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "spark.hadoop.fs.s3.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dremio.type" : "hadoop", "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0,ch.cern.sparkmeasure:spark-measure_2.12:0.26" } _conf.setAll(list(_params.items())) spark = SparkSession.builder.appName(workspace).config(conf=_conf).getOrCreate() bootstrap_udfs(spark) materialization = Materialization(spark, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, job_id, retry_job_id, execution_environment, LOGGER) init_dependency_key="init" init_end_time=time.time() # %% success_payments_reader_start_time=time.time() success_payments_reader_fail_on_error="" try: success_payments_reader_df = spark.read.table('dremio.payments') success_payments_reader_df, success_payments_reader_observer = observe_metrics("success_payments_reader_df", success_payments_reader_df) success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df') success_payments_reader_dependency_key="success_payments_reader" success_payments_reader_execute_status="SUCCESS" except Exception as e: success_payments_reader_error = e log_error(LOGGER, f"Component success_payments_reader Failed", e) success_payments_reader_execute_status="ERROR" raise e finally: success_payments_reader_end_time=time.time() # %% success_payments_mapper_start_time=time.time() success_payments_mapper_fail_on_error="" try: _success_payments_mapper_select_clause=success_payments_reader_df.columns if False else [] _success_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''') _success_payments_mapper_select_clause.append('''amount AS amount''') _success_payments_mapper_select_clause.append('''gateway AS gateway''') _success_payments_mapper_select_clause.append('''payment_method AS payment_method''') try: success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'")) except Exception as e: success_payments_mapper_df = success_payments_reader_df.limit(0) log_info(LOGGER, f"error while mapping the data :{e} " ) success_payments_mapper_df, success_payments_mapper_observer = observe_metrics("success_payments_mapper_df", success_payments_mapper_df) success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df") success_payments_mapper_dependency_key="success_payments_mapper" success_payments_mapper_execute_status="SUCCESS" except Exception as e: success_payments_mapper_error = e log_error(LOGGER, f"Component success_payments_mapper Failed", e) success_payments_mapper_execute_status="ERROR" raise e finally: success_payments_mapper_end_time=time.time() # %% final_success_payments_start_time=time.time() print(success_payments_mapper_df.columns) final_success_payments_fail_on_error="" try: try: final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'") except AnalysisException as e: log_info(LOGGER, f"error while filtering data : {e}") final_success_payments_df = success_payments_mapper_df.limit(0) except Exception as e: log_info(LOGGER, f"Unexpected error: {e}") final_success_payments_df = success_payments_mapper_df.limit(0) final_success_payments_df, final_success_payments_observer = observe_metrics("final_success_payments_df", final_success_payments_df) final_success_payments_df.createOrReplaceTempView('final_success_payments_df') final_success_payments_dependency_key="final_success_payments" final_success_payments_execute_status="SUCCESS" except Exception as e: final_success_payments_error = e log_error(LOGGER, f"Component final_success_payments Failed", e) final_success_payments_execute_status="ERROR" raise e finally: final_success_payments_end_time=time.time() # %% high_valued_payments_filter_start_time=time.time() print(final_success_payments_df.columns) high_valued_payments_filter_fail_on_error="" try: try: high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500") except AnalysisException as e: log_info(LOGGER, f"error while filtering data : {e}") high_valued_payments_filter_df = final_success_payments_df.limit(0) except Exception as e: log_info(LOGGER, f"Unexpected error: {e}") high_valued_payments_filter_df = final_success_payments_df.limit(0) high_valued_payments_filter_df, high_valued_payments_filter_observer = observe_metrics("high_valued_payments_filter_df", high_valued_payments_filter_df) high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df') high_valued_payments_filter_dependency_key="high_valued_payments_filter" high_valued_payments_filter_execute_status="SUCCESS" except Exception as e: high_valued_payments_filter_error = e log_error(LOGGER, f"Component high_valued_payments_filter Failed", e) high_valued_payments_filter_execute_status="ERROR" raise e finally: high_valued_payments_filter_end_time=time.time() # %% total_payments_and_total_value_processed_start_time=time.time() total_payments_and_total_value_processed_fail_on_error="True" try: total_payments_and_total_value_processed_df = final_success_payments_df.groupBy( "payment_date" ).agg( count('*').alias("total_payments"), sum('amount').alias("total_value_processed") ) total_payments_and_total_value_processed_df, total_payments_and_total_value_processed_observer = observe_metrics("total_payments_and_total_value_processed_df", total_payments_and_total_value_processed_df) total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df') total_payments_and_total_value_processed_dependency_key="total_payments_and_total_value_processed" print(final_success_payments_dependency_key) total_payments_and_total_value_processed_execute_status="SUCCESS" except Exception as e: total_payments_and_total_value_processed_error = e log_error(LOGGER, f"Component total_payments_and_total_value_processed Failed", e) total_payments_and_total_value_processed_execute_status="ERROR" raise e finally: total_payments_and_total_value_processed_end_time=time.time() # %% aggregate__4_start_time=time.time() aggregate__4_fail_on_error="True" try: aggregate__4_df = final_success_payments_df.groupBy( "payment_date", "payment_method" ).agg( count('*').alias("method_count") ) aggregate__4_df, aggregate__4_observer = observe_metrics("aggregate__4_df", aggregate__4_df) aggregate__4_df.createOrReplaceTempView('aggregate__4_df') aggregate__4_dependency_key="aggregate__4" print(final_success_payments_dependency_key) aggregate__4_execute_status="SUCCESS" except Exception as e: aggregate__4_error = e log_error(LOGGER, f"Component aggregate__4 Failed", e) aggregate__4_execute_status="ERROR" raise e finally: aggregate__4_end_time=time.time() # %% data_mapper__5_start_time=time.time() data_mapper__5_fail_on_error="" try: _data_mapper__5_select_clause=aggregate__4_df.columns if False else [] _data_mapper__5_select_clause.append('''payment_date AS payment_date''') _data_mapper__5_select_clause.append('''payment_method AS payment_method''') _data_mapper__5_select_clause.append('''method_count AS method_count''') _data_mapper__5_select_clause.append('''RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method''') try: data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'")) except Exception as e: data_mapper__5_df = aggregate__4_df.limit(0) log_info(LOGGER, f"error while mapping the data :{e} " ) data_mapper__5_df, data_mapper__5_observer = observe_metrics("data_mapper__5_df", data_mapper__5_df) data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df") data_mapper__5_dependency_key="data_mapper__5" data_mapper__5_execute_status="SUCCESS" except Exception as e: data_mapper__5_error = e log_error(LOGGER, f"Component data_mapper__5 Failed", e) data_mapper__5_execute_status="ERROR" raise e finally: data_mapper__5_end_time=time.time() # %% filter__6_start_time=time.time() print(data_mapper__5_df.columns) filter__6_fail_on_error="" try: try: filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1") except AnalysisException as e: log_info(LOGGER, f"error while filtering data : {e}") filter__6_df = data_mapper__5_df.limit(0) except Exception as e: log_info(LOGGER, f"Unexpected error: {e}") filter__6_df = data_mapper__5_df.limit(0) filter__6_df, filter__6_observer = observe_metrics("filter__6_df", filter__6_df) filter__6_df.createOrReplaceTempView('filter__6_df') filter__6_dependency_key="filter__6" filter__6_execute_status="SUCCESS" except Exception as e: filter__6_error = e log_error(LOGGER, f"Component filter__6 Failed", e) filter__6_execute_status="ERROR" raise e finally: filter__6_end_time=time.time() # %% most_used_payment_method___start_time=time.time() most_used_payment_method___fail_on_error="" try: _most_used_payment_method___select_clause=filter__6_df.columns if False else [] _most_used_payment_method___select_clause.append('''payment_date AS payment_date''') _most_used_payment_method___select_clause.append('''payment_method AS most_used_payment_method''') try: most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'")) except Exception as e: most_used_payment_method___df = filter__6_df.limit(0) log_info(LOGGER, f"error while mapping the data :{e} " ) most_used_payment_method___df, most_used_payment_method___observer = observe_metrics("most_used_payment_method___df", most_used_payment_method___df) most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df") most_used_payment_method___dependency_key="most_used_payment_method__" most_used_payment_method___execute_status="SUCCESS" except Exception as e: most_used_payment_method___error = e log_error(LOGGER, f"Component most_used_payment_method__ Failed", e) most_used_payment_method___execute_status="ERROR" raise e finally: most_used_payment_method___end_time=time.time() # %% high_valued_payments___start_time=time.time() high_valued_payments___fail_on_error="True" try: high_valued_payments___df = high_valued_payments_filter_df.groupBy( "payment_date" ).agg( count('*').alias("high_valued_payments") ) high_valued_payments___df, high_valued_payments___observer = observe_metrics("high_valued_payments___df", high_valued_payments___df) high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df') high_valued_payments___dependency_key="high_valued_payments__" print(high_valued_payments_filter_dependency_key) high_valued_payments___execute_status="SUCCESS" except Exception as e: high_valued_payments___error = e log_error(LOGGER, f"Component high_valued_payments__ Failed", e) high_valued_payments___execute_status="ERROR" raise e finally: high_valued_payments___end_time=time.time() # %% failed_payments_reader_start_time=time.time() failed_payments_reader_fail_on_error="" try: failed_payments_reader_df = spark.read.table('dremio.failedpayments') failed_payments_reader_df, failed_payments_reader_observer = observe_metrics("failed_payments_reader_df", failed_payments_reader_df) failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df') failed_payments_reader_dependency_key="failed_payments_reader" failed_payments_reader_execute_status="SUCCESS" except Exception as e: failed_payments_reader_error = e log_error(LOGGER, f"Component failed_payments_reader Failed", e) failed_payments_reader_execute_status="ERROR" raise e finally: failed_payments_reader_end_time=time.time() # %% failed_payments_mapper_start_time=time.time() failed_payments_mapper_fail_on_error="" try: _failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else [] _failed_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''') _failed_payments_mapper_select_clause.append('''payment_method AS payment_method''') _failed_payments_mapper_select_clause.append('''failure_reason AS failure_reason''') _failed_payments_mapper_select_clause.append('''gateway AS gateway''') try: failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'")) except Exception as e: failed_payments_mapper_df = failed_payments_reader_df.limit(0) log_info(LOGGER, f"error while mapping the data :{e} " ) failed_payments_mapper_df, failed_payments_mapper_observer = observe_metrics("failed_payments_mapper_df", failed_payments_mapper_df) failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df") failed_payments_mapper_dependency_key="failed_payments_mapper" failed_payments_mapper_execute_status="SUCCESS" except Exception as e: failed_payments_mapper_error = e log_error(LOGGER, f"Component failed_payments_mapper Failed", e) failed_payments_mapper_execute_status="ERROR" raise e finally: failed_payments_mapper_end_time=time.time() # %% final_failed_payments_start_time=time.time() print(failed_payments_mapper_df.columns) final_failed_payments_df = spark.sql("select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))") final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df') final_failed_payments_df.persist() final_failed_payments_end_time=time.time() final_failed_payments_dependency_key="final_failed_payments" print(failed_payments_mapper_dependency_key) # %% filter__13_start_time=time.time() print(final_failed_payments_df.columns) filter__13_fail_on_error="" try: try: filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = \'CCS\'") except AnalysisException as e: log_info(LOGGER, f"error while filtering data : {e}") filter__13_df = final_failed_payments_df.limit(0) except Exception as e: log_info(LOGGER, f"Unexpected error: {e}") filter__13_df = final_failed_payments_df.limit(0) filter__13_df, filter__13_observer = observe_metrics("filter__13_df", filter__13_df) filter__13_df.createOrReplaceTempView('filter__13_df') filter__13_dependency_key="filter__13" filter__13_execute_status="SUCCESS" except Exception as e: filter__13_error = e log_error(LOGGER, f"Component filter__13 Failed", e) filter__13_execute_status="ERROR" raise e finally: filter__13_end_time=time.time() # %% total_failed_payments___start_time=time.time() total_failed_payments___fail_on_error="True" try: total_failed_payments___df = filter__13_df.groupBy( "payment_date" ).agg( count('*').alias("total_failed_payments") ) total_failed_payments___df, total_failed_payments___observer = observe_metrics("total_failed_payments___df", total_failed_payments___df) total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df') total_failed_payments___dependency_key="total_failed_payments__" print(filter__13_dependency_key) total_failed_payments___execute_status="SUCCESS" except Exception as e: total_failed_payments___error = e log_error(LOGGER, f"Component total_failed_payments__ Failed", e) total_failed_payments___execute_status="ERROR" raise e finally: total_failed_payments___end_time=time.time() # %% failed_payment_metrics_start_time=time.time() failed_payment_metrics_fail_on_error="True" try: failed_payment_metrics_df = final_failed_payments_df.groupBy( "payment_date", "gateway", "failure_reason" ).agg( count('*').alias("failure_count") ) failed_payment_metrics_df, failed_payment_metrics_observer = observe_metrics("failed_payment_metrics_df", failed_payment_metrics_df) failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df') failed_payment_metrics_dependency_key="failed_payment_metrics" print(final_failed_payments_dependency_key) failed_payment_metrics_execute_status="SUCCESS" except Exception as e: failed_payment_metrics_error = e log_error(LOGGER, f"Component failed_payment_metrics Failed", e) failed_payment_metrics_execute_status="ERROR" raise e finally: failed_payment_metrics_end_time=time.time() # %% data_writer__15_start_time=time.time() data_writer__15_fail_on_error="" try: _data_writer__15_fields_to_update = failed_payment_metrics_df.columns _data_writer__15_set_clause=[] _data_writer__15_unique_key_clause= [] for _key in ['payment_date', 'gateway', 'failure_reason']: _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}') for _field in _data_writer__15_fields_to_update: if(_field not in _data_writer__15_unique_key_clause): _data_writer__15_set_clause.append(f't.{_field} = s.{_field}') _merge_query = ''' MERGE INTO dremio.failedpaymentmetrics t USING failed_payment_metrics_df s ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' spark.sql(_merge_query) data_writer__15_dependency_key="data_writer__15" data_writer__15_execute_status="SUCCESS" except Exception as e: data_writer__15_error = e log_error(LOGGER, f"Component data_writer__15 Failed", e) data_writer__15_execute_status="ERROR" raise e finally: data_writer__15_end_time=time.time() # %% success_payment_metrics_start_time=time.time() print(total_payments_and_total_value_processed_df.columns) print(most_used_payment_method___df.columns) print(high_valued_payments___df.columns) print(total_failed_payments___df.columns) success_payment_metrics_df = spark.sql(""" SELECT COALESCE(a.payment_date, d.payment_date) AS payment_date, a.total_payments, a.total_value_processed, b.most_used_payment_method, c.high_valued_payments, d.total_failed_payments FROM total_failed_payments___df d FULL OUTER JOIN total_payments_and_total_value_processed_df a ON a.payment_date = d.payment_date LEFT JOIN most_used_payment_method___df b ON a.payment_date = b.payment_date LEFT JOIN high_valued_payments___df c ON a.payment_date = c.payment_date """) success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df') success_payment_metrics_end_time=time.time() success_payment_metrics_dependency_key="success_payment_metrics" # %% success_payment_metrics_writer_start_time=time.time() success_payment_metrics_writer_fail_on_error="" try: _success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns _success_payment_metrics_writer_set_clause=[] _success_payment_metrics_writer_unique_key_clause= [] for _key in ['payment_date']: _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}') for _field in _success_payment_metrics_writer_fields_to_update: if(_field not in _success_payment_metrics_writer_unique_key_clause): _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}') _merge_query = ''' MERGE INTO dremio.successpaymentmetrics t USING success_payment_metrics_df s ON ''' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) + ''' WHEN MATCHED THEN UPDATE SET ''' + ', '.join(_success_payment_metrics_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *' spark.sql(_merge_query) success_payment_metrics_writer_dependency_key="success_payment_metrics_writer" success_payment_metrics_writer_execute_status="SUCCESS" except Exception as e: success_payment_metrics_writer_error = e log_error(LOGGER, f"Component success_payment_metrics_writer Failed", e) success_payment_metrics_writer_execute_status="ERROR" raise e finally: success_payment_metrics_writer_end_time=time.time() # %% finalize_start_time=time.time() metrics = { 'data': collect_metrics(locals()), } materialization.materialized_execution_history({'finalize': {'execute_status': 'SUCCESS', 'fail_on_error': 'False'}, **metrics['data']}) log_info(LOGGER, f"Workflow Data metrics: {metrics['data']}") finalize_end_time=time.time() spark.stop()