Workflow saved

unknown
2025-08-29 05:56:22 +00:00
parent c5a5dfb7cc
commit 0027602621
3 changed files with 365 additions and 0 deletions

@@ -0,0 +1,167 @@
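# Generated workflow: incremental service-metrics load.
# Reads dremio.actionsaudit, maps and filters the new audit rows, aggregates
# action counts per (action_date, service_type), and MERGEs the result into
# dremio.servicemetrics.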
__generated_with = "0.13.15"
# %%
import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
conf = SparkConf()
params = {
"spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
"spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
"spark.hadoop.fs.s3a.aws.region": "us-west-1",
"spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
"spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dremio.type" : "hadoop",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}
conf.setAll(list(params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
# %%
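# Read the source Iceberg table through the "dremio" catalog and register it
# as a temp view so the following steps can reference it in Spark SQL.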
actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
# %%
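# Map the audit rows to the metrics shape: cast action_date to DATE, rename
# sub_category to service_type, and keep action_count. The generated pass-through
# of all source columns is disabled ("if False"), so only these three columns are
# selected; a "{job_id}" placeholder, if present in the generated SQL, is replaced
# with the current execution id.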
_actions_audit_mapper_select_clause=actions_audit_reader_df.columns if False else []
_actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
_actions_audit_mapper_select_clause.append("sub_category AS service_type")
_actions_audit_mapper_select_clause.append("action_count AS action_count")
actions_audit_mapper_df=spark.sql(("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df").replace("{job_id}",f"'{job_id}'"))
actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
# %%
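# Incremental filter: keep rows on or after the latest action_date already in
# dremio.servicemetrics, falling back to the earliest mapped date (i.e. a full
# load) when the target table is empty.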
print(actions_audit_mapper_df.columns)
actions_audit_filter_df = spark.sql("select * from actions_audit_mapper_df where action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))")
actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
# %%
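# Expand the grouping specification produced by preprocess_then_expand, compute
# SUM(action_count) AS service_count for each (action_date, service_type) group,
# and union the partial results by name (grouping columns missing from a spec
# are null-filled so the schemas line up).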
_params = {
"datasource": "actions_audit_filter",
"selectFunctions" : [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( actions_audit_filter_df,
group_expression="action_date, service_type",
cube="",
rollup="",
grouping_set="",
select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
for f in _rewritten_selects
]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)
aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)
aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
# %%
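# Upsert the aggregated rows into dremio.servicemetrics with an Iceberg MERGE
# keyed on (action_date, service_type).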
_data_writer__5_fields_to_update = aggregate__3_df.columns
_data_writer__5_unique_keys = ['action_date', 'service_type']
_data_writer__5_set_clause = []
_data_writer__5_unique_key_clause = []
for _key in _data_writer__5_unique_keys:
    _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')
for _field in _data_writer__5_fields_to_update:
    # Update only the non-key columns; the unique keys identify the row to merge on.
    if _field not in _data_writer__5_unique_keys:
        _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.servicemetrics t
USING aggregate__3_df s
ON ''' + ' AND '.join(_data_writer__5_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_data_writer__5_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)
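# For reference, with the columns produced above (action_date, service_type,
# service_count) the rendered statement should look roughly like this
# (illustrative, not captured from a run):
#   MERGE INTO dremio.servicemetrics t
#   USING aggregate__3_df s
#   ON t.action_date = s.action_date AND t.service_type = s.service_type
#   WHEN MATCHED THEN UPDATE SET t.service_count = s.service_count
#   WHEN NOT MATCHED THEN INSERT *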

@@ -0,0 +1,197 @@
import marimo
__generated_with = "0.13.15"
app = marimo.App()
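# Marimo-app form of the same pipeline as the script above: each cell mirrors one
# "# %%" block (session bootstrap, reader, mapper, filter, aggregate, data writer).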
@app.cell
def init():
import sys
sys.path.append('/opt/spark/work-dir/')
from workflow_templates.spark.udf_manager import bootstrap_udfs
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, IntegerType
import uuid
from pathlib import Path
from pyspark import SparkConf, Row
from pyspark.sql import SparkSession
import os
import pandas as pd
import polars as pl
import pyarrow as pa
from pyspark.sql.functions import expr,to_json,col,struct
from functools import reduce
from handle_structs_or_arrays import preprocess_then_expand
import requests
from jinja2 import Template
import json
from secrets_manager import SecretsManager
from WorkflowManager import WorkflowDSL, WorkflowManager
from KnowledgebaseManager import KnowledgebaseManager
from gitea_client import GiteaClient, WorkspaceVersionedContent
from dremio.flight.endpoint import DremioFlightEndpoint
from dremio.flight.query import DremioFlightEndpointQuery
alias_str='abcdefghijklmnopqrstuvwxyz'
workspace = os.getenv('WORKSPACE') or 'exp360cust'
job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
secrets = sm.list_secrets(workspace)
gitea_client=GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
workspaceVersionedContent=WorkspaceVersionedContent(gitea_client)
conf = SparkConf()
params = {
"spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
"spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
"spark.hadoop.fs.s3a.aws.region": "us-west-1",
"spark.sql.catalog.dremio.warehouse" : secrets.get('LAKEHOUSE_BUCKET'),
"spark.sql.catalog.dremio" : "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dremio.type" : "hadoop",
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,za.co.absa.cobrix:spark-cobol_2.12:2.8.0"
}
conf.setAll(list(params.items()))
spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
bootstrap_udfs(spark)
return expr, job_id, lit, preprocess_then_expand, reduce, spark
@app.cell
def actions_audit_reader(spark):
actions_audit_reader_df = spark.read.table('dremio.actionsaudit')
actions_audit_reader_df.createOrReplaceTempView('actions_audit_reader_df')
return (actions_audit_reader_df,)
@app.cell
def actions_audit_mapper(actions_audit_reader_df, job_id, spark):
_actions_audit_mapper_select_clause=actions_audit_reader_df.columns if False else []
_actions_audit_mapper_select_clause.append("DATE(action_date) AS action_date")
_actions_audit_mapper_select_clause.append("sub_category AS service_type")
_actions_audit_mapper_select_clause.append("action_count AS action_count")
actions_audit_mapper_df=spark.sql(("SELECT " + ', '.join(_actions_audit_mapper_select_clause) + " FROM actions_audit_reader_df").replace("{job_id}",f"'{job_id}'"))
actions_audit_mapper_df.createOrReplaceTempView("actions_audit_mapper_df")
return (actions_audit_mapper_df,)
@app.cell
def actions_audit_filter(actions_audit_mapper_df, spark):
print(actions_audit_mapper_df.columns)
actions_audit_filter_df = spark.sql("select * from actions_audit_mapper_df where action_date >= COALESCE((SELECT MAX(DATE(action_date)) FROM dremio.servicemetrics), (SELECT MIN(action_date) FROM actions_audit_mapper_df))")
actions_audit_filter_df.createOrReplaceTempView('actions_audit_filter_df')
return (actions_audit_filter_df,)
@app.cell
def aggregate__3(
actions_audit_filter_df,
expr,
lit,
preprocess_then_expand,
reduce,
):
_params = {
"datasource": "actions_audit_filter",
"selectFunctions" : [{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
}
_df_flat, _grouping_specs, _rewritten_selects = preprocess_then_expand( actions_audit_filter_df,
group_expression="action_date, service_type",
cube="",
rollup="",
grouping_set="",
select_functions=[{'fieldName': 'service_count', 'aggregationFunction': 'SUM(action_count)'}]
)
_agg_exprs = [expr(f["aggregationFunction"]).alias(f["fieldName"])
for f in _rewritten_selects
]
_all_group_cols = list({c for gs in _grouping_specs for c in gs})
_partials = []
for _gs in _grouping_specs:
    _gdf = _df_flat.groupBy(*_gs).agg(*_agg_exprs)
    for _col in _all_group_cols:
        if _col not in _gs:
            _gdf = _gdf.withColumn(_col, lit(None))
    _partials.append(_gdf)
aggregate__3_df = reduce(lambda a, b: a.unionByName(b), _partials)
aggregate__3_df.createOrReplaceTempView('aggregate__3_df')
return (aggregate__3_df,)
@app.cell
def data_writer__5(aggregate__3_df, spark):
_data_writer__5_fields_to_update = aggregate__3_df.columns
_data_writer__5_unique_keys = ['action_date', 'service_type']
_data_writer__5_set_clause = []
_data_writer__5_unique_key_clause = []
for _key in _data_writer__5_unique_keys:
    _data_writer__5_unique_key_clause.append(f't.{_key} = s.{_key}')
for _field in _data_writer__5_fields_to_update:
    # Update only the non-key columns; the unique keys identify the row to merge on.
    if _field not in _data_writer__5_unique_keys:
        _data_writer__5_set_clause.append(f't.{_field} = s.{_field}')
_merge_query = '''
MERGE INTO dremio.servicemetrics t
USING aggregate__3_df s
ON ''' + ' AND '.join(_data_writer__5_unique_key_clause) + ''' WHEN MATCHED THEN
UPDATE SET ''' + ', '.join(_data_writer__5_set_clause) + ' WHEN NOT MATCHED THEN INSERT *'
spark.sql(_merge_query)
return
if __name__ == "__main__":
    app.run()

File diff suppressed because one or more lines are too long