Workflow saved
This commit is contained in:
@@ -71,12 +71,12 @@ bootstrap_udfs(spark)
 
 
 
-success_payments_df = spark.read.table('dremio.payments')
-success_payments_df.createOrReplaceTempView('success_payments_df')
+success_payments_reader_df = spark.read.table('dremio.payments')
+success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df')
 
 # %%
 
-_success_payments_mapper_select_clause=success_payments_df.columns if False else []
+_success_payments_mapper_select_clause=success_payments_reader_df.columns if False else []
 
 _success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date")
 
@@ -87,7 +87,7 @@ _success_payments_mapper_select_clause.append("gateway AS gateway")
 _success_payments_mapper_select_clause.append("payment_method AS payment_method")
 
 
-success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_df").replace("{job_id}",f"'{job_id}'"))
+success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
 success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
 
 # %%
@@ -273,3 +273,182 @@ high_valued_payments_df.createOrReplaceTempView('high_valued_payments_df')
 
 
 
+
+# %%
+
+
+
+failed_payments_reader_df = spark.read.table('dremio.failedpayments')
+failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
+
+# %%
+
+_failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else []
+
+_failed_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date")
+
+_failed_payments_mapper_select_clause.append("payment_method AS payment_method")
+
+_failed_payments_mapper_select_clause.append("failure_reason AS failure_reason")
+
+_failed_payments_mapper_select_clause.append("gateway AS gateway")
+
+
+failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
+failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df")
+
+# %%
+
+print(failed_payments_mapper_df.columns)
+final_failed_payments_df = spark.sql("select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))")
+final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df')
+
+# %%
+
+print(final_failed_payments_df.columns)
+filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = \'CCS\'")
+filter__13_df.createOrReplaceTempView('filter__13_df')
+
+# %%
+
+
+
+
+
+
+
+
+
+_params = {
+    "datasource": "filter__13",
+    "selectFunctions" : [{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}]
+}
+
+_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( filter__13_df,
+    group_expression="payment_date",
+    cube="",
+    rollup="",
+    grouping_set="",
+    select_functions=[{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}]
+)
+
+_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
+              for f in _rewritten_selects
+              ]
+
+_all_group_cols = list({c for gs in _grouping_specs for c in gs})
+
+_partials = []
+for _gs in _grouping_specs:
+    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
+    for _col in _all_group_cols:
+        if _col not in _gs:
+            _gdf = _gdf.withColumn(_col, lit(None))
+    _partials.append(_gdf)
+
+
+total_failed_payments___df = reduce(lambda a, b: a.unionByName(b), _partials)
+
+total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df')
+
+
+
+
+
+# %%
+
+
+
+
+
+
+
+
+
+_params = {
+    "datasource": "final_failed_payments",
+    "selectFunctions" : [{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}]
+}
+
+_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_failed_payments_df,
+    group_expression="payment_date, gateway, failure_reason",
+    cube="",
+    rollup="",
+    grouping_set="",
+    select_functions=[{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}]
+)
+
+_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
+              for f in _rewritten_selects
+              ]
+
+_all_group_cols = list({c for gs in _grouping_specs for c in gs})
+
+_partials = []
+for _gs in _grouping_specs:
+    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
+    for _col in _all_group_cols:
+        if _col not in _gs:
+            _gdf = _gdf.withColumn(_col, lit(None))
+    _partials.append(_gdf)
+
+
+failed_payment_metrics_df = reduce(lambda a, b: a.unionByName(b), _partials)
+
+failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df')
+
+
+
+
+
+# %%
+
+
+
+
+_data_writer__15_fields_to_update = failed_payment_metrics_df.columns
+_data_writer__15_set_clause=[]
+_data_writer__15_unique_key_clause= []
+
+for _key in ['payment_date', 'gateway', 'failure_reason']:
+    _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}')
+
+for _field in _data_writer__15_fields_to_update:
+    if(_field not in _data_writer__15_unique_key_clause):
+        _data_writer__15_set_clause.append(f't.{_field} = s.{_field}')
+
+_merge_query = '''
+    MERGE INTO dremio.failedpaymentmetrics t
+    USING failed_payment_metrics_df s
+    ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN
+    UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
+
+spark.sql(_merge_query)
+
+
+
+# %%
+
+print(total_payments_and_total_value_processed_df.columns)
+print(most_used_payment_method___df.columns)
+print(high_valued_payments_df.columns)
+print(total_failed_payments___df.columns)
+
+success_payment_metrics_df = spark.sql("""
+    SELECT
+        COALESCE(a.payment_date, d.payment_date) AS payment_date,
+        a.total_payments,
+        a.total_value_processed,
+        b.most_used_payment_method,
+        c.high_valued_payments,
+        d.total_failed_payments
+    FROM total_failed_payments___df d
+    FULL OUTER JOIN total_payments_and_total_value_processed_df a
+        ON a.payment_date = d.payment_date
+    LEFT JOIN most_used_payment_method___df b
+        ON a.payment_date = b.payment_date
+    LEFT JOIN high_valued_payments_df c
+        ON a.payment_date = c.payment_date
+""")
+
+success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df')
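Note on the aggregation cells added above: each one expands a set of grouping specs via the workflow's own preprocess_then_expand helper (defined elsewhere in the generated code), aggregates the flattened frame once per spec, pads the grouping columns a given spec omits with NULLs, and unions the partial results, which emulates GROUPING SETS on the DataFrame API. A minimal, hedged sketch of that pattern, with a trivial stand-in for preprocess_then_expand and sample data invented for illustration only:

from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, lit

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-in: the real preprocess_then_expand also handles cube/rollup/
# grouping_set inputs and rewrites the select functions; here it just returns a
# single grouping spec parsed from group_expression.
def preprocess_then_expand(df, group_expression, select_functions, **_):
    specs = [[c.strip() for c in group_expression.split(",")] if group_expression else []]
    return df, specs, select_functions

df = spark.createDataFrame(
    [("2024-01-01", "CCS", "card_declined"), ("2024-01-01", "CCS", "timeout")],
    ["payment_date", "gateway", "failure_reason"],
)

df_flat, grouping_specs, selects = preprocess_then_expand(
    df,
    group_expression="payment_date, gateway",
    select_functions=[{"fieldName": "failure_count", "aggregationFunction": "COUNT(*)"}],
)

agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in selects]
all_group_cols = list({c for gs in grouping_specs for c in gs})

partials = []
for gs in grouping_specs:
    gdf = df_flat.groupBy(*gs).agg(*agg_exprs)
    for col in all_group_cols:  # pad the columns this spec did not group by
        if col not in gs:
            gdf = gdf.withColumn(col, lit(None))
    partials.append(gdf)

result = reduce(lambda a, b: a.unionByName(b), partials)
result.show()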
@@ -74,19 +74,19 @@ def init():
 
 
 @app.cell
-def success_payments(spark):
+def success_payments_reader(spark):
 
 
 
-    success_payments_df = spark.read.table('dremio.payments')
-    success_payments_df.createOrReplaceTempView('success_payments_df')
-    return (success_payments_df,)
+    success_payments_reader_df = spark.read.table('dremio.payments')
+    success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df')
+    return (success_payments_reader_df,)
 
 
 @app.cell
-def success_payments_mapper(job_id, spark, success_payments_df):
+def success_payments_mapper(job_id, spark, success_payments_reader_df):
 
-    _success_payments_mapper_select_clause=success_payments_df.columns if False else []
+    _success_payments_mapper_select_clause=success_payments_reader_df.columns if False else []
 
     _success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date")
 
@@ -97,7 +97,7 @@ def success_payments_mapper(job_id, spark, success_payments_df):
     _success_payments_mapper_select_clause.append("payment_method AS payment_method")
 
 
-    success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_df").replace("{job_id}",f"'{job_id}'"))
+    success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
     success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
     return (success_payments_mapper_df,)
 
@@ -172,7 +172,7 @@ def total_payments_and_total_value_processed(
 
 
 
-    return
+    return (total_payments_and_total_value_processed_df,)
 
 
 @app.cell
@@ -270,7 +270,7 @@ def most_used_payment_method__(filter__6_df, job_id, spark):
 
     most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'"))
     most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df")
-    return
+    return (most_used_payment_method___df,)
 
 
 @app.cell
@@ -325,6 +325,227 @@ def high_valued_payments(
 
 
 
+    return (high_valued_payments_df,)
+
+
+@app.cell
+def failed_payments_reader(spark):
+
+
+
+    failed_payments_reader_df = spark.read.table('dremio.failedpayments')
+    failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
+    return (failed_payments_reader_df,)
+
+
+@app.cell
+def failed_payments_mapper(failed_payments_reader_df, job_id, spark):
+
+    _failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else []
+
+    _failed_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date")
+
+    _failed_payments_mapper_select_clause.append("payment_method AS payment_method")
+
+    _failed_payments_mapper_select_clause.append("failure_reason AS failure_reason")
+
+    _failed_payments_mapper_select_clause.append("gateway AS gateway")
+
+
+    failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
+    failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df")
+    return (failed_payments_mapper_df,)
+
+
+@app.cell
+def final_failed_payments(failed_payments_mapper_df, spark):
+
+    print(failed_payments_mapper_df.columns)
+    final_failed_payments_df = spark.sql("select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))")
+    final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df')
+    return (final_failed_payments_df,)
+
+
+@app.cell
+def filter__13(final_failed_payments_df, spark):
+
+    print(final_failed_payments_df.columns)
+    filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = \'CCS\'")
+    filter__13_df.createOrReplaceTempView('filter__13_df')
+    return (filter__13_df,)
+
+
+@app.cell
+def total_failed_payments__(
+    expr,
+    filter__13_df,
+    lit,
+    preprocess_then_expand,
+    reduce,
+):
+
+
+
+
+
+
+
+
+
+    _params = {
+        "datasource": "filter__13",
+        "selectFunctions" : [{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}]
+    }
+
+    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( filter__13_df,
+        group_expression="payment_date",
+        cube="",
+        rollup="",
+        grouping_set="",
+        select_functions=[{'fieldName': 'total_failed_payments', 'aggregationFunction': 'COUNT(*)'}]
+    )
+
+    _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
+                  for f in _rewritten_selects
+                  ]
+
+    _all_group_cols = list({c for gs in _grouping_specs for c in gs})
+
+    _partials = []
+    for _gs in _grouping_specs:
+        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
+        for _col in _all_group_cols:
+            if _col not in _gs:
+                _gdf = _gdf.withColumn(_col, lit(None))
+        _partials.append(_gdf)
+
+
+    total_failed_payments___df = reduce(lambda a, b: a.unionByName(b), _partials)
+
+    total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df')
+
+
+
+
+    return (total_failed_payments___df,)
+
+
+@app.cell
+def failed_payment_metrics(
+    expr,
+    final_failed_payments_df,
+    lit,
+    preprocess_then_expand,
+    reduce,
+):
+
+
+
+
+
+
+
+
+
+    _params = {
+        "datasource": "final_failed_payments",
+        "selectFunctions" : [{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}]
+    }
+
+    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_failed_payments_df,
+        group_expression="payment_date, gateway, failure_reason",
+        cube="",
+        rollup="",
+        grouping_set="",
+        select_functions=[{'fieldName': 'failure_count', 'aggregationFunction': 'COUNT(*)'}]
+    )
+
+    _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
+                  for f in _rewritten_selects
+                  ]
+
+    _all_group_cols = list({c for gs in _grouping_specs for c in gs})
+
+    _partials = []
+    for _gs in _grouping_specs:
+        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
+        for _col in _all_group_cols:
+            if _col not in _gs:
+                _gdf = _gdf.withColumn(_col, lit(None))
+        _partials.append(_gdf)
+
+
+    failed_payment_metrics_df = reduce(lambda a, b: a.unionByName(b), _partials)
+
+    failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df')
+
+
+
+
+    return (failed_payment_metrics_df,)
+
+
+@app.cell
+def data_writer__15(failed_payment_metrics_df, spark):
+
+
+
+
+    _data_writer__15_fields_to_update = failed_payment_metrics_df.columns
+    _data_writer__15_set_clause=[]
+    _data_writer__15_unique_key_clause= []
+
+    for _key in ['payment_date', 'gateway', 'failure_reason']:
+        _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}')
+
+    for _field in _data_writer__15_fields_to_update:
+        if(_field not in _data_writer__15_unique_key_clause):
+            _data_writer__15_set_clause.append(f't.{_field} = s.{_field}')
+
+    _merge_query = '''
+        MERGE INTO dremio.failedpaymentmetrics t
+        USING failed_payment_metrics_df s
+        ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN
+        UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
+
+    spark.sql(_merge_query)
+
+
+    return
+
+
+@app.cell
+def success_payment_metrics(
+    high_valued_payments_df,
+    most_used_payment_method___df,
+    spark,
+    total_failed_payments___df,
+    total_payments_and_total_value_processed_df,
+):
+
+    print(total_payments_and_total_value_processed_df.columns)
+    print(most_used_payment_method___df.columns)
+    print(high_valued_payments_df.columns)
+    print(total_failed_payments___df.columns)
+
+    success_payment_metrics_df = spark.sql("""
+        SELECT
+            COALESCE(a.payment_date, d.payment_date) AS payment_date,
+            a.total_payments,
+            a.total_value_processed,
+            b.most_used_payment_method,
+            c.high_valued_payments,
+            d.total_failed_payments
+        FROM total_failed_payments___df d
+        FULL OUTER JOIN total_payments_and_total_value_processed_df a
+            ON a.payment_date = d.payment_date
+        LEFT JOIN most_used_payment_method___df b
+            ON a.payment_date = b.payment_date
+        LEFT JOIN high_valued_payments_df c
+            ON a.payment_date = c.payment_date
+    """)
+
+    success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df')
     return
 
 
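Note on the data_writer__15 cell added above: it upserts the aggregated metrics into dremio.failedpaymentmetrics by assembling a MERGE statement at runtime, building the ON clause from the unique-key columns (payment_date, gateway, failure_reason) and the UPDATE SET clause from the remaining fields, with INSERT * for unmatched rows. A rough sketch of the string it produces; the column list here is assumed for illustration (the workflow derives it from failed_payment_metrics_df.columns), and this sketch excludes the key columns from SET by name:

# Hypothetical column list for illustration only.
fields = ["payment_date", "gateway", "failure_reason", "failure_count"]
unique_keys = ["payment_date", "gateway", "failure_reason"]

on_clause = " AND ".join(f"t.{k} = s.{k}" for k in unique_keys)
set_clause = ", ".join(f"t.{f} = s.{f}" for f in fields if f not in unique_keys)

merge_query = (
    "MERGE INTO dremio.failedpaymentmetrics t "
    "USING failed_payment_metrics_df s "
    f"ON {on_clause} "
    f"WHEN MATCHED THEN UPDATE SET {set_clause} "
    "WHEN NOT MATCHED THEN INSERT *"
)
print(merge_query)
# Prints one line; wrapped here for readability:
#   MERGE INTO dremio.failedpaymentmetrics t USING failed_payment_metrics_df s
#   ON t.payment_date = s.payment_date AND t.gateway = s.gateway AND t.failure_reason = s.failure_reason
#   WHEN MATCHED THEN UPDATE SET t.failure_count = s.failure_count
#   WHEN NOT MATCHED THEN INSERT *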
File diff suppressed because one or more lines are too long