Compare commits: Default...exp360cust (22 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | bf935f3f53 |  |
|  | 681045c4f0 |  |
|  | 331f72d1a7 |  |
|  | c928074d4e |  |
|  | 2f13af0751 |  |
|  | 2e42932fab |  |
|  | 1b840c8730 |  |
|  | b255836d13 |  |
|  | ab9c61fddd |  |
|  | 0dcde9e5cb |  |
|  | d2dadaa5f6 |  |
|  | 0027602621 |  |
|  | c5a5dfb7cc |  |
|  | 6722686188 |  |
|  | 0a844e08a1 |  |
|  | 4554a42d83 |  |
|  | 172ea4483b |  |
|  | 588386cab7 |  |
|  | 67e81d79c6 |  |
|  | 526a3f705b |  |
|  | c7658869d6 |  |
|  | fdc83c60ab |  |
failed_payments_retry/main.py (new file, 268 lines)
@@ -0,0 +1,268 @@
__generated_with = "0.13.15"

# %%

import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json

from secrets_manager import SecretsManager

from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent

from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery

alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'

job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())

sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)

gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
conf = SparkConf()
params = {
    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
    "spark.hadoop.fs.s3a.aws.region": "us-west-1",
    "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
    "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dremio.type" : "hadoop",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}

conf.setAll(list(params.items()))

spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)

# %%

failed_payments_reader_df = spark.read.table('dremio.failedpayments')
failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')

# %%

print(failed_payments_reader_df.columns)
failed_payments_filter_df = spark.sql("select * from failed_payments_reader_df where retry_attempt_count < 3 AND gateway = 'CCS' AND (retry_status = 'new' OR retry_status = 'failed')")
failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')

# %%

_payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'

_payment_api_headers = dict()

for _payment_api_k, _payment_api_v in {'Content-Type': {'value': 'application/json', 'secret': None}, 'api-key': {'value': None, 'secret': 'API_KEY'}}.items():
    if(_payment_api_v.get('value') is not None and _payment_api_v.get('value') != ''):
        _payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
    elif(_payment_api_v.get('secret') is not None and _payment_api_v.get('secret') != ''):
        _payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))

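# Render the EXP_CREATE_PAYMENT request body for each failed payment row, POST it to the
# payment publish endpoint, and return the row merged with the API response for downstream mapping.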
def _payment_api_call_api_udf(row):
    body_dict = row.asDict(recursive=True)
    template = Template('''{
        "msgData": {
            "canDistribute": true,
            "payment": [
                {
                    "accountId3": "{{account_id}}",
                    "currency2": "{{currency}}",
                    "paymentAmount2": "{{amount}}"
                }
            ],
            "paymentDate": "{{payment_date[:10]}}",
            "paymentTender": [
                {
                    "currency3": "{{currency}}",
                    "name": "NAME",
                    "payorAccountId2": "{{account_id}}",
                    "tenderAmount": "{{amount}}",
                    "tenderType2": "CASH"
                }
            ],
            "payorAccountId": "{{account_id}}",
            "shouldAllPaymentsFreeze": true,
            "tndrSrceCd": "CASH-01",
            "user": ""
        },
        "outMsgConfigCode": "EXP_CREATE_PAYMENT"
    }''')

    body = json.loads(template.render(**body_dict))
    print("request : "+ json.dumps(body))

    response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={})

    data = response.json()
    print("response : " + json.dumps(data))

    merged = {**body_dict, "response_body": data}
    return json.dumps(merged,default=str)

_payment_api_df_rdd=failed_payments_filter_df.rdd.map(lambda row: _payment_api_call_api_udf(row)).persist()
payment_api_df = spark.read.json(_payment_api_df_rdd)
if not "response_body" in payment_api_df.columns:
    raise Exception("Dataframe is empty")
payment_api_df.createOrReplaceTempView('payment_api_df')

# %%

_failed_payments_update_mapper_select_clause=payment_api_df.columns if False else []

_failed_payments_update_mapper_select_clause.append("id AS id")
_failed_payments_update_mapper_select_clause.append("retry_attempt_count + 1 AS retry_attempt_count")
_failed_payments_update_mapper_select_clause.append("TRUE AS batch_processed")
_failed_payments_update_mapper_select_clause.append("{job_id} AS batch_job_id")
_failed_payments_update_mapper_select_clause.append("CASE WHEN response_body.success = 'true' THEN 'success' WHEN retry_attempt_count >= 3 THEN 'permanently_failed' ELSE 'failed' END AS retry_status")
_failed_payments_update_mapper_select_clause.append("account_id AS account_id")
_failed_payments_update_mapper_select_clause.append("failure_reason AS failure_reason")
_failed_payments_update_mapper_select_clause.append("failure_timestamp AS failure_timestamp")
_failed_payments_update_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
_failed_payments_update_mapper_select_clause.append("created_at AS created_at")
_failed_payments_update_mapper_select_clause.append("payment_id AS payment_id")
_failed_payments_update_mapper_select_clause.append("source_of_failure AS source_of_failure")
_failed_payments_update_mapper_select_clause.append("currency AS currency")
_failed_payments_update_mapper_select_clause.append("gateway AS gateway")
_failed_payments_update_mapper_select_clause.append("payment_method AS payment_method")
_failed_payments_update_mapper_select_clause.append("payment_date AS payment_date")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS last_retry_timestamp")
_failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS updated_at")

failed_payments_update_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df").replace("{job_id}",f"'{job_id}'"))
failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")

# %%

print(payment_api_df.columns)
success_payment_filter_df = spark.sql("select * from payment_api_df where response_body.success='true'")
success_payment_filter_df.createOrReplaceTempView('success_payment_filter_df')

# %%

_success_payments_mapper_select_clause=success_payment_filter_df.columns if False else []

_success_payments_mapper_select_clause.append("uuid() AS id")
_success_payments_mapper_select_clause.append("account_id AS account_id")
_success_payments_mapper_select_clause.append("currency AS currency")
_success_payments_mapper_select_clause.append("payment_id AS payment_id")
_success_payments_mapper_select_clause.append("gateway AS gateway")
_success_payments_mapper_select_clause.append("payment_method AS payment_method")
_success_payments_mapper_select_clause.append("payment_date AS payment_date")
_success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS created_at")
_success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS updated_at")
_success_payments_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")

success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payment_filter_df").replace("{job_id}",f"'{job_id}'"))
success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")

# %%

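# Build a MERGE (upsert) into dremio.failedpayments keyed on payment_id, updating every other
# column from the mapped retry results and inserting rows that do not match.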
_failed_payments_update_writer_fields_to_update = failed_payments_update_mapper_df.columns
_failed_payments_update_writer_set_clause=[]
_failed_payments_update_writer_unique_key_clause= []

for _key in ['payment_id']:
    _failed_payments_update_writer_unique_key_clause.append(f't.{_key} = s.{_key}')

for _field in _failed_payments_update_writer_fields_to_update:
    if(_field not in _failed_payments_update_writer_unique_key_clause):
        _failed_payments_update_writer_set_clause.append(f't.{_field} = s.{_field}')

_merge_query = '''
MERGE INTO dremio.failedpayments t
USING failed_payments_update_mapper_df s
ON ''' + ' AND '.join(_failed_payments_update_writer_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_failed_payments_update_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'

spark.sql(_merge_query)

# %%

success_payments_mapper_df.write.mode('append').saveAsTable('dremio.payments')

failed_payments_retry/main.py.notebook (new file, 308 lines)
@@ -0,0 +1,308 @@
import marimo

__generated_with = "0.13.15"
app = marimo.App()


@app.cell
def init():
    import sys
    sys.path.append('/opt/spark/work-dir/')
    from workflow_templates.spark.udf_manager import bootstrap_udfs
    from pyspark.sql.functions import udf
    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType, IntegerType
    import uuid
    from pathlib import Path
    from pyspark import SparkConf, Row
    from pyspark.sql import SparkSession
    import os
    import pandas as pd
    import polars as pl
    import pyarrow as pa
    from pyspark.sql.functions import expr,to_json,col,struct
    from functools import reduce
    from handle_structs_or_arrays import preprocess_then_expand
    import requests
    from jinja2 import Template
    import json

    from secrets_manager import SecretsManager

    from WorkflowManager import WorkflowDSL, WorkflowManager
    from KnowledgebaseManager import KnowledgebaseManager
    from gitea_client import GiteaClient, WorkspaceVersionedContent

    from dremio.flight.endpoint import DremioFlightEndpoint
    from dremio.flight.query import DremioFlightEndpointQuery

    alias_str='abcdefghijklmnopqrstuvwxyz'
    workspace = os.getenv('WORKSPACE') or 'exp360cust'

    job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())

    sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
    secrets = sm.list_secrets(workspace)

    gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
    workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
    conf = SparkConf()
    params = {
        "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
        "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
        "spark.hadoop.fs.s3a.aws.region": "us-west-1",
        "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
        "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dremio.type" : "hadoop",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
    }

    conf.setAll(list(params.items()))

    spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
    bootstrap_udfs(spark)
    return Template, job_id, json, requests, secrets, spark


@app.cell
def failed_payments_reader(spark):
    failed_payments_reader_df = spark.read.table('dremio.failedpayments')
    failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')
    return (failed_payments_reader_df,)


@app.cell
def failed_payments_filter(failed_payments_reader_df, spark):
    print(failed_payments_reader_df.columns)
    failed_payments_filter_df = spark.sql("select * from failed_payments_reader_df where retry_attempt_count < 3 AND gateway = 'CCS' AND (retry_status = 'new' OR retry_status = 'failed')")
    failed_payments_filter_df.createOrReplaceTempView('failed_payments_filter_df')
    return (failed_payments_filter_df,)


@app.cell
def payment_api(
    Template,
    failed_payments_filter_df,
    json,
    requests,
    secrets,
    spark,
):
    _payment_api_url = 'https://cod.uat.arconecloud.com/fw-notification/outbound-message-config/publish'

    _payment_api_headers = dict()

    for _payment_api_k, _payment_api_v in {'Content-Type': {'value': 'application/json', 'secret': None}, 'api-key': {'value': None, 'secret': 'API_KEY'}}.items():
        if(_payment_api_v.get('value') is not None and _payment_api_v.get('value') != ''):
            _payment_api_headers[_payment_api_k] = _payment_api_v.get('value')
        elif(_payment_api_v.get('secret') is not None and _payment_api_v.get('secret') != ''):
            _payment_api_headers[_payment_api_k] = secrets.get(_payment_api_v.get('secret'))

    def _payment_api_call_api_udf(row):
        body_dict = row.asDict(recursive=True)
        template = Template('''{
            "msgData": {
                "canDistribute": true,
                "payment": [
                    {
                        "accountId3": "{{account_id}}",
                        "currency2": "{{currency}}",
                        "paymentAmount2": "{{amount}}"
                    }
                ],
                "paymentDate": "{{payment_date[:10]}}",
                "paymentTender": [
                    {
                        "currency3": "{{currency}}",
                        "name": "NAME",
                        "payorAccountId2": "{{account_id}}",
                        "tenderAmount": "{{amount}}",
                        "tenderType2": "CASH"
                    }
                ],
                "payorAccountId": "{{account_id}}",
                "shouldAllPaymentsFreeze": true,
                "tndrSrceCd": "CASH-01",
                "user": ""
            },
            "outMsgConfigCode": "EXP_CREATE_PAYMENT"
        }''')

        body = json.loads(template.render(**body_dict))
        print("request : "+ json.dumps(body))

        response = requests.post(_payment_api_url, headers=_payment_api_headers, json=body, params={})

        data = response.json()
        print("response : " + json.dumps(data))

        merged = {**body_dict, "response_body": data}
        return json.dumps(merged,default=str)

    _payment_api_df_rdd=failed_payments_filter_df.rdd.map(lambda row: _payment_api_call_api_udf(row)).persist()
    payment_api_df = spark.read.json(_payment_api_df_rdd)
    if not "response_body" in payment_api_df.columns:
        raise Exception("Dataframe is empty")
    payment_api_df.createOrReplaceTempView('payment_api_df')

    return (payment_api_df,)


@app.cell
def failed_payments_update_mapper(job_id, payment_api_df, spark):
    _failed_payments_update_mapper_select_clause=payment_api_df.columns if False else []

    _failed_payments_update_mapper_select_clause.append("id AS id")
    _failed_payments_update_mapper_select_clause.append("retry_attempt_count + 1 AS retry_attempt_count")
    _failed_payments_update_mapper_select_clause.append("TRUE AS batch_processed")
    _failed_payments_update_mapper_select_clause.append("{job_id} AS batch_job_id")
    _failed_payments_update_mapper_select_clause.append("CASE WHEN response_body.success = 'true' THEN 'success' WHEN retry_attempt_count >= 3 THEN 'permanently_failed' ELSE 'failed' END AS retry_status")
    _failed_payments_update_mapper_select_clause.append("account_id AS account_id")
    _failed_payments_update_mapper_select_clause.append("failure_reason AS failure_reason")
    _failed_payments_update_mapper_select_clause.append("failure_timestamp AS failure_timestamp")
    _failed_payments_update_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")
    _failed_payments_update_mapper_select_clause.append("created_at AS created_at")
    _failed_payments_update_mapper_select_clause.append("payment_id AS payment_id")
    _failed_payments_update_mapper_select_clause.append("source_of_failure AS source_of_failure")
    _failed_payments_update_mapper_select_clause.append("currency AS currency")
    _failed_payments_update_mapper_select_clause.append("gateway AS gateway")
    _failed_payments_update_mapper_select_clause.append("payment_method AS payment_method")
    _failed_payments_update_mapper_select_clause.append("payment_date AS payment_date")
    _failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS last_retry_timestamp")
    _failed_payments_update_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS updated_at")

    failed_payments_update_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_update_mapper_select_clause) + " FROM payment_api_df").replace("{job_id}",f"'{job_id}'"))
    failed_payments_update_mapper_df.createOrReplaceTempView("failed_payments_update_mapper_df")
    return (failed_payments_update_mapper_df,)


@app.cell
def success_payment_filter(payment_api_df, spark):
    print(payment_api_df.columns)
    success_payment_filter_df = spark.sql("select * from payment_api_df where response_body.success='true'")
    success_payment_filter_df.createOrReplaceTempView('success_payment_filter_df')
    return (success_payment_filter_df,)


@app.cell
def success_payments_mapper(job_id, spark, success_payment_filter_df):
    _success_payments_mapper_select_clause=success_payment_filter_df.columns if False else []

    _success_payments_mapper_select_clause.append("uuid() AS id")
    _success_payments_mapper_select_clause.append("account_id AS account_id")
    _success_payments_mapper_select_clause.append("currency AS currency")
    _success_payments_mapper_select_clause.append("payment_id AS payment_id")
    _success_payments_mapper_select_clause.append("gateway AS gateway")
    _success_payments_mapper_select_clause.append("payment_method AS payment_method")
    _success_payments_mapper_select_clause.append("payment_date AS payment_date")
    _success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS created_at")
    _success_payments_mapper_select_clause.append("date_format(current_timestamp(), \"yyyy-MM-dd'T'HH:mm:ss.SSS\") AS updated_at")
    _success_payments_mapper_select_clause.append("CAST(amount AS DECIMAL(10,2)) AS amount")

    success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payment_filter_df").replace("{job_id}",f"'{job_id}'"))
    success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")
    return (success_payments_mapper_df,)


@app.cell
def failed_payments_update_writer(failed_payments_update_mapper_df, spark):
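    # Upsert the mapped retry results into dremio.failedpayments, matching on payment_id.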
    _failed_payments_update_writer_fields_to_update = failed_payments_update_mapper_df.columns
    _failed_payments_update_writer_set_clause=[]
    _failed_payments_update_writer_unique_key_clause= []

    for _key in ['payment_id']:
        _failed_payments_update_writer_unique_key_clause.append(f't.{_key} = s.{_key}')

    for _field in _failed_payments_update_writer_fields_to_update:
        if(_field not in _failed_payments_update_writer_unique_key_clause):
            _failed_payments_update_writer_set_clause.append(f't.{_field} = s.{_field}')

    _merge_query = '''
    MERGE INTO dremio.failedpayments t
    USING failed_payments_update_mapper_df s
    ON ''' + ' AND '.join(_failed_payments_update_writer_unique_key_clause) + ''' WHEN MATCHED THEN
    UPDATE SET ''' + ', '.join(_failed_payments_update_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'

    spark.sql(_merge_query)

    return


@app.cell
def success_payments_writer(success_payments_mapper_df):
    success_payments_mapper_df.write.mode('append').saveAsTable('dremio.payments')

    return


if __name__ == "__main__":
    app.run()

failed_payments_retry/main.workflow (new file, 1 line)
File diff suppressed because one or more lines are too long
payment_metrics/main.py (new file, 842 lines)
@@ -0,0 +1,842 @@
__generated_with = "0.13.15"

# %%

import sys
import time
from pyspark.sql.utils import AnalysisException
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from util import get_logger, observe_metrics, collect_metrics, log_info, log_error, forgiving_serializer
from pyspark.sql.functions import udf
from pyspark.sql.functions import count, expr, lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
from pyspark.sql.observation import Observation
from pyspark import StorageLevel
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import approx_count_distinct, avg, collect_list, collect_set, corr, count, countDistinct, covar_pop, covar_samp, first, kurtosis, last, max, mean, min, skewness, stddev, stddev_pop, stddev_samp, sum, var_pop, var_samp, variance,expr,to_json,struct, date_format, col, lit, when, regexp_replace, ltrim
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from jinja2 import Template
import json

from secrets_manager import SecretsManager

from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from FilesystemManager import FilesystemManager, SupportedFilesystemType
from Materialization import Materialization

from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery

init_start_time=time.time()

LOGGER = get_logger()
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
workflow = 'payment_metrics'
execution_environment = os.getenv('EXECUTION_ENVIRONMENT') or 'CLUSTER'

job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
retry_job_id = os.getenv("RETRY_EXECUTION_ID") or ''

log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}', Retry Job Id: '{retry_job_id}'")

sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)

gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)

filesystemManager = FilesystemManager.create(secrets.get('LAKEHOUSE_BUCKET'), storage_options={'key': secrets.get('S3_ACCESS_KEY'), 'secret': secrets.get('S3_SECRET_KEY'), 'region': secrets.get('S3_REGION')})
if retry_job_id:
    logs = Materialization.get_execution_history_by_job_id(filesystemManager, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, retry_job_id, selected_components=['finalize']).to_dicts()
    if len(logs) == 1 and logs[0].get('metrics').get('execute_status') == 'SUCCESS':
        log_info(LOGGER, f"Workspace: '{workspace}', Workflow: '{workflow}', Execution Environment: '{execution_environment}', Job Id: '{job_id}' - Retry Job Id: '{retry_job_id}' was already successful. Hence exiting to forward processing to next in chain.")
        sys.exit(0)

_conf = SparkConf()
_params = {
    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
    "spark.hadoop.fs.s3a.aws.region": secrets.get("S3_REGION") or "None",
    "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
    "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "spark.hadoop.fs.s3.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dremio.type" : "hadoop",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0,ch.cern.sparkmeasure:spark-measure_2.12:0.26"
}

_conf.setAll(list(_params.items()))

spark = SparkSession.builder.appName(workspace).config(conf=_conf).getOrCreate()
bootstrap_udfs(spark)

materialization = Materialization(spark, secrets.get('LAKEHOUSE_BUCKET'), workspace, workflow, job_id, retry_job_id, execution_environment, LOGGER)

init_dependency_key="init"

init_end_time=time.time()

# %%

success_payments_reader_start_time=time.time()

success_payments_reader_fail_on_error=""
try:
    success_payments_reader_df = spark.read.table('dremio.payments')
    success_payments_reader_df, success_payments_reader_observer = observe_metrics("success_payments_reader_df", success_payments_reader_df)

    success_payments_reader_df.createOrReplaceTempView('success_payments_reader_df')

    success_payments_reader_dependency_key="success_payments_reader"

    success_payments_reader_execute_status="SUCCESS"
except Exception as e:
    success_payments_reader_error = e
    log_error(LOGGER, f"Component success_payments_reader Failed", e)
    success_payments_reader_execute_status="ERROR"

    raise e

finally:
    success_payments_reader_end_time=time.time()

# %%

success_payments_mapper_start_time=time.time()

success_payments_mapper_fail_on_error=""
try:
    _success_payments_mapper_select_clause=success_payments_reader_df.columns if False else []

    _success_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''')
    _success_payments_mapper_select_clause.append('''amount AS amount''')
    _success_payments_mapper_select_clause.append('''gateway AS gateway''')
    _success_payments_mapper_select_clause.append('''payment_method AS payment_method''')

    try:
        success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
    except Exception as e:
        success_payments_mapper_df = success_payments_reader_df.limit(0)
        log_info(LOGGER, f"error while mapping the data :{e} " )
    success_payments_mapper_df, success_payments_mapper_observer = observe_metrics("success_payments_mapper_df", success_payments_mapper_df)

    success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df")

    success_payments_mapper_dependency_key="success_payments_mapper"

    success_payments_mapper_execute_status="SUCCESS"
except Exception as e:
    success_payments_mapper_error = e
    log_error(LOGGER, f"Component success_payments_mapper Failed", e)
    success_payments_mapper_execute_status="ERROR"

    raise e

finally:
    success_payments_mapper_end_time=time.time()

# %%

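# Incremental window: keep only CCS payments dated on or after the latest payment_date already
# written to dremio.successpaymentmetrics (falling back to the earliest mapped payment_date).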
final_success_payments_start_time=time.time()

print(success_payments_mapper_df.columns)
final_success_payments_fail_on_error=""
try:
    try:
        final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = 'CCS'")
    except AnalysisException as e:
        log_info(LOGGER, f"error while filtering data : {e}")
        final_success_payments_df = success_payments_mapper_df.limit(0)
    except Exception as e:
        log_info(LOGGER, f"Unexpected error: {e}")
        final_success_payments_df = success_payments_mapper_df.limit(0)
    final_success_payments_df, final_success_payments_observer = observe_metrics("final_success_payments_df", final_success_payments_df)

    final_success_payments_df.createOrReplaceTempView('final_success_payments_df')

    final_success_payments_dependency_key="final_success_payments"

    final_success_payments_execute_status="SUCCESS"
except Exception as e:
    final_success_payments_error = e
    log_error(LOGGER, f"Component final_success_payments Failed", e)
    final_success_payments_execute_status="ERROR"

    raise e

finally:
    final_success_payments_end_time=time.time()

# %%

high_valued_payments_filter_start_time=time.time()

print(final_success_payments_df.columns)
high_valued_payments_filter_fail_on_error=""
try:
    try:
        high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500")
    except AnalysisException as e:
        log_info(LOGGER, f"error while filtering data : {e}")
        high_valued_payments_filter_df = final_success_payments_df.limit(0)
    except Exception as e:
        log_info(LOGGER, f"Unexpected error: {e}")
        high_valued_payments_filter_df = final_success_payments_df.limit(0)
    high_valued_payments_filter_df, high_valued_payments_filter_observer = observe_metrics("high_valued_payments_filter_df", high_valued_payments_filter_df)

    high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df')

    high_valued_payments_filter_dependency_key="high_valued_payments_filter"

    high_valued_payments_filter_execute_status="SUCCESS"
except Exception as e:
    high_valued_payments_filter_error = e
    log_error(LOGGER, f"Component high_valued_payments_filter Failed", e)
    high_valued_payments_filter_execute_status="ERROR"

    raise e

finally:
    high_valued_payments_filter_end_time=time.time()

# %%

total_payments_and_total_value_processed_start_time=time.time()

total_payments_and_total_value_processed_fail_on_error="True"
try:
    total_payments_and_total_value_processed_df = final_success_payments_df.groupBy(
        "payment_date"
    ).agg(
        count('*').alias("total_payments"),
        sum('amount').alias("total_value_processed")
    )
    total_payments_and_total_value_processed_df, total_payments_and_total_value_processed_observer = observe_metrics("total_payments_and_total_value_processed_df", total_payments_and_total_value_processed_df)

    total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df')

    total_payments_and_total_value_processed_dependency_key="total_payments_and_total_value_processed"

    print(final_success_payments_dependency_key)

    total_payments_and_total_value_processed_execute_status="SUCCESS"
except Exception as e:
    total_payments_and_total_value_processed_error = e
    log_error(LOGGER, f"Component total_payments_and_total_value_processed Failed", e)
    total_payments_and_total_value_processed_execute_status="ERROR"

    raise e

finally:
    total_payments_and_total_value_processed_end_time=time.time()

# %%

aggregate__4_start_time=time.time()

aggregate__4_fail_on_error="True"
try:
    aggregate__4_df = final_success_payments_df.groupBy(
        "payment_date",
        "payment_method"
    ).agg(
        count('*').alias("method_count")
    )
    aggregate__4_df, aggregate__4_observer = observe_metrics("aggregate__4_df", aggregate__4_df)

    aggregate__4_df.createOrReplaceTempView('aggregate__4_df')

    aggregate__4_dependency_key="aggregate__4"

    print(final_success_payments_dependency_key)

    aggregate__4_execute_status="SUCCESS"
except Exception as e:
    aggregate__4_error = e
    log_error(LOGGER, f"Component aggregate__4 Failed", e)
    aggregate__4_execute_status="ERROR"

    raise e

finally:
    aggregate__4_end_time=time.time()

# %%

data_mapper__5_start_time=time.time()

data_mapper__5_fail_on_error=""
try:
    _data_mapper__5_select_clause=aggregate__4_df.columns if False else []

    _data_mapper__5_select_clause.append('''payment_date AS payment_date''')
    _data_mapper__5_select_clause.append('''payment_method AS payment_method''')
    _data_mapper__5_select_clause.append('''method_count AS method_count''')
    _data_mapper__5_select_clause.append('''RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method''')

    try:
        data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'"))
    except Exception as e:
        data_mapper__5_df = aggregate__4_df.limit(0)
        log_info(LOGGER, f"error while mapping the data :{e} " )
    data_mapper__5_df, data_mapper__5_observer = observe_metrics("data_mapper__5_df", data_mapper__5_df)

    data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df")

    data_mapper__5_dependency_key="data_mapper__5"

    data_mapper__5_execute_status="SUCCESS"
except Exception as e:
    data_mapper__5_error = e
    log_error(LOGGER, f"Component data_mapper__5 Failed", e)
    data_mapper__5_execute_status="ERROR"

    raise e

finally:
    data_mapper__5_end_time=time.time()

# %%

filter__6_start_time=time.time()

print(data_mapper__5_df.columns)
filter__6_fail_on_error=""
try:
    try:
        filter__6_df = spark.sql("select * from data_mapper__5_df where rank_method = 1")
    except AnalysisException as e:
        log_info(LOGGER, f"error while filtering data : {e}")
        filter__6_df = data_mapper__5_df.limit(0)
    except Exception as e:
        log_info(LOGGER, f"Unexpected error: {e}")
        filter__6_df = data_mapper__5_df.limit(0)
    filter__6_df, filter__6_observer = observe_metrics("filter__6_df", filter__6_df)

    filter__6_df.createOrReplaceTempView('filter__6_df')

    filter__6_dependency_key="filter__6"

    filter__6_execute_status="SUCCESS"
except Exception as e:
    filter__6_error = e
    log_error(LOGGER, f"Component filter__6 Failed", e)
    filter__6_execute_status="ERROR"

    raise e

finally:
    filter__6_end_time=time.time()

# %%

most_used_payment_method___start_time=time.time()

most_used_payment_method___fail_on_error=""
try:
    _most_used_payment_method___select_clause=filter__6_df.columns if False else []

    _most_used_payment_method___select_clause.append('''payment_date AS payment_date''')
    _most_used_payment_method___select_clause.append('''payment_method AS most_used_payment_method''')

    try:
        most_used_payment_method___df=spark.sql(("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df").replace("{job_id}",f"'{job_id}'"))
    except Exception as e:
        most_used_payment_method___df = filter__6_df.limit(0)
        log_info(LOGGER, f"error while mapping the data :{e} " )
    most_used_payment_method___df, most_used_payment_method___observer = observe_metrics("most_used_payment_method___df", most_used_payment_method___df)

    most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df")

    most_used_payment_method___dependency_key="most_used_payment_method__"

    most_used_payment_method___execute_status="SUCCESS"
except Exception as e:
    most_used_payment_method___error = e
    log_error(LOGGER, f"Component most_used_payment_method__ Failed", e)
    most_used_payment_method___execute_status="ERROR"

    raise e

finally:
    most_used_payment_method___end_time=time.time()

# %%

high_valued_payments___start_time=time.time()

high_valued_payments___fail_on_error="True"
try:
    high_valued_payments___df = high_valued_payments_filter_df.groupBy(
        "payment_date"
    ).agg(
        count('*').alias("high_valued_payments")
    )
    high_valued_payments___df, high_valued_payments___observer = observe_metrics("high_valued_payments___df", high_valued_payments___df)

    high_valued_payments___df.createOrReplaceTempView('high_valued_payments___df')

    high_valued_payments___dependency_key="high_valued_payments__"

    print(high_valued_payments_filter_dependency_key)

    high_valued_payments___execute_status="SUCCESS"
except Exception as e:
    high_valued_payments___error = e
    log_error(LOGGER, f"Component high_valued_payments__ Failed", e)
    high_valued_payments___execute_status="ERROR"

    raise e

finally:
    high_valued_payments___end_time=time.time()

# %%

failed_payments_reader_start_time=time.time()

failed_payments_reader_fail_on_error=""
try:
    failed_payments_reader_df = spark.read.table('dremio.failedpayments')
    failed_payments_reader_df, failed_payments_reader_observer = observe_metrics("failed_payments_reader_df", failed_payments_reader_df)

    failed_payments_reader_df.createOrReplaceTempView('failed_payments_reader_df')

    failed_payments_reader_dependency_key="failed_payments_reader"

    failed_payments_reader_execute_status="SUCCESS"
except Exception as e:
    failed_payments_reader_error = e
    log_error(LOGGER, f"Component failed_payments_reader Failed", e)
    failed_payments_reader_execute_status="ERROR"

    raise e

finally:
    failed_payments_reader_end_time=time.time()

# %%

failed_payments_mapper_start_time=time.time()

failed_payments_mapper_fail_on_error=""
try:
    _failed_payments_mapper_select_clause=failed_payments_reader_df.columns if False else []

    _failed_payments_mapper_select_clause.append('''DATE(payment_date) AS payment_date''')
    _failed_payments_mapper_select_clause.append('''payment_method AS payment_method''')
    _failed_payments_mapper_select_clause.append('''failure_reason AS failure_reason''')
    _failed_payments_mapper_select_clause.append('''gateway AS gateway''')

    try:
        failed_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_failed_payments_mapper_select_clause) + " FROM failed_payments_reader_df").replace("{job_id}",f"'{job_id}'"))
    except Exception as e:
        failed_payments_mapper_df = failed_payments_reader_df.limit(0)
        log_info(LOGGER, f"error while mapping the data :{e} " )
    failed_payments_mapper_df, failed_payments_mapper_observer = observe_metrics("failed_payments_mapper_df", failed_payments_mapper_df)

    failed_payments_mapper_df.createOrReplaceTempView("failed_payments_mapper_df")

    failed_payments_mapper_dependency_key="failed_payments_mapper"

    failed_payments_mapper_execute_status="SUCCESS"
except Exception as e:
    failed_payments_mapper_error = e
    log_error(LOGGER, f"Component failed_payments_mapper Failed", e)
    failed_payments_mapper_execute_status="ERROR"

    raise e

finally:
    failed_payments_mapper_end_time=time.time()

# %%

final_failed_payments_start_time=time.time()

print(failed_payments_mapper_df.columns)
final_failed_payments_df = spark.sql("select * from failed_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.failedpaymentmetrics), (SELECT MIN(payment_date) FROM failed_payments_mapper_df))")
final_failed_payments_df.createOrReplaceTempView('final_failed_payments_df')
final_failed_payments_df.persist()

final_failed_payments_end_time=time.time()

final_failed_payments_dependency_key="final_failed_payments"

print(failed_payments_mapper_dependency_key)

# %%

filter__13_start_time=time.time()

print(final_failed_payments_df.columns)
filter__13_fail_on_error=""
try:
    try:
        filter__13_df = spark.sql("select * from final_failed_payments_df where gateway = 'CCS'")
    except AnalysisException as e:
        log_info(LOGGER, f"error while filtering data : {e}")
        filter__13_df = final_failed_payments_df.limit(0)
    except Exception as e:
        log_info(LOGGER, f"Unexpected error: {e}")
        filter__13_df = final_failed_payments_df.limit(0)
    filter__13_df, filter__13_observer = observe_metrics("filter__13_df", filter__13_df)

    filter__13_df.createOrReplaceTempView('filter__13_df')

    filter__13_dependency_key="filter__13"

    filter__13_execute_status="SUCCESS"
except Exception as e:
    filter__13_error = e
    log_error(LOGGER, f"Component filter__13 Failed", e)
    filter__13_execute_status="ERROR"

    raise e

finally:
    filter__13_end_time=time.time()

# %%

total_failed_payments___start_time=time.time()

total_failed_payments___fail_on_error="True"
try:
    total_failed_payments___df = filter__13_df.groupBy(
        "payment_date"
    ).agg(
        count('*').alias("total_failed_payments")
    )
    total_failed_payments___df, total_failed_payments___observer = observe_metrics("total_failed_payments___df", total_failed_payments___df)

    total_failed_payments___df.createOrReplaceTempView('total_failed_payments___df')

    total_failed_payments___dependency_key="total_failed_payments__"

    print(filter__13_dependency_key)

    total_failed_payments___execute_status="SUCCESS"
except Exception as e:
    total_failed_payments___error = e
    log_error(LOGGER, f"Component total_failed_payments__ Failed", e)
    total_failed_payments___execute_status="ERROR"

    raise e

finally:
    total_failed_payments___end_time=time.time()

# %%

failed_payment_metrics_start_time=time.time()

failed_payment_metrics_fail_on_error="True"
try:
    failed_payment_metrics_df = final_failed_payments_df.groupBy(
        "payment_date",
        "gateway",
        "failure_reason"
    ).agg(
        count('*').alias("failure_count")
    )
    failed_payment_metrics_df, failed_payment_metrics_observer = observe_metrics("failed_payment_metrics_df", failed_payment_metrics_df)

    failed_payment_metrics_df.createOrReplaceTempView('failed_payment_metrics_df')

    failed_payment_metrics_dependency_key="failed_payment_metrics"

    print(final_failed_payments_dependency_key)

    failed_payment_metrics_execute_status="SUCCESS"
except Exception as e:
    failed_payment_metrics_error = e
    log_error(LOGGER, f"Component failed_payment_metrics Failed", e)
    failed_payment_metrics_execute_status="ERROR"

    raise e

finally:
    failed_payment_metrics_end_time=time.time()

# %%

data_writer__15_start_time=time.time()

data_writer__15_fail_on_error=""
try:
    _data_writer__15_fields_to_update = failed_payment_metrics_df.columns
    _data_writer__15_set_clause=[]
    _data_writer__15_unique_key_clause= []

    for _key in ['payment_date', 'gateway', 'failure_reason']:
        _data_writer__15_unique_key_clause.append(f't.{_key} = s.{_key}')

    for _field in _data_writer__15_fields_to_update:
        if(_field not in _data_writer__15_unique_key_clause):
            _data_writer__15_set_clause.append(f't.{_field} = s.{_field}')

    _merge_query = '''
    MERGE INTO dremio.failedpaymentmetrics t
    USING failed_payment_metrics_df s
    ON ''' + ' AND '.join(_data_writer__15_unique_key_clause) + ''' WHEN MATCHED THEN
    UPDATE SET ''' + ', '.join(_data_writer__15_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'

    spark.sql(_merge_query)

    data_writer__15_dependency_key="data_writer__15"

    print(failed_payment_metrics_dependency_key)

    data_writer__15_execute_status="SUCCESS"
except Exception as e:
    data_writer__15_error = e
    log_error(LOGGER, f"Component data_writer__15 Failed", e)
    data_writer__15_execute_status="ERROR"

    raise e

finally:
    data_writer__15_end_time=time.time()

# %%

success_payment_metrics_start_time=time.time()

print(total_payments_and_total_value_processed_df.columns)
print(most_used_payment_method___df.columns)
print(high_valued_payments___df.columns)
print(total_failed_payments___df.columns)

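# Combine the daily aggregates: full outer join failed and successful payment totals on
# payment_date, then attach the most-used payment method and high-value payment counts.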
success_payment_metrics_df = spark.sql("""
SELECT
    COALESCE(a.payment_date, d.payment_date) AS payment_date,
    a.total_payments,
    a.total_value_processed,
    b.most_used_payment_method,
    c.high_valued_payments,
    d.total_failed_payments
FROM total_failed_payments___df d
FULL OUTER JOIN total_payments_and_total_value_processed_df a
    ON a.payment_date = d.payment_date
LEFT JOIN most_used_payment_method___df b
    ON a.payment_date = b.payment_date
LEFT JOIN high_valued_payments___df c
    ON a.payment_date = c.payment_date
""")

success_payment_metrics_df.createOrReplaceTempView('success_payment_metrics_df')

success_payment_metrics_end_time=time.time()

success_payment_metrics_dependency_key="success_payment_metrics"

# %%

success_payment_metrics_writer_start_time=time.time()

success_payment_metrics_writer_fail_on_error=""
try:
    _success_payment_metrics_writer_fields_to_update = success_payment_metrics_df.columns
    _success_payment_metrics_writer_set_clause=[]
    _success_payment_metrics_writer_unique_key_clause= []

    for _key in ['payment_date']:
        _success_payment_metrics_writer_unique_key_clause.append(f't.{_key} = s.{_key}')

    for _field in _success_payment_metrics_writer_fields_to_update:
        if(_field not in _success_payment_metrics_writer_unique_key_clause):
            _success_payment_metrics_writer_set_clause.append(f't.{_field} = s.{_field}')

    _merge_query = '''
    MERGE INTO dremio.successpaymentmetrics t
    USING success_payment_metrics_df s
    ON ''' + ' AND '.join(_success_payment_metrics_writer_unique_key_clause) + ''' WHEN MATCHED THEN
    UPDATE SET ''' + ', '.join(_success_payment_metrics_writer_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'

    spark.sql(_merge_query)

    success_payment_metrics_writer_dependency_key="success_payment_metrics_writer"

    success_payment_metrics_writer_execute_status="SUCCESS"
except Exception as e:
    success_payment_metrics_writer_error = e
    log_error(LOGGER, f"Component success_payment_metrics_writer Failed", e)
    success_payment_metrics_writer_execute_status="ERROR"

    raise e

finally:
    success_payment_metrics_writer_end_time=time.time()

# %%

finalize_start_time=time.time()

metrics = {
    'data': collect_metrics(locals()),
}
materialization.materialized_execution_history({'finalize': {'execute_status': 'SUCCESS', 'fail_on_error': 'False'}, **metrics['data']})
log_info(LOGGER, f"Workflow Data metrics: {metrics['data']}")

finalize_end_time=time.time()

spark.stop()

payment_metrics/main.py.notebook (new file, 1065 lines)
File diff suppressed because it is too large
payment_metrics/main.workflow (new file, 1 line)
File diff suppressed because one or more lines are too long
service_request_metrics/main.py (new file, 167 lines)
@@ -0,0 +1,167 @@
__generated_with = "0.13.15"

# %%

import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json

from secrets_manager import SecretsManager

from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent

from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery

alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'

job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())

sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)

gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
conf = SparkConf()
params = {
    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
    "spark.hadoop.fs.s3a.aws.region": "us-west-1",
    "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
    "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dremio.type" : "hadoop",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}

conf.setAll(list(params.items()))

spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
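# Note: table references of the form dremio.<table> below resolve through the Iceberg
# SparkCatalog configured above (Hadoop catalog type, warehouse = LAKEHOUSE_BUCKET), so the
# readers and MERGE targets in this script operate on Iceberg tables in the lakehouse bucket.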
# %%

actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
# %%

_actions_audit_mapper_select_clause=actions_audit_reader_df.columns if False else []

_actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
_actions_audit_mapper_select_clause.append("sub_category AS service_type")
_actions_audit_mapper_select_clause.append("action_count AS action_count")

actions_audit_mapper_df=spark.sql(("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df").replace("{job_id}",f"'{job_id}'"))
actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
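# With the three projections appended above, the mapper issues the query below (the job_id
# substitution is a no-op here because no "{job_id}" placeholder appears in the clause):
#
#   SELECT DATE(action_date) AS action_date,
#          sub_category AS service_type,
#          action_count AS action_count
#   FROM actions_audit_reader_df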
# %%

print(actions_audit_mapper_df.columns)
actions_audit_filter_df = spark.sql("select * from actions_audit_mapper_df where action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))")
actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
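# Incremental watermark: the filter keeps rows on or after the latest action_date already in
# dremio.servicemetrics, falling back to the earliest audit date when the target is empty
# (first run). The most recent day is therefore re-read and re-merged on each run rather than
# skipped, which the keyed MERGE further down tolerates.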
# %%

_params = {
    "datasource": "actions_audit_filter",
    "selectFunctions" : [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
}

_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
    actions_audit_filter_df,
    group_expression="action_date, service_type",
    cube="",
    rollup="",
    grouping_set="",
    select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
)

_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
              for f in _rewritten_selects]

_all_group_cols = list({c for gs in _grouping_specs for c in gs})

_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)

aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)

aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
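# How the expansion behaves here (editorial sketch, based on the arguments passed above):
# with only group_expression="action_date, service_type" and no cube/rollup/grouping sets,
# preprocess_then_expand is expected to return a single grouping spec, so the loop produces
# one partial aggregate and the unionByName is effectively a pass-through. If cube or rollup
# were supplied, each spec would be aggregated separately and the columns absent from a given
# spec padded with NULL before the union, e.g. (hypothetical rollup case):
#
#   _grouping_specs = [["action_date", "service_type"], ["action_date"], []]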
# %%

_data_writer__5_fields_to_update = aggregate__3_df.columns
_data_writer__5_set_clause=[]
_data_writer__5_unique_key_clause= []

for _key in ['action_date', 'service_type']:
    _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')

# Exclude the merge keys from the UPDATE SET list (the original test compared the bare column
# name against the rendered "t.x = s.x" clauses, so it never excluded anything).
for _field in _data_writer__5_fields_to_update:
    if _field not in ['action_date', 'service_type']:
        _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')

_merge_query = '''
MERGE INTO dremio.servicemetrics t
USING aggregate__3_df s
ON ''' + ' AND '.join(_data_writer__5_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_data_writer__5_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'

spark.sql(_merge_query)
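# Re-running the job is intended to be idempotent at the (action_date, service_type) grain:
# existing rows are updated in place and only new day/service combinations are inserted, so
# replays of the same audit window should not duplicate metric rows.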
197
service_request_metrics/main.py.notebook
Normal file
@@ -0,0 +1,197 @@
import marimo

__generated_with = "0.13.15"
app = marimo.App()


@app.cell
def init():

    import sys
    sys.path.append('/opt/spark/work-dir/')
    from workflow_templates.spark.udf_manager import bootstrap_udfs
    from pyspark.sql.functions import udf
    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType, IntegerType
    import uuid
    from pathlib import Path
    from pyspark import SparkConf, Row
    from pyspark.sql import SparkSession
    import os
    import pandas as pd
    import polars as pl
    import pyarrow as pa
    from pyspark.sql.functions import expr,to_json,col,struct
    from functools import reduce
    from handle_structs_or_arrays import preprocess_then_expand
    import requests
    from jinja2 import Template
    import json

    from secrets_manager import SecretsManager

    from WorkflowManager import WorkflowDSL, WorkflowManager
    from KnowledgebaseManager import KnowledgebaseManager
    from gitea_client import GiteaClient, WorkspaceVersionedContent

    from dremio.flight.endpoint import DremioFlightEndpoint
    from dremio.flight.query import DremioFlightEndpointQuery

    alias_str='abcdefghijklmnopqrstuvwxyz'
    workspace = os.getenv('WORKSPACE') or 'exp360cust'

    job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())

    sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
    secrets = sm.list_secrets(workspace)

    gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
    workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
    conf = SparkConf()
    params = {
        "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
        "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
        "spark.hadoop.fs.s3a.aws.region": "us-west-1",
        "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
        "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dremio.type" : "hadoop",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
    }

    conf.setAll(list(params.items()))

    spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
    bootstrap_udfs(spark)
    return expr, job_id, lit, preprocess_then_expand, reduce, spark
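# Editorial note on the notebook form: each @app.cell function below declares the names it
# consumes as parameters and returns the names it produces, so marimo can derive the cell
# dependency graph (init -> actions_audit_reader -> actions_audit_mapper ->
# actions_audit_filter -> aggregate__3 -> data_writer__5) and execute the cells in that order.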
@app.cell
def actions_audit_reader(spark):

    actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
    actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
    return (actions_audit_reader_df,)
@app.cell
def actions_audit_mapper(actions_audit_reader_df, job_id, spark):

    _actions_audit_mapper_select_clause=actions_audit_reader_df.columns if False else []

    _actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
    _actions_audit_mapper_select_clause.append("sub_category AS service_type")
    _actions_audit_mapper_select_clause.append("action_count AS action_count")

    actions_audit_mapper_df=spark.sql(("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df").replace("{job_id}",f"'{job_id}'"))
    actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
    return (actions_audit_mapper_df,)
@app.cell
def actions_audit_filter(actions_audit_mapper_df, spark):

    print(actions_audit_mapper_df.columns)
    actions_audit_filter_df = spark.sql("select * from actions_audit_mapper_df where action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))")
    actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
    return (actions_audit_filter_df,)
@app.cell
def aggregate__3(
    actions_audit_filter_df,
    expr,
    lit,
    preprocess_then_expand,
    reduce,
):

    _params = {
        "datasource": "actions_audit_filter",
        "selectFunctions" : [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
    }

    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
        actions_audit_filter_df,
        group_expression="action_date, service_type",
        cube="",
        rollup="",
        grouping_set="",
        select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
    )

    _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
                  for f in _rewritten_selects]

    _all_group_cols = list({c for gs in _grouping_specs for c in gs})

    _partials = []
    for _gs in _grouping_specs:
        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
        for _col in _all_group_cols:
            if _col not in _gs:
                _gdf = _gdf.withColumn(_col, lit(None))
        _partials.append(_gdf)

    aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)

    aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
    return (aggregate__3_df,)
@app.cell
def data_writer__5(aggregate__3_df, spark):

    _data_writer__5_fields_to_update = aggregate__3_df.columns
    _data_writer__5_set_clause=[]
    _data_writer__5_unique_key_clause= []

    for _key in ['action_date', 'service_type']:
        _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')

    # Exclude the merge keys from the UPDATE SET list (same fix as in main.py: the original
    # test compared column names against the rendered "t.x = s.x" clauses).
    for _field in _data_writer__5_fields_to_update:
        if _field not in ['action_date', 'service_type']:
            _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')

    _merge_query = '''
    MERGE INTO dremio.servicemetrics t
    USING aggregate__3_df s
    ON ''' + ' AND '.join(_data_writer__5_unique_key_clause) + ''' WHEN MATCHED THEN
    UPDATE SET ''' + ', '.join(_data_writer__5_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'

    spark.sql(_merge_query)

    return
if __name__ == "__main__":
    app.run()
1
service_request_metrics/main.workflow
Normal file
File diff suppressed because one or more lines are too long