From 00276026215fa0959a4c0f0d6cc546b79f380cdc Mon Sep 17 00:00:00 2001
From: unknown
Date: Fri, 29 Aug 2025 05:56:22 +0000
Subject: [PATCH] Workflow saved

---
 service_request_metrics/main.py          | 167 +++++++++++++++++++
 service_request_metrics/main.py.notebook | 197 +++++++++++++++++++++++
 service_request_metrics/main.workflow    |   1 +
 3 files changed, 365 insertions(+)
 create mode 100644 service_request_metrics/main.py
 create mode 100644 service_request_metrics/main.py.notebook
 create mode 100644 service_request_metrics/main.workflow

diff --git a/service_request_metrics/main.py b/service_request_metrics/main.py
new file mode 100644
index 0000000..50cb668
--- /dev/null
+++ b/service_request_metrics/main.py
@@ -0,0 +1,167 @@
+
+__generated_with = "0.13.15"
+
+# %%
+
+import sys
+sys.path.append('/opt/spark/work-dir/')
+from workflow_templates.spark.udf_manager import bootstrap_udfs
+from pyspark.sql.functions import udf
+from pyspark.sql.functions import lit
+from pyspark.sql.types import StringType, IntegerType
+import uuid
+from pathlib import Path
+from pyspark import SparkConf, Row
+from pyspark.sql import SparkSession
+import os
+import pandas as pd
+import polars as pl
+import pyarrow as pa
+from pyspark.sql.functions import expr, to_json, col, struct
+from functools import reduce
+from handle_structs_or_arrays import preprocess_then_expand
+import requests
+from jinja2 import Template
+import json
+
+from secrets_manager import SecretsManager
+
+from WorkflowManager import WorkflowDSL, WorkflowManager
+from KnowledgebaseManager import KnowledgebaseManager
+from gitea_client import GiteaClient, WorkspaceVersionedContent
+
+from dremio.flight.endpoint import DremioFlightEndpoint
+from dremio.flight.query import DremioFlightEndpointQuery
+
+
+alias_str = 'abcdefghijklmnopqrstuvwxyz'
+workspace = os.getenv('WORKSPACE') or 'exp360cust'
+
+job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
+
+sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
+secrets = sm.list_secrets(workspace)
+
+gitea_client = GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
+workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)
+conf = SparkConf()
+params = {
+    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
+    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
+    "spark.hadoop.fs.s3a.aws.region": "us-west-1",
+    "spark.sql.catalog.dremio.warehouse": secrets.get('LAKEHOUSE_BUCKET'),
+    "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
+    "spark.sql.catalog.dremio.type": "hadoop",
+    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+    "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
+    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+    "spark.jars.packages":
+        "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
+}
+
+conf.setAll(list(params.items()))
+
+spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
+bootstrap_udfs(spark)
+
+# %%
+
+actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
+actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
+
+# %%
+
+# includeExistingColumns is false for this mapping, so start from an empty select list.
+_actions_audit_mapper_select_clause = []
+
+_actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
+_actions_audit_mapper_select_clause.append("sub_category AS service_type")
+_actions_audit_mapper_select_clause.append("action_count AS action_count")
+
+actions_audit_mapper_df = spark.sql(("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df").replace("{job_id}", f"'{job_id}'"))
+actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
+
+# %%
+
+print(actions_audit_mapper_df.columns)
+actions_audit_filter_df = spark.sql("select * from actions_audit_mapper_df where action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))")
+actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
+
+# %%
+
+_params = {
+    "datasource": "actions_audit_filter",
+    "selectFunctions": [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
+}
+
+_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
+    actions_audit_filter_df,
+    group_expression="action_date, service_type",
+    cube="",
+    rollup="",
+    grouping_set="",
+    select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
+)
+
+_agg_exprs = [
+    expr(f["aggregationFunction"]).alias(f["fieldName"])
+    for f in _rewritten_selects
+]
+
+_all_group_cols = list({c for gs in _grouping_specs for c in gs})
+
+# One partial aggregate per grouping spec; columns absent from a spec are padded with NULLs
+# so the partials can be unioned by name.
+_partials = []
+for _gs in _grouping_specs:
+    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
+    for _col in _all_group_cols:
+        if _col not in _gs:
+            _gdf = _gdf.withColumn(_col, lit(None))
+    _partials.append(_gdf)
+
+aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)
+
+aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
+
+# %%
+
+_data_writer__5_fields_to_update = aggregate__3_df.columns
+_data_writer__5_set_clause = []
+_data_writer__5_unique_key_clause = []
+_data_writer__5_unique_keys = ['action_date', 'service_type']
+
+for _key in _data_writer__5_unique_keys:
+    _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')
+
+# The merge keys are matched in the ON clause, so exclude them from the UPDATE SET list.
+for _field in _data_writer__5_fields_to_update:
+    if _field not in _data_writer__5_unique_keys:
+        _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')
+
+_merge_query = '''
+    MERGE INTO dremio.servicemetrics t
+    USING aggregate__3_df s
+    ON ''' + ' AND '.join(_data_writer__5_unique_key_clause) + ''' WHEN MATCHED THEN
+    UPDATE SET ''' + ', '.join(_data_writer__5_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
+
+spark.sql(_merge_query)
+
+
diff --git a/service_request_metrics/main.py.notebook b/service_request_metrics/main.py.notebook
new file mode 100644
index 0000000..c448b3f
--- /dev/null
+++ b/service_request_metrics/main.py.notebook
@@ -0,0 +1,197 @@
+import marimo
+
+__generated_with = "0.13.15"
+app = marimo.App()
+
+
+@app.cell
+def init():
+
+    import sys
+    sys.path.append('/opt/spark/work-dir/')
+    from workflow_templates.spark.udf_manager import bootstrap_udfs
+    from pyspark.sql.functions import udf
+    from pyspark.sql.functions import lit
+    from pyspark.sql.types import StringType, IntegerType
+    import uuid
+    from pathlib import Path
+    from pyspark import SparkConf, Row
+    from pyspark.sql import SparkSession
+    import os
+    import pandas as pd
+    import polars as pl
+    import pyarrow as pa
+    from pyspark.sql.functions import expr, to_json, col, struct
+    from functools import reduce
+    from handle_structs_or_arrays import preprocess_then_expand
+    import requests
+    from jinja2 import Template
+    import json
+
+    from secrets_manager import SecretsManager
+
+    from WorkflowManager import WorkflowDSL, WorkflowManager
+    from KnowledgebaseManager import KnowledgebaseManager
+    from gitea_client import GiteaClient, WorkspaceVersionedContent
+
+    from dremio.flight.endpoint import DremioFlightEndpoint
+    from dremio.flight.query import DremioFlightEndpointQuery
+
+    alias_str = 'abcdefghijklmnopqrstuvwxyz'
+    workspace = os.getenv('WORKSPACE') or 'exp360cust'
+
+    job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
+
+    sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
+    secrets = sm.list_secrets(workspace)
+
+    gitea_client = GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
+    workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)
+    conf = SparkConf()
+    params = {
+        "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
+        "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
+        "spark.hadoop.fs.s3a.aws.region": "us-west-1",
+        "spark.sql.catalog.dremio.warehouse": secrets.get('LAKEHOUSE_BUCKET'),
+        "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.dremio.type": "hadoop",
+        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
+        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+        "spark.jars.packages":
+            "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
+    }
+
+    conf.setAll(list(params.items()))
+
+    spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
+    bootstrap_udfs(spark)
+    return expr, job_id, lit, preprocess_then_expand, reduce, spark
+
+
+@app.cell
+def actions_audit_reader(spark):
+
+    actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
+    actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
+    return (actions_audit_reader_df,)
+
+
+@app.cell
+def actions_audit_mapper(actions_audit_reader_df, job_id, spark):
+
+    # includeExistingColumns is false for this mapping, so start from an empty select list.
+    _actions_audit_mapper_select_clause = []
+
+    _actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
+    _actions_audit_mapper_select_clause.append("sub_category AS service_type")
+    _actions_audit_mapper_select_clause.append("action_count AS action_count")
+
+    actions_audit_mapper_df = spark.sql(("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df").replace("{job_id}", f"'{job_id}'"))
+    actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
+    return (actions_audit_mapper_df,)
+
+
+@app.cell
+def actions_audit_filter(actions_audit_mapper_df, spark):
+
+    print(actions_audit_mapper_df.columns)
+    actions_audit_filter_df = spark.sql("select * from actions_audit_mapper_df where action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))")
+    actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
+    return (actions_audit_filter_df,)
+
+
+@app.cell
+def aggregate__3(
+    actions_audit_filter_df,
+    expr,
+    lit,
+    preprocess_then_expand,
+    reduce,
+):
+
+    _params = {
+        "datasource": "actions_audit_filter",
+        "selectFunctions": [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
+    }
+
+    _df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand(
+        actions_audit_filter_df,
+        group_expression="action_date, service_type",
+        cube="",
+        rollup="",
+        grouping_set="",
+        select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
+    )
+
+    _agg_exprs = [
+        expr(f["aggregationFunction"]).alias(f["fieldName"])
+        for f in _rewritten_selects
+    ]
+
+    _all_group_cols = list({c for gs in _grouping_specs for c in gs})
+
+    # One partial aggregate per grouping spec; columns absent from a spec are padded with NULLs
+    # so the partials can be unioned by name.
+    _partials = []
+    for _gs in _grouping_specs:
+        _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
+        for _col in _all_group_cols:
+            if _col not in _gs:
+                _gdf = _gdf.withColumn(_col, lit(None))
+        _partials.append(_gdf)
+
+    aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)
+
+    aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
+
+    return (aggregate__3_df,)
+
+
+@app.cell
+def data_writer__5(aggregate__3_df, spark):
+
+    _data_writer__5_fields_to_update = aggregate__3_df.columns
+    _data_writer__5_set_clause = []
+    _data_writer__5_unique_key_clause = []
+    _data_writer__5_unique_keys = ['action_date', 'service_type']
+
+    for _key in _data_writer__5_unique_keys:
+        _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')
+
+    # The merge keys are matched in the ON clause, so exclude them from the UPDATE SET list.
+    for _field in _data_writer__5_fields_to_update:
+        if _field not in _data_writer__5_unique_keys:
+            _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')
+
+    _merge_query = '''
+        MERGE INTO dremio.servicemetrics t
+        USING aggregate__3_df s
+        ON ''' + ' AND '.join(_data_writer__5_unique_key_clause) + ''' WHEN MATCHED THEN
+        UPDATE SET ''' + ', '.join(_data_writer__5_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
+
+    spark.sql(_merge_query)
+
+    return
+
+
+if __name__ == "__main__":
+    app.run()
diff --git
a/service_request_metrics/main.workflow b/service_request_metrics/main.workflow new file mode 100644 index 0000000..3899f66 --- /dev/null +++ b/service_request_metrics/main.workflow @@ -0,0 +1 @@ +{"version":"v1alpha","kind":"VisualBuilder","metadata":{"name":"service_request_metrics","description":"Service request metrics workflow","runtime":"spark"},"spec":{"ui":{"edges":[{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-reader__0","target":"data-mapper__1","id":"xy-edge__data-reader__0-data-mapper__1"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"data-mapper__1","target":"filter__2","id":"xy-edge__data-mapper__1-filter__2"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"filter__2","target":"aggregate__3","id":"xy-edge__filter__2-aggregate__3"},{"style":{"stroke":"var(--color-primary-lighter)","strokeWidth":1},"source":"aggregate__3","target":"data-writer__5","id":"xy-edge__aggregate__3-data-writer__5"}],"nodes":[{"id":"data-reader__0","type":"workflowNode","position":{"x":-1794,"y":-34},"data":{"nodeType":"data-reader","id":"data-reader__0"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-mapper__1","type":"workflowNode","position":{"x":-1347.5,"y":-36.499999999999886},"data":{"nodeType":"data-mapper","id":"data-mapper__1"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"filter__2","type":"workflowNode","position":{"x":-899.5,"y":-34.499999999999886},"data":{"nodeType":"filter","id":"filter__2"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"aggregate__3","type":"workflowNode","position":{"x":-473.5,"y":-32.499999999999886},"data":{"nodeType":"aggregate","id":"aggregate__3"},"measured":{"width":320,"height":110},"selected":false,"dragging":false},{"id":"data-writer__5","type":"workflowNode","position":{"x":-7.5,"y":-52.499999999999886},"data":{"nodeType":"data-writer","id":"data-writer__5"},"measured":{"width":320,"height":110},"selected":true,"dragging":false}],"nodesData":{"data-reader__0":{"iceberg_catalog":"dremio","typeLabel":"Spark","isDefault":false,"name":"actions_audit_reader","type":"SparkIcebergReader","table_name":"actionsaudit","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{}},"data-mapper__1":{"name":"actions_audit_mapper","type":"DataMapping","fromDataReader":"actions_audit_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"action_date","valueExpression":"DATE(action_date)"},{"fieldName":"service_type","valueExpression":"sub_category"},{"fieldName":"action_count","valueExpression":"action_count"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756445643483","newFieldName":"action_date","mappingType":"expression","value":"DATE(action_date)"},{"id":"mapping-1756445668299","newFieldName":"service_type","mappingType":"sourceColumn","value":"sub_category"},{"id":"mapping-1756445681282","newFieldName":"action_count","mappingType":"sourceColumn","value":"action_count"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},"filter__2":{"name":"actions_audit_filter","type":"Filter","datasource":"actions_audit_mapper","condition":"action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM 
actions_audit_mapper_df))","isDefault":false},"aggregate__3":{"name":"aggregate__3","type":"SQLAggregation","datasource":"actions_audit_filter","groupByParams":{"group_expression":"action_date, service_type"},"selectFunctions":[{"fieldName":"service_count","aggregationFunction":"SUM(action_count)"}],"isDefault":false},"data-writer__5":{"name":"data_writer__5","type":"IcebergWriter","iceberg_catalog":"dremio","warehouse_directory":"servicemetrics","datasource":"aggregate__3","mode":"merge","typeLabel":"Iceberg","unique_id":["action_date","service_type"],"isDefault":false}}},"blocks":[{"name":"actions_audit_reader","type":"SparkIcebergReader","options":{},"iceberg_catalog":"dremio","typeLabel":"Spark","isDefault":false,"table_name":"actionsaudit","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"}},{"name":"actions_audit_mapper","type":"DataMapping","options":{},"fromDataReader":"actions_audit_reader","includeExistingColumns":false,"toSchema":[{"fieldName":"action_date","valueExpression":"DATE(action_date)"},{"fieldName":"service_type","valueExpression":"sub_category"},{"fieldName":"action_count","valueExpression":"action_count"}],"additionalData":{"isGlossaryAssisted":false,"selectedSourceGlossary":"","selectedTargetGlossary":"","manualMappings":[{"id":"mapping-1756445643483","newFieldName":"action_date","mappingType":"expression","value":"DATE(action_date)"},{"id":"mapping-1756445668299","newFieldName":"service_type","mappingType":"sourceColumn","value":"sub_category"},{"id":"mapping-1756445681282","newFieldName":"action_count","mappingType":"sourceColumn","value":"action_count"}],"confirmedMapping":[],"mappingBatchKeysAfterConfirm":[],"after":"","totalSourceTerms":0},"isDefault":false},{"name":"actions_audit_filter","type":"Filter","options":{},"datasource":"actions_audit_mapper","condition":"action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))","isDefault":false},{"name":"aggregate__3","type":"SQLAggregation","options":{},"datasource":"actions_audit_filter","groupByParams":{"group_expression":"action_date, service_type"},"selectFunctions":[{"fieldName":"service_count","aggregationFunction":"SUM(action_count)"}],"isDefault":false},{"name":"data_writer__5","type":"IcebergWriter","options":{},"iceberg_catalog":"dremio","warehouse_directory":"servicemetrics","datasource":"aggregate__3","mode":"merge","typeLabel":"Iceberg","unique_id":["action_date","service_type"],"isDefault":false}]}} \ No newline at end of file
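
For reference, the upsert that data_writer__5 assembles can be previewed without a Spark session. The sketch below is plain Python using the target table, merge keys, and source view named in this workflow (dremio.servicemetrics, action_date, service_type, aggregate__3_df); the column list stands in for aggregate__3_df.columns and is an assumption for the example. It only builds and prints the SQL string and is not part of the generated files.

# Illustrative sketch: rebuild the MERGE statement the writer produces.
target_table = 'dremio.servicemetrics'
source_view = 'aggregate__3_df'
unique_keys = ['action_date', 'service_type']
columns = ['action_date', 'service_type', 'service_count']  # assumed contents of aggregate__3_df.columns

# Keys go into the ON clause; every other column is updated on match.
on_clause = ' AND '.join(f't.{k} = s.{k}' for k in unique_keys)
set_clause = ', '.join(f't.{c} = s.{c}' for c in columns if c not in unique_keys)

merge_query = (
    f"MERGE INTO {target_table} t\n"
    f"USING {source_view} s\n"
    f"ON {on_clause}\n"
    f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
    f"WHEN NOT MATCHED THEN INSERT *"
)
print(merge_query)
# MERGE INTO dremio.servicemetrics t
# USING aggregate__3_df s
# ON t.action_date = s.action_date AND t.service_type = s.service_type
# WHEN MATCHED THEN UPDATE SET t.service_count = s.service_count
# WHEN NOT MATCHED THEN INSERT *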