From 4554a42d83995ee4e2b77c6ede9c5f8ffa526fca Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 28 Aug 2025 10:09:39 +0000 Subject: [PATCH] Workflow saved --- payment_metrics/main.py | 211 ++++++++++++++++++++++++- payment_metrics/main.py.notebook | 260 ++++++++++++++++++++++++++++++- payment_metrics/main.workflow | 2 +- 3 files changed, 460 insertions(+), 13 deletions(-) diff --git a/payment_metrics/main.py b/payment_metrics/main.py index d6e7a33..509052b 100644 --- a/payment_metrics/main.py +++ b/payment_metrics/main.py @@ -50,13 +50,14 @@ params = { "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'), "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'), "spark.hadoop.fs.s3a.aws.region": "us-west-1", - "spark.sql.catalog.dremio.warehouse" : 's3://'+ (secrets.get('LAKEHOUSE_BUCKET') or ''), + "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'), "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dremio.type" : "hadoop", "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2" + "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", + "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0" } @@ -70,5 +71,205 @@ bootstrap_udfs(spark) -data_reader__1_df = spark.read.table('dremio.payments') -data_reader__1_df.createOrReplaceTempView('data_reader__1_df') +success_payments_df = spark.read.table('dremio.payments') +success_payments_df.createOrReplaceTempView('success_payments_df') + +# %% + +_success_payments_mapper_select_clause=success_payments_df.columns if False else [] + +_success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date") + +_success_payments_mapper_select_clause.append("amount AS amount") + +_success_payments_mapper_select_clause.append("gateway AS gateway") + +_success_payments_mapper_select_clause.append("payment_method AS payment_method") + + +success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_df").replace("{job_id}",f"'{job_id}'")) +success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df") + +# %% + +print(success_payments_mapper_df.columns) +final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'") +final_success_payments_df.createOrReplaceTempView('final_success_payments_df') + +# %% + +print(final_success_payments_df.columns) +high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500") +high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df') + +# %% + + + + + + + + + +_params = { + "datasource": "final_success_payments", + "selectFunctions" : [{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] +} + +_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, +group_expression="payment_date", +cube="", +rollup="", +grouping_set="", +select_functions=[{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] +) + +_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) +for f in _rewritten_selects +] + +_all_group_cols = list({c for gs in _grouping_specs for c in gs}) + +_partials = [] +for _gs in _grouping_specs: + _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) + for _col in _all_group_cols: + if _col not in _gs: + _gdf = _gdf.withColumn(_col, lit(None)) + _partials.append(_gdf) + + +total_payments_and_total_value_processed_df = reduce(lambda a, b: a.unionByName(b), _partials) + +total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df') + + + + + +# %% + + + + + + + + + +_params = { + "datasource": "final_success_payments", + "selectFunctions" : [{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] +} + +_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, +group_expression="payment_date, payment_method", +cube="", +rollup="", +grouping_set="", +select_functions=[{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] +) + +_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) +for f in _rewritten_selects +] + +_all_group_cols = list({c for gs in _grouping_specs for c in gs}) + +_partials = [] +for _gs in _grouping_specs: + _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) + for _col in _all_group_cols: + if _col not in _gs: + _gdf = _gdf.withColumn(_col, lit(None)) + _partials.append(_gdf) + + +aggregate__4_df = reduce(lambda a, b: a.unionByName(b), _partials) + +aggregate__4_df.createOrReplaceTempView('aggregate__4_df') + + + + + +# %% + +_data_mapper__5_select_clause=aggregate__4_df.columns if False else [] + +_data_mapper__5_select_clause.append("payment_date AS payment_date") + +_data_mapper__5_select_clause.append("payment_method AS payment_method") + +_data_mapper__5_select_clause.append("method_count AS method_count") + +_data_mapper__5_select_clause.append("RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method") + + +data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'")) +data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df") + +# %% + +print(data_mapper__5_df.columns) +filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1") +filter__6_df.createOrReplaceTempView('filter__6_df') + +# %% + +_most_used_payment_method___select_clause=filter__6_df.columns if False else [] + +_most_used_payment_method___select_clause.append("payment_date AS payment_date") + +_most_used_payment_method___select_clause.append("payment_method AS most_used_payment_method") + + +most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'")) +most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df") + +# %% + + + + + + + + + +_params = { + "datasource": "high_valued_payments_filter", + "selectFunctions" : [{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}] +} + +_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( high_valued_payments_filter_df, +group_expression="payment_date", +cube="", +rollup="", +grouping_set="", +select_functions=[{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}] +) + +_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) +for f in _rewritten_selects +] + +_all_group_cols = list({c for gs in _grouping_specs for c in gs}) + +_partials = [] +for _gs in _grouping_specs: + _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) + for _col in _all_group_cols: + if _col not in _gs: + _gdf = _gdf.withColumn(_col, lit(None)) + _partials.append(_gdf) + + +high_valued_payments_df = reduce(lambda a, b: a.unionByName(b), _partials) + +high_valued_payments_df.createOrReplaceTempView('high_valued_payments_df') + + + + diff --git a/payment_metrics/main.py.notebook b/payment_metrics/main.py.notebook index 3503f74..bec9c5b 100644 --- a/payment_metrics/main.py.notebook +++ b/payment_metrics/main.py.notebook @@ -54,13 +54,14 @@ def init(): "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'), "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'), "spark.hadoop.fs.s3a.aws.region": "us-west-1", - "spark.sql.catalog.dremio.warehouse" : 's3://'+ (secrets.get('LAKEHOUSE_BUCKET') or ''), + "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'), "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dremio.type" : "hadoop", "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2" + "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", + "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0" } @@ -69,16 +70,261 @@ def init(): spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate() bootstrap_udfs(spark) - return (spark,) + return expr, job_id, lit, preprocess_then_expand, reduce, spark @app.cell -def data_reader__1(spark): +def success_payments(spark): + + + + success_payments_df = spark.read.table('dremio.payments') + success_payments_df.createOrReplaceTempView('success_payments_df') + return (success_payments_df,) + + +@app.cell +def success_payments_mapper(job_id, spark, success_payments_df): + + _success_payments_mapper_select_clause=success_payments_df.columns if False else [] + + _success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date") + + _success_payments_mapper_select_clause.append("amount AS amount") + + _success_payments_mapper_select_clause.append("gateway AS gateway") + + _success_payments_mapper_select_clause.append("payment_method AS payment_method") + + + success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_df").replace("{job_id}",f"'{job_id}'")) + success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df") + return (success_payments_mapper_df,) + + +@app.cell +def final_success_payments(spark, success_payments_mapper_df): + + print(success_payments_mapper_df.columns) + final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'") + final_success_payments_df.createOrReplaceTempView('final_success_payments_df') + return (final_success_payments_df,) + + +@app.cell +def high_valued_payments_filter(final_success_payments_df, spark): + + print(final_success_payments_df.columns) + high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500") + high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df') + return (high_valued_payments_filter_df,) + + +@app.cell +def total_payments_and_total_value_processed( + expr, + final_success_payments_df, + lit, + preprocess_then_expand, + reduce, +): + + + + + + + + + + _params = { + "datasource": "final_success_payments", + "selectFunctions" : [{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] + } + + _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, + group_expression="payment_date", + cube="", + rollup="", + grouping_set="", + select_functions=[{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] + ) + + _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) + for f in _rewritten_selects + ] + + _all_group_cols = list({c for gs in _grouping_specs for c in gs}) + + _partials = [] + for _gs in _grouping_specs: + _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) + for _col in _all_group_cols: + if _col not in _gs: + _gdf = _gdf.withColumn(_col, lit(None)) + _partials.append(_gdf) + + + total_payments_and_total_value_processed_df = reduce(lambda a, b: a.unionByName(b), _partials) + + total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df') + + + + + return + + +@app.cell +def aggregate__4( + expr, + final_success_payments_df, + lit, + preprocess_then_expand, + reduce, +): + + + + + + + + + + _params = { + "datasource": "final_success_payments", + "selectFunctions" : [{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] + } + + _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, + group_expression="payment_date, payment_method", + cube="", + rollup="", + grouping_set="", + select_functions=[{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] + ) + + _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) + for f in _rewritten_selects + ] + + _all_group_cols = list({c for gs in _grouping_specs for c in gs}) + + _partials = [] + for _gs in _grouping_specs: + _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) + for _col in _all_group_cols: + if _col not in _gs: + _gdf = _gdf.withColumn(_col, lit(None)) + _partials.append(_gdf) + + + aggregate__4_df = reduce(lambda a, b: a.unionByName(b), _partials) + + aggregate__4_df.createOrReplaceTempView('aggregate__4_df') + + + + + return (aggregate__4_df,) + + +@app.cell +def data_mapper__5(aggregate__4_df, job_id, spark): + + _data_mapper__5_select_clause=aggregate__4_df.columns if False else [] + + _data_mapper__5_select_clause.append("payment_date AS payment_date") + + _data_mapper__5_select_clause.append("payment_method AS payment_method") + + _data_mapper__5_select_clause.append("method_count AS method_count") + + _data_mapper__5_select_clause.append("RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method") + + + data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'")) + data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df") + return (data_mapper__5_df,) + + +@app.cell +def filter__6(data_mapper__5_df, spark): + + print(data_mapper__5_df.columns) + filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1") + filter__6_df.createOrReplaceTempView('filter__6_df') + return (filter__6_df,) + + +@app.cell +def most_used_payment_method__(filter__6_df, job_id, spark): + + _most_used_payment_method___select_clause=filter__6_df.columns if False else [] + + _most_used_payment_method___select_clause.append("payment_date AS payment_date") + + _most_used_payment_method___select_clause.append("payment_method AS most_used_payment_method") + + + most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'")) + most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df") + return + + +@app.cell +def high_valued_payments( + expr, + high_valued_payments_filter_df, + lit, + preprocess_then_expand, + reduce, +): + + + + + + + + + + _params = { + "datasource": "high_valued_payments_filter", + "selectFunctions" : [{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}] + } + + _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( high_valued_payments_filter_df, + group_expression="payment_date", + cube="", + rollup="", + grouping_set="", + select_functions=[{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}] + ) + + _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) + for f in _rewritten_selects + ] + + _all_group_cols = list({c for gs in _grouping_specs for c in gs}) + + _partials = [] + for _gs in _grouping_specs: + _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) + for _col in _all_group_cols: + if _col not in _gs: + _gdf = _gdf.withColumn(_col, lit(None)) + _partials.append(_gdf) + + + high_valued_payments_df = reduce(lambda a, b: a.unionByName(b), _partials) + + high_valued_payments_df.createOrReplaceTempView('high_valued_payments_df') + - data_reader__1_df = spark.read.table('dremio.payments') - data_reader__1_df.createOrReplaceTempView('data_reader__1_df') return diff --git a/payment_metrics/main.workflow b/payment_metrics/main.workflow index 32a9ae2..12b98d9 100644 --- a/payment_metrics/main.workflow +++ b/payment_metrics/main.workflow @@ -1 +1 @@ -{"version":"v1alpha","kind":"VisualBuilder","metadata":{"name":"payment_metrics","description":"Workflow to generate payment metrics.","runtime":"spark"},"spec":{"ui":{"edges":[],"nodes":[{"id":"data-reader__1","type":"workflowNode","position":{"x":618.5,"y":-49.5},"data":{"nodeType":"data-reader","id":"data-reader__1"},"measured":{"width":320,"height":110},"selected":false}],"nodesData":{"data-reader__1":{"isDefault":false,"iceberg_catalog":"dremio","typeLabel":"Spark","name":"data_reader__1","type":"SparkIcebergReader","table_name":"payments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{}}}},"blocks":[{"name":"data_reader__1","type":"SparkIcebergReader","options":{},"isDefault":false,"iceberg_catalog":"dremio","typeLabel":"Spark","table_name":"payments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"}}]}} \ No newline at end of file +{"version":"v1alpha","kind":"VisualBuilder","metadata":{"name":"payment_metrics","description":"Workflow to generate payment metrics.","runtime":"spark"},"spec":{"ui":{"edges":[{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-reader__1","target":"data-mapper__0","id":"xy-edge__data-reader__1-data-mapper__0"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__0","target":"filter__1","id":"xy-edge__data-mapper__0-filter__1"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"filter__3","id":"xy-edge__filter__1-filter__3"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"aggregate__4","id":"xy-edge__filter__1-aggregate__4"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__1","target":"aggregate__7","id":"xy-edge__filter__1-aggregate__7"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__4","target":"data-mapper__5","id":"xy-edge__aggregate__4-data-mapper__5"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__5","target":"filter__6","id":"xy-edge__data-mapper__5-filter__6"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__6","target":"data-mapper__8","id":"xy-edge__filter__6-data-mapper__8"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__3","target":"aggregate__9","id":"xy-edge__filter__3-aggregate__9"}],"nodes":[{"id":"data-reader__1","type":"workflowNode","position":{"x":-1768.5340546476618,"y":-162.4753522546897},"data":{"nodeType":"data-reader","id":"data-reader__1"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__0","type":"workflowNode","position":{"x":-1353.8681752505147,"y":-169.04851494308113},"data":{"nodeType":"data-mapper","id":"data-mapper__0"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__1","type":"workflowNode","position":{"x":-921.2468878928117,"y":-170.40923466009258},"data":{"nodeType":"filter","id":"filter__1"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__3","type":"workflowNode","position":{"x":-429.06026348805125,"y":120.4585301507131},"data":{"nodeType":"filter","id":"filter__3"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__4","type":"workflowNode","position":{"x":-408.1850633096815,"y":-543.9835681697932},"data":{"nodeType":"aggregate","id":"aggregate__4"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__5","type":"workflowNode","position":{"x":175.81493669031852,"y":-549.9835681697932},"data":{"nodeType":"data-mapper","id":"data-mapper__5"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__6","type":"workflowNode","position":{"x":767.8149366903185,"y":-547.9835681697932},"data":{"nodeType":"filter","id":"filter__6"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__7","type":"workflowNode","position":{"x":1393.8149366903185,"y":-165.98356816979322},"data":{"nodeType":"aggregate","id":"aggregate__7"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__8","type":"workflowNode","position":{"x":1281.8149366903185,"y":-555.9835681697932},"data":{"nodeType":"data-mapper","id":"data-mapper__8"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__9","type":"workflowNode","position":{"x":1413.8149366903185,"y":100.01643183020678},"data":{"nodeType":"aggregate","id":"aggregate__9"},"measured":{"width":320,"height":110},"selected":true,"dragging":false},{"id":"data-reader__10","type":"workflowNode","position":{"x":-1798.1850633096815,"y":568.0164318302068},"data":{"nodeType":"data-reader","id":"data-reader__10"},"measured":{"width":320,"height":142},"selected":false,"dragging":false},{"id":"data-mapper__11","type":"workflowNode","position":{"x":-1374.1850633096815,"y":566.0164318302068},"data":{"nodeType":"data-mapper","id":"data-mapper__11"},"measured":{"width":320,"height":142},"selected":false,"dragging":false},{"id":"filter__12","type":"workflowNode","position":{"x":-930.1850633096815,"y":584.0164318302068},"data":{"nodeType":"filter","id":"filter__12"},"measured":{"width":320,"height":97},"selected":false,"dragging":false},{"id":"filter__13","type":"workflowNode","position":{"x":-414.1850633096815,"y":390.0164318302068},"data":{"nodeType":"filter","id":"filter__13"},"measured":{"width":320,"height":97},"selected":false,"dragging":false},{"id":"aggregate__14","type":"workflowNode","position":{"x":-422.1850633096815,"y":718.0164318302068},"data":{"nodeType":"aggregate","id":"aggregate__14"},"measured":{"width":320,"height":120},"selected":false,"dragging":false},{"id":"data-writer__15","type":"workflowNode","position":{"x":183.81493669031852,"y":716.0164318302068},"data":{"nodeType":"data-writer","id":"data-writer__15"},"measured":{"width":320,"height":142},"selected":false,"dragging":false},{"id":"aggregate__16","type":"workflowNode","position":{"x":1415.8149366903185,"y":360.0164318302068},"data":{"nodeType":"aggregate","id":"aggregate__16"},"measured":{"width":320,"height":120},"selected":false,"dragging":false},{"id":"code-transform__17","type":"workflowNode","position":{"x":2037.8149366903185,"y":-109.98356816979322},"data":{"nodeType":"code-transform","id":"code-transform__17"},"measured":{"width":320,"height":142},"selected":false,"dragging":false},{"id":"data-writer__18","type":"workflowNode","position":{"x":2617.8149366903185,"y":-109.98356816979322},"data":{"nodeType":"data-writer","id":"data-writer__18"},"measured":{"width":320,"height":142},"selected":false,"dragging":false}],"nodesData":{"data-reader__1":{"isDefault":false,"iceberg_catalog":"dremio","typeLabel":"Spark","name":"success_payments","type":"SparkIcebergReader","table_name":"payments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{}},"data-mapper__0":{"name":"success_payments_mapper","type":"DataMapping","fromDataReader":"success_payments","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"amount","valueExpression":"amount"},{"fieldName":"gateway","valueExpression":"gateway"},{"fieldName":"payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756374444833","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756374453565","newFieldName":"amount","mappingType":"sourceColumn","value":"amount"},{"id":"mapping-1756374468084","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"},{"id":"mapping-1756374477932","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__1":{"name":"final_success_payments","type":"Filter","datasource":"success_payments_mapper","condition":" payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \\'CCS\\'","isDefault":false},"filter__3":{"name":"high_valued_payments_filter","type":"Filter","datasource":"final_success_payments","condition":"amount >= 500","isDefault":false},"aggregate__7":{"name":"total_payments_and_total_value_processed","type":"SQLAggregation","datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_payments","aggregationFunction":"COUNT(*)"},{"fieldName":"total_value_processed","aggregationFunction":"SUM(amount)"}],"isDefault":false},"aggregate__4":{"name":"aggregate__4","type":"SQLAggregation","datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date, payment_method"},"selectFunctions":[{"fieldName":"method_count","aggregationFunction":"COUNT(*)"}],"isDefault":false},"data-mapper__5":{"name":"data_mapper__5","type":"DataMapping","fromDataReader":"aggregate__4","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"method_count","valueExpression":"method_count"},{"fieldName":"rank_method","valueExpression":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375393396","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375408077","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756375415696","newFieldName":"method_count","mappingType":"sourceColumn","value":"method_count"},{"id":"mapping-1756375432743","newFieldName":"rank_method","mappingType":"expression","value":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__6":{"name":"filter__6","type":"Filter","datasource":"data_mapper__5","condition":"rank_method = 1","isDefault":false},"data-mapper__8":{"name":"most_used_payment_method__","type":"DataMapping","fromDataReader":"filter__6","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"most_used_payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375574506","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375594556","newFieldName":"most_used_payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"aggregate__9":{"name":"high_valued_payments","type":"SQLAggregation","datasource":"high_valued_payments_filter","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"high_valued_payments","aggregationFunction":"COUNT(*)"}],"isDefault":false}}},"blocks":[{"name":"success_payments","type":"SparkIcebergReader","options":{},"isDefault":false,"iceberg_catalog":"dremio","typeLabel":"Spark","table_name":"payments","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"}},{"name":"success_payments_mapper","type":"DataMapping","options":{},"fromDataReader":"success_payments","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"DATE(payment_date)"},{"fieldName":"amount","valueExpression":"amount"},{"fieldName":"gateway","valueExpression":"gateway"},{"fieldName":"payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756374444833","newFieldName":"payment_date","mappingType":"expression","value":"DATE(payment_date)"},{"id":"mapping-1756374453565","newFieldName":"amount","mappingType":"sourceColumn","value":"amount"},{"id":"mapping-1756374468084","newFieldName":"gateway","mappingType":"sourceColumn","value":"gateway"},{"id":"mapping-1756374477932","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"final_success_payments","type":"Filter","options":{},"datasource":"success_payments_mapper","condition":" payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \\'CCS\\'","isDefault":false},{"name":"high_valued_payments_filter","type":"Filter","options":{},"datasource":"final_success_payments","condition":"amount >= 500","isDefault":false},{"name":"total_payments_and_total_value_processed","type":"SQLAggregation","options":{},"datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"total_payments","aggregationFunction":"COUNT(*)"},{"fieldName":"total_value_processed","aggregationFunction":"SUM(amount)"}],"isDefault":false},{"name":"aggregate__4","type":"SQLAggregation","options":{},"datasource":"final_success_payments","groupByParams":{"group_expression":"payment_date, payment_method"},"selectFunctions":[{"fieldName":"method_count","aggregationFunction":"COUNT(*)"}],"isDefault":false},{"name":"data_mapper__5","type":"DataMapping","options":{},"fromDataReader":"aggregate__4","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"payment_method","valueExpression":"payment_method"},{"fieldName":"method_count","valueExpression":"method_count"},{"fieldName":"rank_method","valueExpression":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375393396","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375408077","newFieldName":"payment_method","mappingType":"sourceColumn","value":"payment_method"},{"id":"mapping-1756375415696","newFieldName":"method_count","mappingType":"sourceColumn","value":"method_count"},{"id":"mapping-1756375432743","newFieldName":"rank_method","mappingType":"expression","value":"RANK() OVER (PARTITION BY payment_date ORDER BY method_count)"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"filter__6","type":"Filter","options":{},"datasource":"data_mapper__5","condition":"rank_method = 1","isDefault":false},{"name":"most_used_payment_method__","type":"DataMapping","options":{},"fromDataReader":"filter__6","includeExistingColumns":false,"toSchema":[{"fieldName":"payment_date","valueExpression":"payment_date"},{"fieldName":"most_used_payment_method","valueExpression":"payment_method"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756375574506","newFieldName":"payment_date","mappingType":"sourceColumn","value":"payment_date"},{"id":"mapping-1756375594556","newFieldName":"most_used_payment_method","mappingType":"sourceColumn","value":"payment_method"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"high_valued_payments","type":"SQLAggregation","options":{},"datasource":"high_valued_payments_filter","groupByParams":{"group_expression":"payment_date"},"selectFunctions":[{"fieldName":"high_valued_payments","aggregationFunction":"COUNT(*)"}],"isDefault":false}]}} \ No newline at end of file