Theme-park wait-time streaming pipeline — project link: GitHub

Kafka producer
import asyncio
import json
import time
from datetime import datetime
import requests
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
# Live-data endpoint for one park entity (themeparks.wiki public API).
API_URL = "https://api.themeparks.wiki/v1/entity/6e1464ca-1e9b-49c3-8937-c5c6f6675057/live"

# Seconds between successive API polls.
POLL_INTERVAL_SECONDS = 300

# JSON Schema registered with the Schema Registry for produced records.
ATTRACTION_SCHEMA = {
    "title": "AttractionData",
    "type": "object",
    "properties": {
        "entityId": {"type": "string"},
        "timestamp_ms": {"type": "integer"},
        "status": {"type": "string"},
        "name": {"type": "string"},
        "waitTime": {"type": "integer"},
        "entityType": {"type": "string"},
    },
    "required": ["entityId", "timestamp_ms", "status", "name", "waitTime", "entityType"],
}
class ThemeParkProducer:
    """Polls the theme-park live API and produces attraction wait-time events to Kafka.

    Values are JSON-Schema-serialized via the Schema Registry; keys are the
    raw entityId bytes.
    """

    def __init__(self, config):
        # config is a project-level object exposing Kafka / Schema-Registry
        # settings and topic names — assumed interface, confirm against caller.
        kafka_config = config.get_kafka_config()
        self.producer = Producer(kafka_config)
        self.topic = config.get_topic_names()['raw_data']
        self.running = False

        schema_registry_config = config.get_schema_registry_config()
        self.schema_registry_client = SchemaRegistryClient(schema_registry_config)
        self.json_serializer = JSONSerializer(
            json.dumps(ATTRACTION_SCHEMA),
            self.schema_registry_client
        )

    def delivery_callback(self, err, msg):
        """Log the per-message delivery outcome (invoked by flush/poll)."""
        if err:
            print(f'Delivery failed: {err}')
        else:
            print(f'Delivered to {msg.topic()} [{msg.partition()}]')

    @staticmethod
    def _build_event(entity_data, timestamp_ms):
        """Map one liveData entity to an event dict, or return None to skip it.

        Only ATTRACTION entities are kept. A missing queue/STANDBY section
        counts as a 0-minute wait (preserves the original default); an
        explicit null waitTime skips the entity.
        """
        if entity_data.get('entityType') != 'ATTRACTION':
            return None
        # FIX: the API can send "queue": null; dict.get's default only covers
        # a *missing* key, so the original chained .get raised AttributeError
        # and aborted the whole poll cycle. `or {}` handles null safely.
        standby = (entity_data.get('queue') or {}).get('STANDBY') or {}
        wait_time = standby.get('waitTime', 0)
        if wait_time is None:
            return None
        return {
            "entityId": entity_data.get('id'),
            "timestamp_ms": timestamp_ms,
            "status": entity_data.get('status'),
            "name": entity_data.get('name'),
            "waitTime": int(wait_time),
            "entityType": entity_data.get('entityType')
        }

    async def fetch_and_produce(self):
        """Poll the API until stop() and produce one record per attraction."""
        self.running = True
        print("Starting theme park data producer...")
        while self.running:
            try:
                timestamp_ms = int(time.time() * 1000)
                response = requests.get(API_URL, timeout=10)
                response.raise_for_status()
                data = response.json()

                count = 0
                for entity_data in data.get('liveData', []):
                    event = self._build_event(entity_data, timestamp_ms)
                    if event is None:
                        continue
                    serialized_data = self.json_serializer(
                        event,
                        SerializationContext(self.topic, MessageField.VALUE)
                    )
                    self.producer.produce(
                        self.topic,
                        key=str(event['entityId']).encode('utf-8'),
                        value=serialized_data,
                        callback=self.delivery_callback
                    )
                    print(f" {event['name']} | {event['status']} | {event['waitTime']} min")
                    count += 1

                self.producer.flush()
                print(f" Produced {count} records. Sleeping {POLL_INTERVAL_SECONDS}s...\n")
                await asyncio.sleep(POLL_INTERVAL_SECONDS)
            except requests.exceptions.RequestException as e:
                print(f"API Error: {e}")
                await asyncio.sleep(60)
            except Exception as e:
                # Broad catch keeps the poll loop alive on unexpected payloads.
                print(f" Error: {e}")
                await asyncio.sleep(60)

    def stop(self):
        """Stop the poll loop and flush any in-flight messages."""
        self.running = False
        self.producer.flush()
# Module-level singleton; presumably assigned during app startup — TODO confirm against caller.
themepark_producer = None
Kafka consumer
import asyncio
import json
from confluent_kafka import Consumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
class ThemeParkConsumer:
    """Consumes per-attraction average wait times from Kafka, caches the
    latest value per entityId, and notifies registered callbacks on updates.
    """

    def __init__(self, config):
        consumer_config = config.get_kafka_config()
        consumer_config['group.id'] = config.get_consumer_group()
        consumer_config['auto.offset.reset'] = 'earliest'
        consumer_config['enable.auto.commit'] = True
        self.consumer = Consumer(consumer_config)
        self.topics = [config.get_topic_names()['avg_waittime']]
        self.running = False
        self.avg_waittimes = {}  # entityId -> latest record dict
        self.callbacks = []      # callables receiving the full record list

        schema_registry_config = config.get_schema_registry_config()
        self.schema_registry_client = SchemaRegistryClient(schema_registry_config)
        # Avro is tried first with a JSON fallback — NOTE(review): the Flink
        # sink in this project is configured with 'json' format; confirm which
        # format the running job actually emits.
        self.avro_deserializer = AvroDeserializer(
            self.schema_registry_client,
            from_dict=lambda obj, ctx: obj
        )

    def add_callback(self, callback):
        """Register a callable invoked with the full record list on every update."""
        self.callbacks.append(callback)

    def _decode_key(self, msg):
        """Best-effort extraction of the entityId from the message key."""
        if not msg.key():
            return None
        try:
            key = self.avro_deserializer(
                msg.key(), SerializationContext(msg.topic(), MessageField.KEY))
            return key.get('entityId') if isinstance(key, dict) else str(key)
        except Exception:
            # FIX: was a bare `except:`; narrow to Exception so KeyboardInterrupt
            # and friends propagate. Fallback: treat the key as a UTF-8 string.
            return msg.key().decode('utf-8')

    def _decode_value(self, msg):
        """Deserialize the value: Avro first, plain JSON fallback, else None."""
        try:
            value = self.avro_deserializer(
                msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
            print(f" Avro deserialized: {value}")
            return value
        except Exception as e:
            print(f" Avro failed: {e}")
        try:
            value = json.loads(msg.value().decode('utf-8'))
            print(f" JSON deserialized: {value}")
            return value
        except Exception as e2:
            print(f"❌ JSON also failed: {e2}")
            return None

    def _record_avg_waittime(self, entity_id, value):
        """Store the latest average-wait record for entity_id and return it."""
        record = {
            'entityId': entity_id,
            'name': value.get('name', entity_id),
            'avg_waittime': value.get('avg_waittime'),
        }
        # FIX: the original assigned this dict twice back-to-back; once suffices.
        self.avg_waittimes[entity_id] = record
        return record

    def start_consuming(self):
        """Subscribe and launch the consume loop as a background asyncio task.

        Requires a running event loop (e.g. called from app startup).
        """
        self.running = True
        self.consumer.subscribe(self.topics)

        async def consume_loop():
            print(f"Consumer started for topics: {self.topics}")
            while self.running:
                try:
                    msg = self.consumer.poll(timeout=1.0)
                    if msg is None:
                        await asyncio.sleep(0.1)
                        continue
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            continue
                        print(f" Consumer error: {msg.error()}")
                        continue

                    entity_id = self._decode_key(msg)
                    value = self._decode_value(msg)
                    if not value:
                        continue

                    record = self._record_avg_waittime(entity_id, value)
                    avg = record['avg_waittime']
                    # FIX: `{avg:.1f}` raised TypeError when avg_waittime was
                    # missing/None, which aborted handling of the message.
                    shown = f"{avg:.1f}" if avg is not None else "n/a"
                    print(f" {record['name']}: Avg Wait = {shown} min")
                    for callback in self.callbacks:
                        callback(list(self.avg_waittimes.values()))
                except Exception as e:
                    print(f" Error consuming: {e}")
                    await asyncio.sleep(1)

        asyncio.create_task(consume_loop())

    def get_avg_waittimes(self):
        """Return the cached records as a list (one per entityId)."""
        return list(self.avg_waittimes.values())

    def stop_consuming(self):
        """Stop the loop and close the consumer."""
        self.running = False
        self.consumer.close()
# Module-level singleton; presumably assigned during app startup — TODO confirm against caller.
themepark_consumer = None
Flink SQL (avg_waittime_job.sql)
-- Create Kafka source table for raw themepark data.
-- JSON records from the Python producer; parse errors are skipped
-- ('json.ignore-parse-errors') so one bad record cannot fail the job.
CREATE TABLE themepark_raw (
entityId STRING,
timestamp_ms BIGINT,
status STRING,
name STRING,
waitTime INT,
entityType STRING,
-- Event time derived from epoch milliseconds, with a 5-second
-- out-of-orderness allowance for late records.
event_time AS TO_TIMESTAMP_LTZ(timestamp_ms, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'themepark_raw',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-themepark-analytics',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
-- Create Kafka sink table for average wait times.
-- upsert-kafka keyed by entityId: each new window result overwrites the
-- previous value for that attraction, so consumers always see the latest.
CREATE TABLE attraction_avg_waittime (
entityId STRING,
name STRING,
avg_waittime DOUBLE,
PRIMARY KEY (entityId) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'attraction_avg_waittime',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- Calculate 20-minute sliding window (updates every 5 minutes)
INSERT INTO attraction_avg_waittime
SELECT
entityId,
-- Strip curly quotes/apostrophes from names; the identical expression must
-- appear in GROUP BY for this projection to be valid.
REGEXP_REPLACE(name, '“|”|’','') AS name,
AVG(CAST(waitTime AS DOUBLE)) AS avg_waittime
FROM TABLE(
HOP(TABLE themepark_raw, DESCRIPTOR(event_time), INTERVAL '5' MINUTES, INTERVAL '20' MINUTES)
)
-- Only count attractions while they are actually operating.
WHERE status = 'OPERATING'
GROUP BY entityId, REGEXP_REPLACE(name, '“|”|’', ''), window_start, window_end;
docker-compose.yml
services:
  # 1. Apache Zookeeper (only needed for legacy, non-KRaft Kafka mode)
  # zookeeper:
  #   image: confluentinc/cp-zookeeper:7.5.0
  #   container_name: zookeeper
  #   environment:
  #     ZOOKEEPER_CLIENT_PORT: 2181
  #     ZOOKEEPER_TICK_TIME: 2000
  #   ports:
  #     - "2181:2181"

  # 2. Apache Kafka broker, single-node KRaft mode (no Zookeeper).
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    networks:
      - themepark-network
    ports:
      - "9092:9092"    # internal listener
      - "29092:29092"  # external listener (advertised as localhost)
    environment:
      KAFKA_NODE_ID: 1
      # This single node acts as both broker and KRaft controller.
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093
      # In-network clients use kafka:9092; host clients use localhost:29092.
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # Controller quorum voters list (self-reference in single-node setup).
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'
      CLUSTER_ID: 'ABCDEF-GHIJ-KLMN-OPQR'
      # Replication factor 1: single-broker development settings.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
    volumes:
      - kafka_data:/var/lib/kafka/data
# 2. MinIO (The Iceberg Data Storage - S3 compatible)
minio:
image: minio/minio:latest
container_name: minio
ports:
- "9000:9000"
- "9999:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: minio server /data --console-address ":9001"
volumes:
- minio_data:/data
# 3. Flink JobManager (The Processor Coordinator)
flink-jobmanager:
build:
context: ./flink
dockerfile: Dockerfile
container_name: flink-jobmanager
networks:
- themepark-network
expose:
- "8081"
ports:
- "8081:8081" # Flink Web UI
volumes:
- ./sql:/opt/sql
# FIX: Revert to the simple jobmanager command that keeps the container alive
command: jobmanager
environment:
# --- Standard Flink Properties ---
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
# Flink properties for S3 access (Fallback 1)
s3.endpoint: http://minio:9000
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.path-style-access: true
# Hadoop Configuration Properties (Fallback 2, for Flink's configuration loading)
hadoop-conf.fs.s3a.connection.establish.timeout: 500000
hadoop-conf.fs.s3a.connection.timeout: 10000000
hadoop-conf.fs.s3a.attempts: 50
HADOOP_CONF_fs_s3a_endpoint: http://minio:9000
depends_on:
- kafka
- minio
# 4. Flink TaskManager (The Worker)
flink-taskmanager:
build:
context: ./flink
dockerfile: Dockerfile
image: flink-custom
container_name: flink-taskmanager
networks:
- themepark-network
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 30
s3.endpoint: http://minio:9000
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.path-style-access: true
depends_on:
- flink-jobmanager
# 5. Backend API (FastAPI with Producer & Consumer)
backend:
build:
context: ./backend
dockerfile: Dockerfile
container_name: themepark-backend
networks:
- themepark-network
ports:
- "8002:8002"
environment:
KAFKA_BROKER: kafka:9092
RAW_TOPIC: themepark_raw
AVG_TOPIC: attraction_avg_waittime
CONSUMER_GROUP: themepark-consumer-local
depends_on:
- kafka
restart: on-failure
# 6. Frontend (React + Vite)
frontend:
image: node:20-alpine
container_name: themepark-frontend
working_dir: /app
volumes:
- ../frontend:/app
ports:
- "5173:5173"
command: sh -c "npm install npm run dev -- --host 0.0.0.0"
environment:
- VITE_API_URL=http://localhost:8002
depends_on:
- backend
# Named volumes for broker and object-store persistence.
volumes:
  minio_data: {}
  kafka_data: {}

# Shared bridge network all services communicate over.
networks:
  themepark-network:
    driver: bridge
Run
docker exec -it flink-jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/avg_waittime_job.sql
npm run dev
