diff --git a/payment_metrics/main.py b/payment_metrics/main.py
new file mode 100644
index 0000000..d6e7a33
--- /dev/null
+++ b/payment_metrics/main.py
@@ -0,0 +1,74 @@
+
+__generated_with = "0.13.15"
+
+# %%
+
+import sys
+sys.path.append('/opt/spark/work-dir/')
+from workflow_templates.spark.udf_manager import bootstrap_udfs
+from pyspark.sql.functions import udf
+from pyspark.sql.functions import lit
+from pyspark.sql.types import StringType, IntegerType
+import uuid
+from pathlib import Path
+from pyspark import SparkConf, Row
+from pyspark.sql import SparkSession
+import os
+import pandas as pd
+import polars as pl
+import pyarrow as pa
+from pyspark.sql.functions import expr, to_json, col, struct
+from functools import reduce
+from handle_structs_or_arrays import preprocess_then_expand
+import requests
+from jinja2 import Template
+import json
+
+
+from secrets_manager import SecretsManager
+
+from WorkflowManager import WorkflowDSL, WorkflowManager
+from KnowledgebaseManager import KnowledgebaseManager
+from gitea_client import GiteaClient, WorkspaceVersionedContent
+
+from dremio.flight.endpoint import DremioFlightEndpoint
+from dremio.flight.query import DremioFlightEndpointQuery
+
+
+alias_str = 'abcdefghijklmnopqrstuvwxyz'
+workspace = os.getenv('WORKSPACE') or 'exp360cust'
+
+job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
+
+sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
+secrets = sm.list_secrets(workspace)
+
+gitea_client = GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
+workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)
+conf = SparkConf()
+params = {
+    "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
+    "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
+    "spark.hadoop.fs.s3a.endpoint.region": "us-west-1",
+    "spark.sql.catalog.dremio.warehouse": 's3://' + (secrets.get('LAKEHOUSE_BUCKET') or ''),
+    "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
+    "spark.sql.catalog.dremio.type": "hadoop",
+    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+    "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2"
+}
+
+
+
+conf.setAll(list(params.items()))
+
+spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
+bootstrap_udfs(spark)
+
+# %%
+
+
+
+data_reader__1_df = spark.read.table('dremio.payments')
+data_reader__1_df.createOrReplaceTempView('data_reader__1_df')
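Reviewer note: the block above registers a Hadoop-type Iceberg catalog named `dremio` on S3, enables the Iceberg SQL extensions, and pulls credentials from the secrets manager. For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the same Iceberg-on-Spark setup against a local warehouse; the catalog name `local`, the `/tmp` warehouse path, and the toy `payments` table are illustrative stand-ins, not part of this PR.

```python
# Minimal local sketch of the Iceberg-on-Spark catalog wiring used above.
# "local" and /tmp/iceberg-warehouse are illustrative; the real job targets
# the "dremio" catalog backed by S3.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("payment_metrics_local")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
    .getOrCreate()
)

# Create a toy table so the reader pattern from main.py can run end to end.
spark.sql("CREATE TABLE IF NOT EXISTS local.payments (id BIGINT, amount DOUBLE) USING iceberg")
spark.sql("INSERT INTO local.payments VALUES (1, 9.99)")

# Mirrors data_reader__1: read the table and expose it as a temp view.
df = spark.read.table("local.payments")
df.createOrReplaceTempView("data_reader__1_df")
spark.sql("SELECT COUNT(*) AS n FROM data_reader__1_df").show()
```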
diff --git a/payment_metrics/main.py.notebook b/payment_metrics/main.py.notebook
new file mode 100644
index 0000000..3503f74
--- /dev/null
+++ b/payment_metrics/main.py.notebook
@@ -0,0 +1,86 @@
+import marimo
+
+__generated_with = "0.13.15"
+app = marimo.App()
+
+
+@app.cell
+def init():
+
+    import sys
+    sys.path.append('/opt/spark/work-dir/')
+    from workflow_templates.spark.udf_manager import bootstrap_udfs
+    from pyspark.sql.functions import udf
+    from pyspark.sql.functions import lit
+    from pyspark.sql.types import StringType, IntegerType
+    import uuid
+    from pathlib import Path
+    from pyspark import SparkConf, Row
+    from pyspark.sql import SparkSession
+    import os
+    import pandas as pd
+    import polars as pl
+    import pyarrow as pa
+    from pyspark.sql.functions import expr, to_json, col, struct
+    from functools import reduce
+    from handle_structs_or_arrays import preprocess_then_expand
+    import requests
+    from jinja2 import Template
+    import json
+
+
+    from secrets_manager import SecretsManager
+
+    from WorkflowManager import WorkflowDSL, WorkflowManager
+    from KnowledgebaseManager import KnowledgebaseManager
+    from gitea_client import GiteaClient, WorkspaceVersionedContent
+
+    from dremio.flight.endpoint import DremioFlightEndpoint
+    from dremio.flight.query import DremioFlightEndpointQuery
+
+
+    alias_str = 'abcdefghijklmnopqrstuvwxyz'
+    workspace = os.getenv('WORKSPACE') or 'exp360cust'
+
+    job_id = os.getenv("EXECUTION_ID") or str(uuid.uuid4())
+
+    sm = SecretsManager(os.getenv('SECRET_MANAGER_URL'), os.getenv('SECRET_MANAGER_NAMESPACE'), os.getenv('SECRET_MANAGER_ENV'), os.getenv('SECRET_MANAGER_TOKEN'))
+    secrets = sm.list_secrets(workspace)
+
+    gitea_client = GiteaClient(os.getenv('GITEA_HOST'), os.getenv('GITEA_TOKEN'), os.getenv('GITEA_OWNER') or 'gitea_admin', os.getenv('GITEA_REPO') or 'tenant1')
+    workspaceVersionedContent = WorkspaceVersionedContent(gitea_client)
+    conf = SparkConf()
+    params = {
+        "spark.hadoop.fs.s3a.access.key": secrets.get('S3_ACCESS_KEY'),
+        "spark.hadoop.fs.s3a.secret.key": secrets.get('S3_SECRET_KEY'),
+        "spark.hadoop.fs.s3a.endpoint.region": "us-west-1",
+        "spark.sql.catalog.dremio.warehouse": 's3://' + (secrets.get('LAKEHOUSE_BUCKET') or ''),
+        "spark.sql.catalog.dremio": "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.dremio.type": "hadoop",
+        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+        "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+        "spark.jars.packages": "com.amazonaws:aws-java-sdk-bundle:1.12.262,com.github.ben-manes.caffeine:caffeine:3.2.0,org.apache.iceberg:iceberg-aws-bundle:1.8.1,org.apache.iceberg:iceberg-common:1.8.1,org.apache.iceberg:iceberg-core:1.8.1,org.apache.iceberg:iceberg-spark:1.8.1,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-cloud-storage:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2"
+    }
+
+
+
+    conf.setAll(list(params.items()))
+
+    spark = SparkSession.builder.appName(workspace).config(conf=conf).getOrCreate()
+    bootstrap_udfs(spark)
+    return (spark,)
+
+
+@app.cell
+def data_reader__1(spark):
+
+
+
+    data_reader__1_df = spark.read.table('dremio.payments')
+    data_reader__1_df.createOrReplaceTempView('data_reader__1_df')
+    return
+
+
+if __name__ == "__main__":
+    app.run()
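Reviewer note: `main.py.notebook` is a marimo export, in which each `@app.cell` function declares its inputs as parameters and publishes outputs by returning them; marimo derives the execution order from those signatures, which is why `data_reader__1(spark)` runs after `init()` returns `(spark,)`. A stripped-down sketch of the same pattern (the cell names and values here are illustrative):

```python
# Minimal sketch of marimo's dependency-by-return pattern: a cell returns
# names in a tuple, and downstream cells receive them as parameters.
import marimo

app = marimo.App()


@app.cell
def producer():
    greeting = "hello"
    return (greeting,)  # publishes `greeting` to dependent cells


@app.cell
def consumer(greeting):
    # Runs after producer() because it consumes `greeting`.
    print(f"{greeting}, world")
    return


if __name__ == "__main__":
    app.run()
```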
diff --git a/payment_metrics/main.workflow b/payment_metrics/main.workflow
new file mode 100644
index 0000000..35898de
--- /dev/null
+++ b/payment_metrics/main.workflow
@@ -0,0 +1 @@
+{"version":"v1alpha","kind":"VisualBuilder","metadata":{"name":"payment_metrics","description":"Workflow to generate payment metrics.","runtime":"spark"},"spec":{"ui":{"edges":[],"nodes":[{"id":"data-reader__1","type":"workflowNode","position":{"x":618.5,"y":-49.5},"data":{"nodeType":"data-reader","id":"data-reader__1"},"measured":{"width":320,"height":110}}],"nodesData":{"data-reader__1":{"isDefault":false,"name":"data_reader__1","type":"SparkIcebergReader","table_name":"payments","iceberg_catalog":"dremio","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"options":{},"typeLabel":"Spark"}}},"blocks":[{"name":"data_reader__1","type":"SparkIcebergReader","options":{},"isDefault":false,"table_name":"payments","iceberg_catalog":"dremio","region":"us-west-1","credentials":{"accessKey":"S3_ACCESS_KEY","secretKey":"S3_SECRET_KEY"},"typeLabel":"Spark"}]}}
\ No newline at end of file
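Reviewer note: `main.workflow` stores both the canvas layout (`spec.ui`) and the logical `spec.blocks` list; the single `SparkIcebergReader` block corresponds to the `data_reader__1` cell generated in `main.py`. Below is a hedged sketch of how such a block could be rendered into that reader snippet; the translation logic is an assumption for illustration, not the project's actual code generator.

```python
# Illustrative only: a plausible rendering of a SparkIcebergReader block
# into the reader code seen in main.py. Not the project's real generator.
import json


def render_reader_block(block: dict) -> str:
    """Render one SparkIcebergReader block as a PySpark read + temp view."""
    name = block["name"]
    table = f"{block['iceberg_catalog']}.{block['table_name']}"
    return (
        f"{name}_df = spark.read.table('{table}')\n"
        f"{name}_df.createOrReplaceTempView('{name}_df')"
    )


with open("payment_metrics/main.workflow") as f:
    workflow = json.load(f)

for block in workflow["spec"]["blocks"]:
    if block["type"] == "SparkIcebergReader":
        print(render_reader_block(block))
```

Running this against the file above would print the same two lines that appear in the `data_reader__1` section of `main.py`.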