Implementing the Data Pipeline¶
Learning Objectives¶
By the end of this chapter, you will:
- Understand how to configure Kafka sources to ingest game events
- Learn to set up RisingWave materialized views for stream processing
- Master the configuration of Iceberg sinks for persistent storage
- Verify the end-to-end data flow of your streaming pipeline
- Troubleshoot common issues in the data pipeline setup
Context and Overview¶
In the previous chapter, we designed our Iceberg schema for game analytics. Now, we'll implement the data pipeline that will process our game events and store them in these tables.
Our implementation follows a structured approach:
- Configure Kafka as the source of streaming data
- Create materialized views in RisingWave for processing the data
- Set up Iceberg tables as sinks for persistent storage
- Verify the pipeline's operation with testing and validation
Note
The complete data pipeline configuration is generated using Ansible playbooks, which ensures consistency and reproducibility of the setup.
Setting Up the Pipeline¶
Step 1: Generate Configuration Scripts¶
First, we'll generate the necessary SQL scripts using our Ansible playbook:
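A minimal sketch of that step, run from the repository root, is shown below; the playbook filename here is a placeholder, so use the playbook shipped with polaris-forge-setup:
# hypothetical playbook name; substitute the one provided in polaris-forge-setup
ansible-playbook polaris-forge-setup/generate_scripts.yml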
This playbook creates SQL files with the configuration for:
- Kafka sources
- RisingWave materialized views
- Iceberg sinks
The generated scripts will be placed in the $PROJECT_HOME/scripts/ directory.
Template Variables
You'll notice that the SQL scripts contain placeholders in the format {{ variable_name }}. These are Jinja2 template variables that are replaced with actual values during script generation. The values for these variables are defined in the application settings file at polaris-forge-setup/defaults/main.yml.
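For orientation, the relevant entries in that file might look roughly like the excerpt below; the variable names are taken from the templates shown later in this chapter, and the values are illustrative placeholders for a local setup:
# polaris-forge-setup/defaults/main.yml (illustrative excerpt, not the actual file)
balloon_game_kafka_topic: balloon-game
balloon_game_kafka_bootstrap_servers: localhost:19094
balloon_game_db: game_events
plf_catalog_name: balloon-game
plf_polaris_catalog_uri: http://localhost:18181/api/catalog
plf_aws_endpoint_url: http://minio.minio:19000
plf_aws_region: us-east-1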
Step 2: Create an .env file with the following contents¶
## psql -h 0.0.0.0 -p 4566 -d dev -U root
PGHOST=localhost
PGPORT=14567
PGDATABASE=dev
PGUSER=root
PGPASSWORD=
AWS_ENDPOINT_URL=http://minio.minio:19000
AWS_ACCESS_KEY_ID=test
AWS_SECRET_ACCESS_KEY=test
AWS_REGION=us-east-1
CATALOG_NAME="balloon-game"
CATALOG_NAMESPACE="game_events"
ICEBERG_REST_URI=http://localhost:18181/api/catalog
KAFKA_BOOTSTRAP_SERVERS=localhost:19094
RPK_BROKERS=localhost:19094
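Because psql reads the PG* variables and rpk reads RPK_BROKERS directly from the environment, loading this file into your shell is enough to point both tools at the right endpoints. A minimal sketch, assuming a POSIX shell:
# export every variable defined in .env into the current shell
set -a
source .env
set +a
# psql now picks up PGHOST, PGPORT, PGDATABASE, PGUSER and PGPASSWORD
# rpk picks up RPK_BROKERS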
Step 3: Configure Kafka Sources¶
Next, we'll set up the Kafka source to ingest game events:
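One way to apply the generated script is shown below; the path and connection settings are assumptions based on Steps 1 and 2, and the Jinja template itself is described next.
# apply the generated source and materialized view definitions
psql -f "$PROJECT_HOME"/scripts/source.sql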
The source.sql script is generated from a Jinja template that creates a RisingWave source connecting to our Kafka topic, along with the necessary materialized views. Here's what the source configuration looks like:
-- source events from Kafka
CREATE SOURCE IF NOT EXISTS balloon_game_events (
player string,
balloon_color string,
score integer,
page_id integer,
favorite_color_bonus boolean,
event_ts timestamptz
)
WITH (
connector='kafka',
topic='{{ balloon_game_kafka_topic }}',
properties.bootstrap.server='{{ balloon_game_kafka_bootstrap_servers }}',
scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;
This configuration:
- Defines the schema for our game events
- Connects to the specified Kafka topic through the template variable
- Specifies JSON as the message format (a sample event is shown after this list)
- Sets the scan mode to start from the latest available message (real-time processing)
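For reference, each message on the topic is a flat JSON object matching the column list above; the values here are purely illustrative:
{
  "player": "alice",
  "balloon_color": "red",
  "score": 5,
  "page_id": 1,
  "favorite_color_bonus": true,
  "event_ts": "2024-05-01T12:00:00.000Z"
}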
The same template also defines our materialized views, such as the leaderboard view:
-- Leaderboard stats
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_leaderboard AS
SELECT
player,
sum(score) as total_score,
count(case when favorite_color_bonus = true then 1 end) as bonus_hits,
max(event_ts) as event_ts
FROM balloon_game_events
GROUP BY player;
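Once events are flowing, this view can be queried directly in RisingWave, for example to pull the current top ten players (an illustrative ad-hoc query, not part of the generated scripts):
-- illustrative ad-hoc query against the leaderboard view
SELECT player, total_score, bonus_hits
FROM mv_leaderboard
ORDER BY total_score DESC
LIMIT 10;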
Step 4: Create RisingWave Materialized Views¶
The materialized views are already defined in the same Jinja template as the source. These views transform our raw event data into the format expected by our Iceberg tables.
Here are the key materialized views defined in the template:
-- Overall Color Stats
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_balloon_color_stats AS
SELECT
player,
balloon_color,
count(*) as balloon_pops,
sum(score) as points_by_color,
count(CASE WHEN favorite_color_bonus = true THEN 1 END) as bonus_hits,
max(event_ts) as event_ts
FROM balloon_game_events
GROUP BY
player,
balloon_color;
-- Timeseries
-- Leaderboard over window of 15 seconds
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_realtime_scores AS
SELECT
player,
sum(score) as total_score,
window_start,
window_end
FROM TUMBLE(balloon_game_events, event_ts, INTERVAL '15 SECONDS')
GROUP BY
player,
window_start,
window_end;
-- Analyze the various balloon_color pops in 15 seconds
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_balloon_colored_pops AS
SELECT
player,
balloon_color,
count(*) as balloon_pops,
sum(score) as points_by_color,
count(CASE WHEN favorite_color_bonus = true THEN 1 END) as bonus_hits,
window_start,
window_end
FROM TUMBLE(balloon_game_events, event_ts, INTERVAL '15 SECONDS')
GROUP BY
player,
balloon_color,
window_start,
window_end;
-- Color based performance
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_color_performance_trends AS
SELECT
balloon_color,
avg(score) as avg_score_per_pop,
count(*) as total_pops,
window_start,
window_end
FROM TUMBLE(balloon_game_events, event_ts, INTERVAL '15 SECONDS')
GROUP BY
balloon_color,
window_start,
window_end;
Note the use of TUMBLE windows to create 15-second time slices for our time-series analysis. This allows us to track how performance changes over time in a consistent way.
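For example, to follow a single player's score across recent windows, you could query the windowed view directly (an illustrative query; the player name is a placeholder):
-- illustrative ad-hoc query against the 15-second windows
SELECT window_start, window_end, total_score
FROM mv_realtime_scores
WHERE player = 'alice'
ORDER BY window_start DESC
LIMIT 5;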
Step 5: Configure Iceberg Sinks¶
Now that we have our materialized views set up, we can configure the sinks that will write data to our Iceberg tables:
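As with the source, the generated script can be applied with psql; the path assumes the scripts directory from Step 1:
# apply the generated sink definitions
psql -f "$PROJECT_HOME"/scripts/sink.sql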
The sink.sql script is also generated from a Jinja template and creates sinks for each of our materialized views. Here's an example sink configuration:
CREATE SINK IF NOT EXISTS leaderboard
FROM mv_leaderboard
WITH (
connector='iceberg',
type = 'append-only',
force_append_only='true',
database.name = '{{ balloon_game_db }}',
table.name = 'leaderboard',
warehouse.path = '{{ plf_catalog_name }}',
catalog.name = '{{ plf_catalog_name }}',
catalog.type = 'rest',
catalog.uri = '{{ plf_polaris_catalog_uri }}',
catalog.credential = '{{ principal_client_id }}:{{ principal_client_secret }}',
catalog.scope='PRINCIPAL_ROLE:ALL',
s3.endpoint = '{{ plf_aws_endpoint_url }}',
s3.access.key = '{{ plf_aws_access_key_id | default("test") }}',
s3.secret.key = '{{ plf_aws_secret_access_key | default("test") }}',
s3.region = '{{ plf_aws_region }}',
s3.path.style.access = 'true'
);
This configuration:
- Creates a sink named leaderboard from the mv_leaderboard materialized view
- Uses the Iceberg connector to write to our Iceberg table
- Specifies append-only type with force_append_only set to true for better performance
- Uses template variables for database and catalog configuration
- Configures S3 storage with the appropriate endpoint, region, and credentials
- Sets up Apache Polaris as the REST catalog service
- Provides proper authentication and access control through credentials and scope
Similar sink configurations are created for each of our materialized views, including balloon_color_stats, realtime_scores, balloon_colored_pops, and color_performance_trends.
Verifying the Pipeline¶
After setting up the pipeline, it's important to verify that everything is working correctly.
Check RisingWave Configuration¶
Run the following command to open an interactive psql session against RisingWave (the connection values come from the .env file created in Step 2):
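# connection values are taken from the .env in Step 2; adjust if yours differ
psql -h localhost -p 14567 -d dev -U root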
List All Sources¶
Run show sources; at the dev=> prompt. Expected output, assuming only the source created in Step 3 exists:
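dev=> show sources;
Name
---------------------
balloon_game_events
(1 row)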
List All Materialized Views¶
Expected output:
dev=> show materialized views;
Name
-----------------------------
mv_balloon_colored_pops
mv_leaderboard
mv_color_performance_trends
mv_balloon_color_stats
mv_realtime_scores
(5 rows)
List All Sinks¶
Expected output:
dev=> show sinks;
Name
--------------------------
color_performance_trends
realtime_scores
balloon_colored_pops
balloon_color_stats
leaderboard
(5 rows)
Troubleshooting¶
Template Variable Issues¶
If you encounter errors related to the Jinja template variables:
- Make sure all required variables are defined in your environment or Ansible variables
- Verify that the Ansible playbook has access to the necessary credentials
- Check the generated SQL scripts to ensure variables were properly substituted (a quick check is shown below)
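A quick way to check, assuming the scripts were generated into $PROJECT_HOME/scripts/ as in Step 1, is to search for any leftover Jinja2 delimiters:
# any match means a template variable was not substituted
grep -n '{{' "$PROJECT_HOME"/scripts/*.sql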
Schema Mismatch Errors¶
If you see schema mismatch errors:
- Compare the schema in the materialized view with the Iceberg table schema
- Ensure data types are compatible between RisingWave and Iceberg
- You may need to drop and recreate the sink with the correct schema (see the example below)
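For example, to rebuild the leaderboard sink after correcting the schema (the sink name is taken from the example earlier in this chapter):
-- drop the existing sink, then re-run the generated CREATE SINK statement from sink.sql
DROP SINK IF EXISTS leaderboard;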
Key Takeaways¶
When implementing a streaming data pipeline for game analytics, remember these principles:
- Start simple: Begin with a basic pipeline and add complexity incrementally
- Verify each step: Test each component individually before connecting them
- Monitor continuously: Set up logging and monitoring to catch issues early
- Use configuration management: Generate configuration with tools like Ansible to ensure consistency
In the next chapter, we'll explore how to build interactive dashboards on top of our Iceberg tables to visualize the game analytics data.