import marimo

__generated_with = "0.13.15"
app = marimo.App()


@app.cell
def init():
    import sys
    sys.path.append('/opt/spark/work-dir/')
    from workflow_templates.spark.udf_manager import bootstrap_udfs
    from pyspark.sql.functions import udf
    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType, IntegerType
    import uuid
    from pathlib import Path
    from pyspark import SparkConf, Row
    from pyspark.sql import SparkSession
    import os
    import pandas as pd
    import polars as pl
    import pyarrow as pa
    from pyspark.sql.functions import expr, to_json, col, struct
    from functools import reduce
    from handle_structs_or_arrays import preprocess_then_expand
    import requests
    from jinja2 import Template
    import json
    from secrets_manager import SecretsManager
    from WorkflowManager import WorkflowDSL, WorkflowManager
    from KnowledgebaseManager import KnowledgebaseManager
    from gitea_client import GiteaClient, WorkspaceVersionedContent
    from dremio.flight.endpoint import DremioFlightEndpoint
    from dremio.flight.query import DremioFlightEndpointQuery

    alias_str = 'abcdefghijklmnopqrstuvwxyz'
    workspace = os.getenv('WORKSPACE') or 'exp360cust'
    job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())

    # Resolve workspace secrets and the versioned-content client.
    sm = SecretsManager(
        os.getenv('SECRET_MANAGER_URL'),
        os.getenv('SECRET_MANAGER_NAMESPACE'),
        os.getenv('SECRET_MANAGER_ENV'),
        os.getenv('SECRET_MANAGER_TOKEN'),
    )
    secrets = sm.list_secrets(workspace)
    gitea_client = GiteaClient(
        os.getenv('GITEA_HOST'),
        os.getenv('GITEA_TOKEN'),
        os.getenv('GITEA_OWNER') or 'gitea_admin',
        os.getenv('GITEA_REPO') or 'tenant1',
    )
    workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)

    # Spark session configured for S3A access and the Iceberg "dremio" catalog.
    conf = SparkConf()
    params = {
        "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
        "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
        "spark.hadoop.fs.s3a.aws.region": "us-west-1",
        "spark.sql.catalog.dremio.warehouse": secrets.get('LAKEHOUSE_BUCKET'),
        "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dremio.type": "hadoop",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0",
    }
    conf.setAll(list(params.items()))
    spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
    bootstrap_udfs(spark)
    return expr, job_id, lit, preprocess_then_expand, reduce, spark


@app.cell
def actions_audit_reader(spark):
    # Read the source table and expose it as a temp view for downstream SQL.
    actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
    actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
    return (actions_audit_reader_df,)


@app.cell
def actions_audit_mapper(actions_audit_reader_df, job_id, spark):
    # Passthrough of source columns is disabled; only the mapped columns below are selected.
    _actions_audit_mapper_select_clause = actions_audit_reader_df.columns if False else []
    _actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
    _actions_audit_mapper_select_clause.append("sub_category AS service_type")
    _actions_audit_mapper_select_clause.append("action_count AS action_count")
    actions_audit_mapper_df = spark.sql(
        ("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df")
        .replace("{job_id}", f"'{job_id}'")
    )
    actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
    return (actions_audit_mapper_df,)


@app.cell
def actions_audit_filter(actions_audit_mapper_df, spark):
    print(actions_audit_mapper_df.columns)
    # Keep rows at or after the latest date already written to the metrics table;
    # fall back to the earliest source date on the first run.
    actions_audit_filter_df = spark.sql(
        "select * from actions_audit_mapper_df "
        "where action_date >= COALESCE("
        "(SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), "
        "(SELECT MIN(action_date) FROM actions_audit_mapper_df))"
    )
    actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
    return (actions_audit_filter_df,)


@app.cell
def aggregate__3(
    actions_audit_filter_df,
    expr,
    lit,
    preprocess_then_expand,
    reduce,
):
    _params = {
        "datasource": "actions_audit_filter",
        "selectFunctions": [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}],
    }
    # Expand structs/arrays and resolve the grouping specs and aggregate expressions.
    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
        actions_audit_filter_df,
        group_expression="action_date, service_type",
        cube="",
        rollup="",
        grouping_set="",
        select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}],
    )
    _agg_exprs = [
        expr(f["aggregationFunction"]).alias(f["fieldName"])
        for f in _rewritten_selects
    ]
    _all_group_cols = list({c for gs in _grouping_specs for c in gs})
    # Aggregate once per grouping spec, padding absent grouping columns with NULLs
    # so the partial results can be unioned by name.
    _partials = []
    for _gs in _grouping_specs:
        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
        for _col in _all_group_cols:
            if _col not in _gs:
                _gdf = _gdf.withColumn(_col, lit(None))
        _partials.append(_gdf)
    aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)
    aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
    return (aggregate__3_df,)


@app.cell
def data_writer__5(aggregate__3_df, spark):
    # Merge the aggregated metrics into the target table, keyed on the unique columns.
    _data_writer__5_fields_to_update = aggregate__3_df.columns
    _data_writer__5_unique_keys = ['action_date', 'service_type']
    _data_writer__5_set_clause = []
    _data_writer__5_unique_key_clause = []
    for _key in _data_writer__5_unique_keys:
        _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')
    # Update only non-key columns; the key columns anchor the ON condition.
    for _field in _data_writer__5_fields_to_update:
        if _field not in _data_writer__5_unique_keys:
            _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')
    _merge_query = (
        'MERGE INTO dremio.servicemetrics t USING aggregate__3_df s ON '
        + ' AND '.join(_data_writer__5_unique_key_clause)
        + ' WHEN MATCHED THEN UPDATE SET '
        + ', '.join(_data_writer__5_set_clause)
        + ' WHEN NOT MATCHED THEN INSERT *'
    )
    spark.sql(_merge_query)
    return


if __name__ == "__main__":
    app.run()