content[-]

Project Link github

kafka producer

import asyncio
import json
import time
from datetime import datetime
import requests
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

API_URL = "https://api.themeparks.wiki/v1/entity/6e1464ca-1e9b-49c3-8937-c5c6f6675057/live"
POLL_INTERVAL_SECONDS = 300

ATTRACTION_SCHEMA = {
    "title": "AttractionData",
    "type": "object",
    "properties": {
        "entityId": {"type": "string"},
        "timestamp_ms": {"type": "integer"},
        "status": {"type": "string"},
        "name": {"type": "string"},
        "waitTime": {"type": "integer"},
        "entityType": {"type": "string"}
    },
    "required": ["entityId", "timestamp_ms", "status", "name", "waitTime", "entityType"]
}

class ThemeParkProducer:
    def __init__(self, config):
        kafka_config = config.get_kafka_config()
        self.producer = Producer(kafka_config)
        self.topic = config.get_topic_names()['raw_data']
        self.running = False

        schema_registry_config = config.get_schema_registry_config()
        self.schema_registry_client = SchemaRegistryClient(schema_registry_config)

        self.json_serializer = JSONSerializer(
            json.dumps(ATTRACTION_SCHEMA),
            self.schema_registry_client
        )

    def delivery_callback(self, err, msg):
        if err:
            print(f'Delivery failed: {err}')
        else:
            print(f'Delivered to {msg.topic()} [{msg.partition()}]')

    async def fetch_and_produce(self):
        self.running = True
        print(f"Starting theme park data producer...")

        while self.running:
            try:
                timestamp_ms = int(time.time() * 1000)
                response = requests.get(API_URL, timeout=10)
                response.raise_for_status()
                data = response.json()

                live_entities = data.get('liveData', [])
                count = 0

                for entity_data in live_entities:
                    wait_time = entity_data.get('queue', {}).get('STANDBY', {}).get('waitTime', 0)

                    if entity_data.get('entityType') == 'ATTRACTION' and wait_time is not None:
                        event = {
                            "entityId": entity_data.get('id'),
                            "timestamp_ms": timestamp_ms,
                            "status": entity_data.get('status'),
                            "name": entity_data.get('name'),
                            "waitTime": int(wait_time),
                            "entityType": entity_data.get('entityType')
                        }

                        serialized_data = self.json_serializer(
                            event,
                            SerializationContext(self.topic, MessageField.VALUE)
                        )

                        self.producer.produce(
                            self.topic,
                            key=str(event['entityId']).encode('utf-8'),
                            value=serialized_data,
                            callback=self.delivery_callback
                        )

                        print(f" {event['name']} | {event['status']} | {event['waitTime']} min")
                        count += 1

                self.producer.flush()
                print(f" Produced {count} records. Sleeping {POLL_INTERVAL_SECONDS}s...\n")

                await asyncio.sleep(POLL_INTERVAL_SECONDS)

            except requests.exceptions.RequestException as e:
                print(f"API Error: {e}")
                await asyncio.sleep(60)
            except Exception as e:
                print(f" Error: {e}")
                await asyncio.sleep(60)

    def stop(self):
        self.running = False
        self.producer.flush()

themepark_producer = None

Kakfa consumer

import asyncio
import json
from confluent_kafka import Consumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

class ThemeParkConsumer:
    def __init__(self, config):
        consumer_config = config.get_kafka_config()
        consumer_config['group.id'] = config.get_consumer_group()
        consumer_config['auto.offset.reset'] = 'earliest'
        consumer_config['enable.auto.commit'] = True

        self.consumer = Consumer(consumer_config)
        self.topics = [config.get_topic_names()['avg_waittime']]
        self.running = False
        self.avg_waittimes = {}
        self.callbacks = []

        schema_registry_config = config.get_schema_registry_config()
        self.schema_registry_client = SchemaRegistryClient(schema_registry_config)

        # Try Avro deserializer (Flink uses Avro by default)
        self.avro_deserializer = AvroDeserializer(
            self.schema_registry_client,
            from_dict=lambda obj, ctx: obj
        )

    def add_callback(self, callback):
        self.callbacks.append(callback)

    def start_consuming(self):
        self.running = True
        self.consumer.subscribe(self.topics)

        async def consume_loop():
            print(f"Consumer started for topics: {self.topics}")

            while self.running:
                try:
                    msg = self.consumer.poll(timeout=1.0)

                    if msg is None:
                        await asyncio.sleep(0.1)
                        continue

                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            continue
                        else:
                            print(f" Consumer error: {msg.error()}")
                            continue

                    # Decode key
                    try:
                        if msg.key():
                            key = self.avro_deserializer(msg.key(), SerializationContext(msg.topic(), MessageField.KEY))
                            if isinstance(key, dict):
                                entity_id = key.get('entityId')
                            else:
                                entity_id = str(key)
                        else:
                            entity_id = None
                    except:
                        entity_id = msg.key().decode('utf-8') if msg.key() else None

                    # Try Avro first (Flink default), then JSON fallback
                    value = None
                    try:
                        value = self.avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
                        print(f" Avro deserialized: {value}")
                    except Exception as e:
                        print(f" Avro failed: {e}")
                        try:
                            # Fallback to plain JSON
                            value = json.loads(msg.value().decode('utf-8'))
                            print(f" JSON deserialized: {value}")
                        except Exception as e2:
                            print(f"❌ JSON also failed: {e2}")
                            continue

                    if value:
                        avg_waittime = value.get('avg_waittime')
                        name = value.get('name', entity_id)

                        self.avg_waittimes[entity_id] = {
                            'entityId': entity_id,
                            'name': name,
                            'avg_waittime': avg_waittime
                        }

                        self.avg_waittimes[entity_id] = {
                            'entityId': entity_id,
                            'name': name,
                            'avg_waittime': avg_waittime
                        }

                        print(f" {name}: Avg Wait = {avg_waittime:.1f} min")

                        for callback in self.callbacks:
                            callback(list(self.avg_waittimes.values()))

                except Exception as e:
                    print(f" Error consuming: {e}")
                    await asyncio.sleep(1)

        asyncio.create_task(consume_loop())

    def get_avg_waittimes(self):
        return list(self.avg_waittimes.values())

    def stop_consuming(self):
        self.running = False
        self.consumer.close()

themepark_consumer = None

flink sql

-- Create Kafka source table for raw themepark data
CREATE TABLE themepark_raw (
    entityId STRING,
    timestamp_ms BIGINT,
    status STRING,
    name STRING,
    waitTime INT,
    entityType STRING,
    event_time AS TO_TIMESTAMP_LTZ(timestamp_ms, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'themepark_raw',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-themepark-analytics',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- Create Kafka sink table for average wait times
CREATE TABLE attraction_avg_waittime (
    entityId STRING,
    name STRING,
    avg_waittime DOUBLE,
    PRIMARY KEY (entityId) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'attraction_avg_waittime',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Calculate 20-minute sliding window (updates every 5 minutes)
INSERT INTO attraction_avg_waittime
SELECT
  entityId,
  REGEXP_REPLACE(name, '“|”|’','') AS name,
  AVG(CAST(waitTime AS DOUBLE)) AS avg_waittime
FROM TABLE(
  HOP(TABLE themepark_raw, DESCRIPTOR(event_time), INTERVAL '5' MINUTES, INTERVAL '20' MINUTES)
)
WHERE status = 'OPERATING'
GROUP BY entityId, REGEXP_REPLACE(name, '“|”|’', ''), window_start, window_end;

docker-compose.yml

services:
  # 1. Apache Zookeeper (REQUIRED for legacy Kafka mode)
#  zookeeper:
#    image: confluentinc/cp-zookeeper:7.5.0
#    container_name: zookeeper
#    environment:
#      ZOOKEEPER_CLIENT_PORT: 2181
#      ZOOKEEPER_TICK_TIME: 2000
#    ports:
#      - "2181:2181"

  # 2. Apache Kafka Broker (Using Zookeeper for Coordination)
  kafka:
    image: confluentinc/cp-kafka:latest # Using a reliable Kafka image
    container_name: kafka
    networks:
      - themepark-network
    ports:
      # Map host port 9092 to container port 9092 for external clients
      - "9092:9092"
      - "29092:29092"
    environment:
      # Unique ID for this Kafka instance
      KAFKA_NODE_ID: 1
      # Enable KRaft mode and define roles as both broker and controller
      KAFKA_PROCESS_ROLES: 'broker,controller'
      # Define the listeners for internal/external communication
      #KAFKA_LISTENERS: 'PLAINTEXT://:9092,CONTROLLER://:9093'
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093
      # Address that clients outside the Docker network will use to connect
      #KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
      #KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT'
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

      # Controller quorum voters list (self-reference in single-node setup)
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'
      # Cluster ID (can be generated, or it will auto-generate)
      CLUSTER_ID: 'ABCDEF-GHIJ-KLMN-OPQR'
      # Recommended for single-node development
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
    volumes:
      - kafka_data:/var/lib/kafka/data

  # 2. MinIO (The Iceberg Data Storage - S3 compatible)
  minio:
    image: minio/minio:latest
    container_name: minio
    ports:
      - "9000:9000"
      - "9999:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: minio server /data --console-address ":9001"
    volumes:
      - minio_data:/data

  # 3. Flink JobManager (The Processor Coordinator)
  flink-jobmanager:
    build:
      context: ./flink
      dockerfile: Dockerfile
    container_name: flink-jobmanager
    networks:
      - themepark-network
    expose:
      - "8081"
    ports:
      - "8081:8081" # Flink Web UI
    volumes:
      - ./sql:/opt/sql

    #  FIX: Revert to the simple jobmanager command that keeps the container alive
    command: jobmanager

    environment:
      # --- Standard Flink Properties ---
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        # Flink properties for S3 access (Fallback 1)
        s3.endpoint: http://minio:9000
        s3.access-key: minioadmin
        s3.secret-key: minioadmin
        s3.path-style-access: true

        # Hadoop Configuration Properties (Fallback 2, for Flink's configuration loading)
        hadoop-conf.fs.s3a.connection.establish.timeout: 500000
        hadoop-conf.fs.s3a.connection.timeout: 10000000
        hadoop-conf.fs.s3a.attempts: 50
        HADOOP_CONF_fs_s3a_endpoint: http://minio:9000


    depends_on:
      - kafka
      - minio

  # 4. Flink TaskManager (The Worker)
  flink-taskmanager:
    build:
      context: ./flink
      dockerfile: Dockerfile
    image: flink-custom
    container_name: flink-taskmanager
    networks:
      - themepark-network
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 30
        s3.endpoint: http://minio:9000
        s3.access-key: minioadmin
        s3.secret-key: minioadmin
        s3.path-style-access: true
    depends_on:
      - flink-jobmanager

  # 5. Backend API (FastAPI with Producer & Consumer)
  backend:
    build:
      context: ./backend
      dockerfile: Dockerfile
    container_name: themepark-backend
    networks:
      - themepark-network
    ports:
      - "8002:8002"
    environment:
      KAFKA_BROKER: kafka:9092
      RAW_TOPIC: themepark_raw
      AVG_TOPIC: attraction_avg_waittime
      CONSUMER_GROUP: themepark-consumer-local
    depends_on:
      - kafka
    restart: on-failure

  # 6. Frontend (React + Vite)
  frontend:
    image: node:20-alpine
    container_name: themepark-frontend
    working_dir: /app
    volumes:
      - ../frontend:/app
    ports:
      - "5173:5173"
    command: sh -c "npm install   npm run dev -- --host 0.0.0.0"
    environment:
      - VITE_API_URL=http://localhost:8002
    depends_on:
      - backend

volumes:
  minio_data: {}
  kafka_data: {}

networks:
  themepark-network:
    driver: bridge

Run

docker exec -it flink-jobmanager /opt/flink/bin/sql-client.sh -f /opt/sql/avg_waittime_job.sql
npm dev run