This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

[Merged by Bors] - Add demo data-warehouse-iceberg-trino-spark #144

Closed
wants to merge 31 commits
Changes from 21 commits
Commits
31 commits
08be276  Add demo data-warehouse-iceberg-trino-spark (sbernauer, Oct 12, 2022)
e69018b  Fewer but larger batches (sbernauer, Oct 12, 2022)
1419ed6  docs (sbernauer, Oct 13, 2022)
82c3afd  Add some smart city datasets (sbernauer, Oct 13, 2022)
afd4f72  Microbatch every 5 mins (sbernauer, Oct 14, 2022)
52ae7e2  Add basic Superset dashboards (sbernauer, Oct 14, 2022)
9abe4a1  Merge branch 'main' into demo-data-warehouse-iceberg-trino-spark (sbernauer, Oct 14, 2022)
3062492  Bump minio version (sbernauer, Oct 14, 2022)
dc9e85b  Update dashboards (sbernauer, Oct 17, 2022)
620e926  Spark: Switch to unified resource struct after problems are fixed (sbernauer, Oct 20, 2022)
feccb37  Rename release to 22.09-latest-trino-spark (sbernauer, Oct 20, 2022)
20b9072  Starting point of docs (sbernauer, Oct 20, 2022)
d107c53  Merge branch 'main' into demo-data-warehouse-iceberg-trino-spark (sbernauer, Oct 27, 2022)
e70a65f  Update minio name (sbernauer, Oct 27, 2022)
64ca156  Add MinIO and NiFi to docs (sbernauer, Oct 27, 2022)
9538515  Docs on DBeaver (sbernauer, Oct 28, 2022)
13ac8a3  Docs on Superset (sbernauer, Nov 2, 2022)
dc95271  Larger MinIO pvcs (sbernauer, Nov 2, 2022)
c90f78e  Merge branch 'main' into demo-data-warehouse-iceberg-trino-spark (sbernauer, Nov 2, 2022)
ce2a040  Mention iceberg (sbernauer, Nov 2, 2022)
81d8654  Merge branch 'demo-data-warehouse-iceberg-trino-spark' of github.com:… (sbernauer, Nov 2, 2022)
d00bc9e  Fix minio svc name (sbernauer, Nov 3, 2022)
958f46f  Add setup-superset.yaml (sbernauer, Nov 3, 2022)
dd9fd28  Add view warehouse.smart_city.shared_bikes_station_status_joined (sbernauer, Nov 3, 2022)
4abc206  Update docs/modules/ROOT/pages/demos/data-warehouse-iceberg-trino-spa… (sbernauer, Nov 3, 2022)
6d72cc3  Update docs/modules/ROOT/pages/demos/data-warehouse-iceberg-trino-spa… (sbernauer, Nov 3, 2022)
7a0b592  Apply suggestions from code review (sbernauer, Nov 3, 2022)
176ac1c  wording (sbernauer, Nov 3, 2022)
28f4ed3  wording (sbernauer, Nov 3, 2022)
668efc7  wording (sbernauer, Nov 3, 2022)
ff742ac  typo (sbernauer, Nov 3, 2022)
15 changes: 15 additions & 0 deletions demos/data-warehouse-iceberg-trino-spark/Dockerfile
@@ -0,0 +1,15 @@
FROM docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.2.0

# Fetch Apache Ivy and use it to pull the Kafka connector and the Iceberg runtime
# (first the artifact itself, then its compile-scope dependencies) into the Spark jars directory.
RUN curl -L -O http://search.maven.org/remotecontent?filepath=org/apache/ivy/ivy/2.5.0/ivy-2.5.0.jar
RUN java -jar ivy-2.5.0.jar -notransitive \
    -dependency org.apache.spark spark-sql-kafka-0-10_2.12 3.3.0 \
    -retrieve "/stackable/spark/jars/[artifact]-[revision](-[classifier]).[ext]"
RUN java -jar ivy-2.5.0.jar -confs compile \
    -dependency org.apache.spark spark-sql-kafka-0-10_2.12 3.3.0 \
    -retrieve "/stackable/spark/jars/[artifact]-[revision](-[classifier]).[ext]"
RUN java -jar ivy-2.5.0.jar -notransitive \
    -dependency org.apache.iceberg iceberg-spark-runtime-3.3_2.12 0.14.1 \
    -retrieve "/stackable/spark/jars/[artifact]-[revision](-[classifier]).[ext]"
RUN java -jar ivy-2.5.0.jar -confs compile \
    -dependency org.apache.iceberg iceberg-spark-runtime-3.3_2.12 0.14.1 \
    -retrieve "/stackable/spark/jars/[artifact]-[revision](-[classifier]).[ext]"
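The SparkApplication manifest further down keeps an equivalent deps/packages section commented out; for reference, the same two dependencies could also be requested at session startup via spark.jars.packages instead of being baked into the image. A minimal PySpark sketch, assuming the driver can reach Maven Central (which is exactly what pre-fetching the jars in this image avoids):

# Minimal sketch (not part of this PR): pull the Kafka connector and the Iceberg runtime
# at session startup via spark.jars.packages instead of pre-installing them in the image.
# Assumes the driver has network access to Maven Central.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("deps-via-packages-sketch")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
        "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:0.14.1",
    )
    .getOrCreate()
)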
6,207 changes: 6,207 additions & 0 deletions demos/data-warehouse-iceberg-trino-spark/WarehouseKafkaIngest.xml

Large diffs are not rendered by default.

@@ -0,0 +1,72 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: create-nifi-ingestion-job
spec:
  template:
    spec:
      serviceAccountName: demo-serviceaccount
      initContainers:
        - name: wait-for-testdata
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Waiting for all kafka brokers to be ready' && kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/instance=kafka -l app.kubernetes.io/name=kafka"]
      containers:
        - name: create-nifi-ingestion-job
          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
          command: ["bash", "-c", "curl -O https://raw.githubusercontent.com/stackabletech/stackablectl/demo-data-warehouse-iceberg-trino-spark/demos/data-warehouse-iceberg-trino-spark/WarehouseKafkaIngest.xml && python -u /tmp/script/script.py"]
          volumeMounts:
            - name: script
              mountPath: /tmp/script
          env:
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
      volumes:
        - name: script
          configMap:
            name: create-nifi-ingestion-job-script
      restartPolicy: OnFailure
  backoffLimit: 50
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: create-nifi-ingestion-job-script
data:
  script.py: |
    from nipyapi.canvas import get_root_pg_id, schedule_process_group, list_all_controllers, schedule_controller
    from nipyapi.security import service_login
    from nipyapi.templates import get_template, upload_template, deploy_template
    import nipyapi
    import os
    import urllib3

    # As of 2022-08-29 we can't use "https://nifi:8443" here, because NiFi rejects it with
    # "The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept."
    ENDPOINT = f"https://nifi-node-default-0.nifi-node-default.{os.environ['NAMESPACE']}.svc.cluster.local:8443"  # For local testing/development replace this endpoint, but change it back afterwards
    USERNAME = "admin"
    PASSWORD = "adminadmin"
    TEMPLATE_NAME = "WarehouseKafkaIngest"
    TEMPLATE_FILE = f"{TEMPLATE_NAME}.xml"

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    nipyapi.config.nifi_config.host = f"{ENDPOINT}/nifi-api"
    nipyapi.config.nifi_config.verify_ssl = False

    print("Logging in")
    service_login(username=USERNAME, password=PASSWORD)
    print("Logged in")

    pg_id = get_root_pg_id()

    upload_template(pg_id, TEMPLATE_FILE)

    template_id = get_template(TEMPLATE_NAME).id
    deploy_template(pg_id, template_id, 200, 0)

    for controller in list_all_controllers():
        schedule_controller(controller, scheduled=True)

    schedule_process_group(pg_id, scheduled=True)
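The inline note about replacing the endpoint for local testing could also be handled without editing the script by reading an override from the environment. A small sketch, where NIFI_ENDPOINT is a hypothetical variable name and the in-cluster address stays the default:

# Sketch only: allow a local-testing override of the NiFi endpoint.
# NIFI_ENDPOINT is a hypothetical variable; when unset, the in-cluster address is used.
import os

DEFAULT_ENDPOINT = f"https://nifi-node-default-0.nifi-node-default.{os.environ['NAMESPACE']}.svc.cluster.local:8443"
ENDPOINT = os.environ.get("NIFI_ENDPOINT", DEFAULT_ENDPOINT)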
@@ -0,0 +1,304 @@
# We can't simply create the SparkApplication object here as we have to wait for Kafka to be ready because
# * We currently don't restart failed Spark applications (see https://github.com/stackabletech/spark-k8s-operator/issues/157)
# * We currently auto-create topics and we need all the brokers to be available so that the topic is distributed among all the brokers
---
apiVersion: batch/v1
kind: Job
metadata:
  name: create-spark-ingestion-job
spec:
  template:
    spec:
      serviceAccountName: demo-serviceaccount
      initContainers:
        - name: wait-for-testdata
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Waiting for all kafka brokers to be ready' && kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/instance=kafka -l app.kubernetes.io/name=kafka"]
      containers:
        - name: create-spark-ingestion-job
          image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
          command: ["bash", "-c", "echo 'Submitting Spark job' && kubectl apply -f /tmp/manifest/spark-ingestion-job.yaml"]
          volumeMounts:
            - name: manifest
              mountPath: /tmp/manifest
      volumes:
        - name: manifest
          configMap:
            name: create-spark-ingestion-job-manifest
      restartPolicy: OnFailure
  backoffLimit: 50
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: create-spark-ingestion-job-manifest
data:
  spark-ingestion-job.yaml: |
    ---
    apiVersion: spark.stackable.tech/v1alpha1
    kind: SparkApplication
    metadata:
      name: spark-ingest-into-warehouse
    spec:
      version: "1.0"
      sparkImage: docker.stackable.tech/sbernauer/pyspark-k8s-with-iceberg:latest3 # docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.2.0
      mode: cluster
      mainApplicationFile: local:///stackable/spark/jobs/spark-ingest-into-warehouse.py
      # deps:
      #   packages:
      #     - org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:0.14.1
      #     - org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
      sparkConf:
        spark.hadoop.fs.s3a.endpoint: http://minio-trino:9000
        spark.hadoop.fs.s3a.path.style.access: "true"
        spark.hadoop.fs.s3a.access.key: trino
        spark.hadoop.fs.s3a.secret.key: trinotrino
        spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        spark.sql.catalog.warehouse: org.apache.iceberg.spark.SparkCatalog
        spark.sql.catalog.warehouse.type: hive
        spark.sql.catalog.warehouse.uri: thrift://hive-iceberg:9083
      volumes:
        - name: script
          configMap:
            name: write-iceberg-table-script
      job:
        resources:
          cpu:
            min: "100m"
            max: "1"
      driver:
        resources:
          cpu:
            min: "1"
            max: "1"
          memory:
            limit: "2Gi"
        volumeMounts:
          - name: script
            mountPath: /stackable/spark/jobs
      executor:
        instances: 4
        resources:
          cpu:
            min: "2"
            max: "4"
          memory:
            limit: "12Gi"
        volumeMounts:
          - name: script
            mountPath: /stackable/spark/jobs
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: write-iceberg-table-script
data:
  spark-ingest-into-warehouse.py: |
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, LongType, ShortType, FloatType, DoubleType, BooleanType, TimestampType, MapType, ArrayType
    from pyspark.sql.functions import col, from_json, expr
    import time
    from datetime import datetime, timedelta

    spark = SparkSession.builder.appName("spark-ingest-into-warehouse").getOrCreate()
    # spark.sparkContext.setLogLevel("DEBUG")

    spark.sql("CREATE SCHEMA IF NOT EXISTS warehouse.water_levels LOCATION 's3a://warehouse/water-levels/'")
    spark.sql("CREATE SCHEMA IF NOT EXISTS warehouse.smart_city LOCATION 's3a://warehouse/smart-city/'")

    # Todo add PARTITIONED BY (days(timestamp))
    # Currently fails with org.apache.spark.sql.AnalysisException: days(timestamp) ASC NULLS FIRST is not currently supported
    # Don't forget to add option("fanout-enabled", "true") to iceberg sink as well
    # see https://github.com/apache/iceberg/issues/5625
    spark.sql("CREATE TABLE IF NOT EXISTS warehouse.water_levels.measurements (station_uuid string, timestamp timestamp, value float) USING iceberg")
    spark.sql("CREATE TABLE IF NOT EXISTS warehouse.water_levels.stations (uuid string, number bigint, short_name string, long_name string, km float, agency string, latitude double, longitude double, water_short_name string, water_long_name string) USING iceberg")
    spark.sql("CREATE TABLE IF NOT EXISTS warehouse.smart_city.shared_bikes_bike_status (bike_id string, vehicle_type_id string, latitude double, longitude double, is_reserved boolean, is_disabled boolean, last_reported timestamp) USING iceberg")
    spark.sql("CREATE TABLE IF NOT EXISTS warehouse.smart_city.shared_bikes_station_information (station_id string, name string, latitude double, longitude double) USING iceberg")
    spark.sql("CREATE TABLE IF NOT EXISTS warehouse.smart_city.shared_bikes_station_status (station_id string, num_bikes_available short, is_installed boolean, is_renting boolean, is_returning boolean, vehicle_types_available map<string,short>, last_reported timestamp) USING iceberg")

    schema = StructType([
        StructField("station_uuid", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("value", FloatType(), True),
    ])
    spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "water_levels_measurements") \
        .option("startingOffsets", "earliest") \
        .option("maxOffsetsPerTrigger", 50000000) \
        .load() \
        .selectExpr("cast(key as string)", "cast(value as string)") \
        .withColumn("json", from_json(col("value"), schema)) \
        .select("json.station_uuid", "json.timestamp", "json.value") \
        .writeStream \
        .queryName("ingest water_level measurements") \
        .format("iceberg") \
        .outputMode("append") \
        .trigger(processingTime='5 minutes') \
        .option("path", "warehouse.water_levels.measurements") \
        .option("checkpointLocation", "s3a://warehouse/water-levels/measurements/checkpoints") \
        .start()

    schema = StructType([ \
        StructField("uuid", StringType(), True), \
        StructField("number", StringType(), True), \
        StructField("shortname", StringType(), True), \
        StructField("longname", StringType(), True), \
        StructField("km", FloatType(), True), \
        StructField("agency", StringType(), True), \
        StructField("latitude", DoubleType(), True), \
        StructField("longitude", DoubleType(), True), \
        StructField("water", \
            StructType([StructField("shortname", StringType(), True), StructField("longname", StringType(), True)]), \
            True), \
    ])
    spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "water_levels_stations") \
        .option("startingOffsets", "earliest") \
        .option("maxOffsetsPerTrigger", 10000) \
        .load() \
        .selectExpr("cast(key as string)", "cast(value as string)") \
        .withColumn("json", from_json(col("value"), schema)) \
        .selectExpr( \
            "json.uuid", \
            "cast(json.number as bigint) as number", \
            "json.shortname as short_name", \
            "json.longname as long_name", \
            "json.km", "json.agency", \
            "json.latitude", \
            "json.longitude", \
            "json.water.shortname as water_short_name", \
            "json.water.longname as water_long_name" \
        ) \
        .writeStream \
        .queryName("ingest water_level stations") \
        .format("iceberg") \
        .outputMode("append") \
        .trigger(processingTime='2 minutes') \
        .option("path", "warehouse.water_levels.stations") \
        .option("checkpointLocation", "s3a://warehouse/water-levels/stations/checkpoints") \
        .start()

    schema = StructType([ \
        StructField("station_id", StringType(), True), \
        StructField("lat", DoubleType(), True), \
        StructField("lon", DoubleType(), True), \
        StructField("name", StringType(), True), \
    ])
    spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "shared_bikes_station_information") \
        .option("startingOffsets", "earliest") \
        .option("maxOffsetsPerTrigger", 10000) \
        .load() \
        .selectExpr("cast(key as string)", "cast(value as string)") \
        .withColumn("json", from_json(col("value"), schema)) \
        .selectExpr("json.station_id", "json.name as name", "json.lat as latitude", "json.lon as longitude") \
        .writeStream \
        .queryName("ingest smart_city shared_bikes_station_information") \
        .format("iceberg") \
        .outputMode("append") \
        .trigger(processingTime='2 minutes') \
        .option("path", "warehouse.smart_city.shared_bikes_station_information") \
        .option("checkpointLocation", "s3a://warehouse/smart-city/shared_bikes_station_information/checkpoints") \
        .start()

    schema = StructType([ \
        StructField("station_id", StringType(), True), \
        StructField("is_installed", BooleanType(), True), \
        StructField("last_reported", TimestampType(), True), \
        StructField("num_bikes_available", ShortType(), True), \
        StructField("is_renting", BooleanType(), True), \
        StructField("is_returning", BooleanType(), True), \
        StructField("vehicle_types_available", ArrayType(StructType([StructField("count", ShortType(), True), StructField("vehicle_type_id", StringType(), True)]), True), True), \
    ])
    spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "shared_bikes_station_status") \
        .option("startingOffsets", "earliest") \
        .option("maxOffsetsPerTrigger", 10000) \
        .load() \
        .selectExpr("cast(key as string)", "cast(value as string)") \
        .withColumn("json", from_json(col("value"), schema)) \
        .selectExpr( \
            "json.station_id", \
            "json.num_bikes_available", \
            "json.is_installed", \
            "json.is_renting", \
            "json.is_returning", \
            "map_from_arrays(json.vehicle_types_available.vehicle_type_id, json.vehicle_types_available.count) as vehicle_types_available", \
            "json.last_reported" \
        ) \
        .writeStream \
        .queryName("ingest smart_city shared_bikes_station_status") \
        .format("iceberg") \
        .outputMode("append") \
        .trigger(processingTime='2 minutes') \
        .option("path", "warehouse.smart_city.shared_bikes_station_status") \
        .option("checkpointLocation", "s3a://warehouse/smart-city/shared_bikes_station_status/checkpoints") \
        .start()

    schema = StructType([ \
        StructField("lat", DoubleType(), True), \
        StructField("lon", DoubleType(), True), \
        StructField("bike_id", StringType(), True), \
        StructField("is_reserved", BooleanType(), True), \
        StructField("is_disabled", BooleanType(), True), \
        StructField("vehicle_type_id", StringType(), True), \
        StructField("last_reported", TimestampType(), True), \
    ])
    spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "shared_bikes_bike_status") \
        .option("startingOffsets", "earliest") \
        .option("maxOffsetsPerTrigger", 10000) \
        .load() \
        .selectExpr("cast(key as string)", "cast(value as string)") \
        .withColumn("json", from_json(col("value"), schema)) \
        .selectExpr("json.bike_id", "json.vehicle_type_id", "json.lat as latitude", "json.lon as longitude", "json.is_reserved", "json.is_disabled", "json.last_reported") \
        .writeStream \
        .queryName("ingest smart_city shared_bikes_bike_status") \
        .format("iceberg") \
        .outputMode("append") \
        .trigger(processingTime='2 minutes') \
        .option("path", "warehouse.smart_city.shared_bikes_bike_status") \
        .option("checkpointLocation", "s3a://warehouse/smart-city/shared_bikes_bike_status/checkpoints") \
        .start()

    # key: table name
    # value: extra compaction options appended to the rewrite_data_files call
    tables_to_compact = {
        # "water_levels.measurements": ", strategy => 'sort', sort_order => 'station_uuid ASC NULLS LAST,timestamp DESC NULLS LAST'",
        "water_levels.measurements": "",
        "water_levels.stations": "",
        "warehouse.smart_city.shared_bikes_station_information": "",
        "warehouse.smart_city.shared_bikes_station_status": "",
        "warehouse.smart_city.shared_bikes_bike_status": "",
    }

    while True:
        expire_before = (datetime.now() - timedelta(hours=4)).strftime("%Y-%m-%d %H:%M:%S")
        for table, table_compaction_strategy in tables_to_compact.items():
            print(f"[{table}] Expiring snapshots older than 4 hours ({expire_before})")
            spark.sql(f"CALL warehouse.system.expire_snapshots(table => '{table}', older_than => TIMESTAMP '{expire_before}', retain_last => 50, stream_results => true)")

            print(f"[{table}] Removing orphaned files")
            spark.sql(f"CALL warehouse.system.remove_orphan_files(table => '{table}')")

            print(f"[{table}] Starting compaction")
            spark.sql(f"CALL warehouse.system.rewrite_data_files(table => '{table}'{table_compaction_strategy})")
            print(f"[{table}] Finished compaction")

        print("All tables compacted. Waiting 25 min before scheduling the next run...")
        time.sleep(25 * 60)  # Assuming compaction takes about 5 minutes, this runs roughly every 30 minutes
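Once the streaming queries are running, the ingested Iceberg tables can be sanity-checked from any Spark session configured with the same warehouse catalog settings as the SparkApplication above. A short sketch using table names created by the script:

# Sketch: count rows in a few of the Iceberg tables created above.
# Assumes a SparkSession configured with the same "warehouse" catalog (Hive metastore + S3A) settings.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("warehouse-sanity-check").getOrCreate()

for table in [
    "warehouse.water_levels.measurements",
    "warehouse.water_levels.stations",
    "warehouse.smart_city.shared_bikes_station_status",
]:
    count = spark.sql(f"SELECT count(*) AS cnt FROM {table}").collect()[0]["cnt"]
    print(f"{table}: {count} rows")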