content[-]

lambda_function.py

import json
import os
import time
import sys
import subprocess
# Install the third-party packages into /tmp/ at import time and put that
# directory on sys.path so the imports below can resolve them.
# NOTE(review): this runs pip on every cold start; shipping the packages in the
# deployment bundle or a Lambda layer would avoid the install step entirely.
for _package in ('requests', 'snowflake-connector-python'):
    _exit_code = subprocess.call(
        ['pip', 'install', _package, '-t', '/tmp/', '--no-cache-dir'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    if _exit_code != 0:
        # pip's own output is discarded, so report the failure explicitly;
        # otherwise it only shows up as an unexplained ImportError below.
        print(f"pip install {_package} failed with exit code {_exit_code}",
              file=sys.stderr)


sys.path.insert(1, '/tmp/')
import snowflake.connector
import requests

def lambda_handler(event, context):
    """Fetch active flights from the AviationStack API and load them into Snowflake.

    The API is queried with a bounded retry loop; every returned flight record
    is stored as a JSON string in ``flight_ods.flights_data_raw`` together with
    its source label and a load timestamp computed by Snowflake.

    Args:
        event: Lambda invocation event (not read by this handler).
        context: Lambda runtime context (not read by this handler).

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``: 200 on success
        (or when there is nothing to load), the API status code (502 for
        network/parse failures) when fetching fails after all attempts, and
        500 when the Snowflake operation fails.
    """
    # Step 1: Fetch the JSON data from the API with retry logic
    api_url = 'https://api.aviationstack.com/v1/flights'
    # NOTE(review): the API key and the Snowflake credentials below are
    # hard-coded; consider loading them from environment variables or a
    # secrets manager.
    params = {
        'access_key': '***',
        'flight_status': 'active',
        'limit': 100
    }

    max_retries = 3
    retry_delay = 5  # seconds
    request_timeout = 30  # seconds; without it a stalled connection never returns
    total_attempts = max_retries + 1  # one initial try plus max_retries retries

    flights_data = []
    for attempt in range(1, total_attempts + 1):
        try:
            response = requests.get(api_url, params=params, timeout=request_timeout)
        except requests.exceptions.RequestException as exc:
            # Network-level failures (DNS, connection reset, timeout) are
            # retried the same way as non-200 responses.
            status_code, detail = 502, str(exc)
        else:
            if response.status_code == 200:
                try:
                    # "data" may be missing or null; normalise both to [].
                    flights_data = response.json().get('data') or []
                    break  # Exit the loop if the request was successful
                except (ValueError, AttributeError) as exc:
                    status_code, detail = 502, f"unexpected response body: {exc}"
            else:
                status_code, detail = response.status_code, response.text

        if attempt == total_attempts:
            return {
                'statusCode': status_code,
                'body': json.dumps(
                    f"Failed to fetch data after {attempt} attempts: {status_code}, {detail}")
            }
        time.sleep(retry_delay)  # Wait before retrying

    if not flights_data:
        # Nothing to insert: skip the Snowflake round trip instead of calling
        # executemany() with an empty parameter list.
        return {
            'statusCode': 200,
            'body': json.dumps("No flight data returned by the API; nothing to load.")
        }

    # Step 2: Prepare data for batch insert
    insert_data = [
        (
            'AviationStack API',  # Source
            json.dumps(flight),   # JSON string of flight data
        )
        for flight in flights_data
    ]
    # Log only the row count; the full payload would flood the logs.
    print(f"Prepared {len(insert_data)} rows for insert")

    insert_query = """
            INSERT INTO flight_ods.flights_data_raw (source, data, load_timestamp)
            VALUES (
                %s,  -- source
                %s,  -- data
                CONVERT_TIMEZONE('UTC', 'Europe/Paris', CURRENT_TIMESTAMP())  -- load_timestamp
            )
        """

    # Step 3: Connect to Snowflake, insert all rows at once, commit
    try:
        conn = snowflake.connector.connect(
            user='PENGFEIQIAO',
            password='***',
            account='***',
            database='flight',
            schema='flight_ods',
            role='dev'
        )
        # The nested try/finally blocks guarantee the cursor and connection are
        # released even when the insert or the commit raises.
        try:
            cur = conn.cursor()
            try:
                cur.executemany(insert_query, insert_data)
                conn.commit()
            finally:
                cur.close()
        finally:
            conn.close()

        return {
            'statusCode': 200,
            'body': json.dumps("Data loaded into Snowflake ODS successfully.")
        }

    except Exception as e:
        # Top-level boundary of the handler: report the failure to the caller
        # as a 500 response rather than letting the invocation crash.
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error in Snowflake operation: {e}")
        }

dbt code

dbt project github link

Snowflake

-- Prepare environment: warehouse, DEV role and the privileges it needs.
create warehouse transforming;
CREATE ROLE DEV;
GRANT USAGE ON DATABASE FLIGHT TO ROLE dev;
GRANT USAGE ON SCHEMA FLIGHT.ODS TO ROLE dev;
-- NOTE(review): "ods" is unqualified here and resolves against the current database.
GRANT SELECT ON ALL TABLES IN SCHEMA ods TO ROLE DEV;

GRANT INSERT, UPDATE, DELETE, SELECT ON ALL TABLES IN SCHEMA FLIGHT.ODS TO ROLE dev;
GRANT ROLE dev TO USER PENGFEIQIAO;
-- Grant create table permission on the schema
GRANT CREATE TABLE ON SCHEMA flight.ODS TO ROLE dev;

GRANT USAGE ON WAREHOUSE transforming TO ROLE dev;

GRANT CREATE view ON SCHEMA flight.ODS TO ROLE dev;

-- Ad-hoc check of the ranking table (terminated so it does not run into the next statement).
select * from flight_dwh.arrival_country_ranking;

-- Switch to an appropriate role
USE ROLE ACCOUNTADMIN;

-- Grant create table permission on the schema (repeats the grant issued above)
GRANT CREATE TABLE ON SCHEMA flight.ODS TO ROLE dev;

-- Inspect the resulting grants, then repeat the role and warehouse grants.
SHOW GRANTS TO ROLE DEV;
SHOW GRANTS TO USER pengfeiqiao;
GRANT ROLE DEV TO USER pengfeiqiao;
GRANT USAGE ON WAREHOUSE transforming TO ROLE dev;

-- Create table: raw landing table filled by lambda_function.py.
CREATE OR REPLACE TABLE flight_ods.flights_data_raw (
    source VARCHAR,           -- label of the producing system (the Lambda writes 'AviationStack API')
    data VARCHAR,             -- one flight record serialized as a JSON string
    load_timestamp TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP  -- used when the insert supplies no value
);
-- Flatten the raw JSON string into one column per attribute of interest.
-- NOTE(review): this reads from ods.flights_data_raw while the table above is
-- created as flight_ods.flights_data_raw -- confirm which schema name is current.
SELECT
    PARSE_JSON(data):flight_date::STRING AS flight_date,
    PARSE_JSON(data):flight_status::STRING AS flight_status,
    PARSE_JSON(data):departure:airport::STRING AS departure_airport,
    PARSE_JSON(data):departure:delay::STRING AS departure_delay,
    PARSE_JSON(data):departure:icao::STRING AS departure_icao,
    PARSE_JSON(data):departure:iata::STRING AS departure_iata,
    PARSE_JSON(data):arrival:airport::STRING AS arrival_airport,
    PARSE_JSON(data):arrival:delay::STRING AS arrival_delay,
    PARSE_JSON(data):arrival:icao::STRING AS arrival_icao,
    PARSE_JSON(data):arrival:iata::STRING AS arrival_iata,
    PARSE_JSON(data):airline:name::STRING AS airline_name,
    CURRENT_TIMESTAMP() as load_timestamp
FROM
    ods.flights_data_raw;
-- Set the time zone of the current session to Europe/Amsterdam.
ALTER SESSION SET TIMEZONE = 'Europe/Amsterdam';

-- Create stage: external stage pointing at the S3 prefix that receives the mart exports.
-- NOTE(review): the AWS key pair is embedded in the stage definition; a storage
-- integration would keep the secret out of the SQL -- consider switching.
CREATE OR REPLACE STAGE snowflake_s3_stage
  URL = 's3://snowflake-flight/mart/'
  CREDENTIALS = (AWS_KEY_ID = '***' AWS_SECRET_KEY = '***');

-- List the files currently under the stage location.
LIST @snowflake_s3_stage;

-- Stream on the arrival-country ranking table; the export task below reads from it.
CREATE OR REPLACE STREAM arrival_country_ranking_stream
  ON TABLE FLIGHT.FLIGHT_MART.ARRIVAL_COUNTRY_RANKING;

-- Scheduled task: when the stream has data, unload the rows for the most recent
-- flight_date (with a non-null country name) to the S3 stage as Parquet.
CREATE OR REPLACE TASK arrival_country_ranking_stream_to_s3_task
  WAREHOUSE = transforming
  SCHEDULE = '1 minute' -- Adjust schedule as needed
  WHEN SYSTEM$STREAM_HAS_DATA('arrival_country_ranking_stream')
AS
COPY INTO '@snowflake_s3_stage/arrival_country_ranking/'
FROM (
    SELECT ARRIVAL_COUNTRY_NAME, FLIGHT_DATE, cn
    FROM arrival_country_ranking_stream
    WHERE arrival_country_name IS NOT NULL
      AND flight_date IN (SELECT MAX(flight_date) FROM arrival_country_ranking_stream)
)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE;

-- Resume the task so that its schedule starts running.
ALTER TASK arrival_country_ranking_stream_to_s3_task RESUME;