Workflow saved

This commit is contained in:
unknown
2025-08-29 08:56:42 +00:00
parent d2dadaa5f6
commit 0dcde9e5cb
3 changed files with 88 additions and 1 deletions

View File

@@ -159,6 +159,48 @@ payment_api_df.createOrReplaceTempView('payment_api_df')
_failed_payments_update_mapper_select_clause=payment_api_df.columns if False else []
_failed_payments_update_mapper_select_clause.append("id AS id")
_failed_payments_update_mapper_select_clause.append("retry_attempt_count + 1 AS retry_attempt_count")
_failed_payments_update_mapper_select_clause.append("TRUE AS batch_processed")
_failed_payments_update_mapper_select_clause.append("{job_id} AS batch_job_id")
_failed_payments_update_mapper_select_clause.append("CASE WHEN response_body.success = \'true\' THEN \'success\' WHEN retry_attempt_count >= 3 THEN \'permanently_failed\' ELSE \'failed\' END AS retry_status")
_failed_payments_update_mapper_select_clause.append("account_id AS account_id")
_failed_payments_update_mapper_select_clause.append("failure_reason AS failure_reason")
_failed_payments_update_mapper_select_clause.append("failure_timestamp AS failure_timestamp")
_failed_payments_update_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
_failed_payments_update_mapper_select_clause.append("created_at AS created_at")
_failed_payments_update_mapper_select_clause.append("payment_id AS payment_id")
_failed_payments_update_mapper_select_clause.append("source_of_failure AS source_of_failure")
_failed_payments_update_mapper_select_clause.append("currency AS currency")
_failed_payments_update_mapper_select_clause.append("gateway AS gateway")
_failed_payments_update_mapper_select_clause.append("payment_method AS payment_method")
_failed_payments_update_mapper_select_clause.append("payment_date AS payment_date")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS last_retry_timestamp")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS updated_at")
failed_payments_update_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df").replace("{job_id}",f"'{job_id}'"))
failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")
# %%
print(payment_api_df.columns)
success_payment_filter_df = spark.sql("select * from payment_api_df where response_body.success=\'true\'")
success_payment_filter_df.createOrReplaceTempView('success_payment_filter_df')

View File

@@ -182,11 +182,56 @@ def failed_payments_update_mapper(job_id, payment_api_df, spark):
_failed_payments_update_mapper_select_clause=payment_api_df.columns if False else []
_failed_payments_update_mapper_select_clause.append("id AS id")
_failed_payments_update_mapper_select_clause.append("retry_attempt_count + 1 AS retry_attempt_count")
_failed_payments_update_mapper_select_clause.append("TRUE AS batch_processed")
_failed_payments_update_mapper_select_clause.append("{job_id} AS batch_job_id")
_failed_payments_update_mapper_select_clause.append("CASE WHEN response_body.success = \'true\' THEN \'success\' WHEN retry_attempt_count >= 3 THEN \'permanently_failed\' ELSE \'failed\' END AS retry_status")
_failed_payments_update_mapper_select_clause.append("account_id AS account_id")
_failed_payments_update_mapper_select_clause.append("failure_reason AS failure_reason")
_failed_payments_update_mapper_select_clause.append("failure_timestamp AS failure_timestamp")
_failed_payments_update_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
_failed_payments_update_mapper_select_clause.append("created_at AS created_at")
_failed_payments_update_mapper_select_clause.append("payment_id AS payment_id")
_failed_payments_update_mapper_select_clause.append("source_of_failure AS source_of_failure")
_failed_payments_update_mapper_select_clause.append("currency AS currency")
_failed_payments_update_mapper_select_clause.append("gateway AS gateway")
_failed_payments_update_mapper_select_clause.append("payment_method AS payment_method")
_failed_payments_update_mapper_select_clause.append("payment_date AS payment_date")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS last_retry_timestamp")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd\'T\'HH:mm:ss.SSS\") AS updated_at")
failed_payments_update_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df").replace("{job_id}",f"'{job_id}'"))
failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")
return
@app.cell
def success_payment_filter(payment_api_df, spark):
print(payment_api_df.columns)
success_payment_filter_df = spark.sql("select * from payment_api_df where response_body.success=\'true\'")
success_payment_filter_df.createOrReplaceTempView('success_payment_filter_df')
return
if __name__ == "__main__":
app.run()

File diff suppressed because one or more lines are too long