__generated_with = "0.13.15" # %% import sys sys.path.append('/opt/spark/work-dir/') from workflow_templates.spark.udf_manager import bootstrap_udfs from pyspark.sql.functions import udf from pyspark.sql.functions import lit from pyspark.sql.types import StringType, IntegerType import uuid from pathlib import Path from pyspark import SparkConf, Row from pyspark.sql import SparkSession import os import pandas as pd import polars as pl import pyarrow as pa from pyspark.sql.functions import expr,to_json,col,struct from functools import reduce from handle_structs_or_arrays import preprocess_then_expand import requests from jinja2 import Template import json from secrets_manager import SecretsManager from WorkflowManager import WorkflowDSL, WorkflowManager from KnowledgebaseManager import KnowledgebaseManager from gitea_client import GiteaClient, WorkspaceVersionedContent from dremio.flight.endpoint import DremioFlightEndpoint from dremio.flight.query import DremioFlightEndpointQuery alias_str='abcdefghijklmnopqrstuvwxyz' workspace = os.getenv('WORKSPACE') or 'exp360cust' job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4()) sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN')) secrets = sm.list_secrets(workspace) gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1') workspaceVersionedContent=WorkspaceVersionedContent(gitea_client) conf = SparkConf() params = { "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'), "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'), "spark.hadoop.fs.s3a.aws.region": "us-west-1", "spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'), "spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dremio.type" : "hadoop", "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0" } conf.setAll(list(params.items())) spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate() bootstrap_udfs(spark) # %% success_payments_df = spark.read.table('dremio.payments') success_payments_df.createOrReplaceTempView('success_payments_df') # %% _success_payments_mapper_select_clause=success_payments_df.columns if False else [] _success_payments_mapper_select_clause.append("DATE(payment_date) AS payment_date") _success_payments_mapper_select_clause.append("amount AS amount") _success_payments_mapper_select_clause.append("gateway AS gateway") 
_success_payments_mapper_select_clause.append("payment_method AS payment_method") success_payments_mapper_df=spark.sql(("SELECT " + ', '.join(_success_payments_mapper_select_clause) + " FROM success_payments_df").replace("{job_id}",f"'{job_id}'")) success_payments_mapper_df.createOrReplaceTempView("success_payments_mapper_df") # %% print(success_payments_mapper_df.columns) final_success_payments_df = spark.sql("select * from success_payments_mapper_df where payment_date >= COALESCE((SELECT MAX(DATE(payment_date)) FROM dremio.successpaymentmetrics), (SELECT MIN(payment_date) FROM success_payments_mapper_df)) AND gateway = \'CCS\'") final_success_payments_df.createOrReplaceTempView('final_success_payments_df') # %% print(final_success_payments_df.columns) high_valued_payments_filter_df = spark.sql("select * from final_success_payments_df where amount >= 500") high_valued_payments_filter_df.createOrReplaceTempView('high_valued_payments_filter_df') # %% _params = { "datasource": "final_success_payments", "selectFunctions" : [{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] } _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, group_expression="payment_date", cube="", rollup="", grouping_set="", select_functions=[{'fieldName': 'total_payments', 'aggregationFunction': 'COUNT(*)'}, {'fieldName': 'total_value_processed', 'aggregationFunction': 'SUM(amount)'}] ) _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects ] _all_group_cols = list({c for gs in _grouping_specs for c in gs}) _partials = [] for _gs in _grouping_specs: _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) for _col in _all_group_cols: if _col not in _gs: _gdf = _gdf.withColumn(_col, lit(None)) _partials.append(_gdf) total_payments_and_total_value_processed_df = reduce(lambda a, b: a.unionByName(b), _partials) total_payments_and_total_value_processed_df.createOrReplaceTempView('total_payments_and_total_value_processed_df') # %% _params = { "datasource": "final_success_payments", "selectFunctions" : [{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] } _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( final_success_payments_df, group_expression="payment_date, payment_method", cube="", rollup="", grouping_set="", select_functions=[{'fieldName': 'method_count', 'aggregationFunction': 'COUNT(*)'}] ) _agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects ] _all_group_cols = list({c for gs in _grouping_specs for c in gs}) _partials = [] for _gs in _grouping_specs: _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs) for _col in _all_group_cols: if _col not in _gs: _gdf = _gdf.withColumn(_col, lit(None)) _partials.append(_gdf) aggregate__4_df = reduce(lambda a, b: a.unionByName(b), _partials) aggregate__4_df.createOrReplaceTempView('aggregate__4_df') # %% _data_mapper__5_select_clause=aggregate__4_df.columns if False else [] _data_mapper__5_select_clause.append("payment_date AS payment_date") _data_mapper__5_select_clause.append("payment_method AS payment_method") _data_mapper__5_select_clause.append("method_count AS method_count") _data_mapper__5_select_clause.append("RANK() OVER (PARTITION BY payment_date ORDER BY method_count) AS rank_method") data_mapper__5_df=spark.sql(("SELECT " + ', '.join(_data_mapper__5_select_clause) + " FROM aggregate__4_df").replace("{job_id}",f"'{job_id}'")) 
data_mapper__5_df.createOrReplaceTempView("data_mapper__5_df")

# %%
print(data_mapper__5_df.columns)

# Keep only the top-ranked (most used) payment method for each payment_date.
filter__6_df = spark.sql("SELECT * FROM data_mapper__5_df WHERE rank_method = 1")
filter__6_df.createOrReplaceTempView('filter__6_df')

# %%
# Mapper: expose the winning method per day as most_used_payment_method.
_most_used_payment_method___select_clause = filter__6_df.columns if False else []
_most_used_payment_method___select_clause.append("payment_date AS payment_date")
_most_used_payment_method___select_clause.append("payment_method AS most_used_payment_method")
most_used_payment_method___df = spark.sql(
    ("SELECT " + ', '.join(_most_used_payment_method___select_clause) + " FROM filter__6_df")
    .replace("{job_id}", f"'{job_id}'")
)
most_used_payment_method___df.createOrReplaceTempView("most_used_payment_method___df")

# %%
# Aggregate per payment_date: count of high-value (>= 500) payments.
_params = {
    "datasource": "high_valued_payments_filter",
    "selectFunctions": [{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
    high_valued_payments_filter_df,
    group_expression="payment_date",
    cube="",
    rollup="",
    grouping_set="",
    select_functions=[{'fieldName': 'high_valued_payments', 'aggregationFunction': 'COUNT(*)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"]) for f in _rewritten_selects]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)
high_valued_payments_df = reduce(lambda a, b: a.unionByName(b), _partials)
high_valued_payments_df.createOrReplaceTempView('high_valued_payments_df')
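
# %%
# Possible final step (a minimal sketch, not part of the generated pipeline
# above): the three per-date metric views could be joined on payment_date and
# appended to dremio.successpaymentmetrics, the table the incremental filter
# reads its watermark from. The join keys, null handling, and write mode here
# are assumptions, not confirmed by the workflow definition.
success_payment_metrics_df = (
    total_payments_and_total_value_processed_df
    .join(most_used_payment_method___df, on="payment_date", how="left")
    .join(high_valued_payments_df, on="payment_date", how="left")
)

# Append-only write; the watermark filter on final_success_payments_df already
# limits the input to dates at or after the last processed payment_date.
success_payment_metrics_df.writeTo("dremio.successpaymentmetrics").append()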