content[-]
lambda_function.py
import json
import os
import time
import sys
import subprocess
# Install the third-party packages into /tmp/ (pip's -t target) when the module
# is first loaded, then put /tmp/ on the import path so the imports that follow
# can resolve them.
# pip is invoked through the running interpreter (`sys.executable -m pip`) so the
# packages are built for this Python rather than whichever `pip` is on PATH.
_RUNTIME_PACKAGES = ('requests', 'snowflake-connector-python')
for _package in _RUNTIME_PACKAGES:
    _pip_status = subprocess.call(
        [sys.executable, '-m', 'pip', 'install', _package, '-t', '/tmp/', '--no-cache-dir'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    if _pip_status != 0:
        # Best effort: the package may still be importable from elsewhere, so
        # only warn. Without this, a failed install shows up later as a bare
        # ImportError with no hint about the cause.
        print(
            f"WARNING: pip install of {_package} exited with status {_pip_status}",
            file=sys.stderr,
        )
sys.path.insert(1, '/tmp/')
import snowflake.connector
import requests
def lambda_handler(event, context):
    """Fetch active flights from the AviationStack API and load them into Snowflake.

    The API is called once plus up to ``max_retries`` retries, sleeping
    ``retry_delay`` seconds between attempts. Each returned flight record is
    stored as one row (source label + JSON text) in
    ``flight_ods.flights_data_raw``.

    Args:
        event: Lambda invocation event (not used).
        context: Lambda runtime context (not used).

    Returns:
        dict: ``{'statusCode': int, 'body': str}`` where ``body`` is a
        JSON-encoded message. 200 on success, the API's HTTP status (or 502
        when no usable response was received) if fetching fails, 500 if the
        Snowflake step fails.
    """
    # NOTE(review): the API key and Snowflake credentials are hard-coded below;
    # consider loading them from the environment or a secrets store.

    # Step 1: Fetch the JSON data from the API with retry logic
    api_url = 'https://api.aviationstack.com/v1/flights'
    params = {
        'access_key': '***',
        'flight_status': 'active',
        'limit': 100,
    }
    max_retries = 3
    retry_delay = 5  # seconds
    request_timeout = 30  # seconds; without a timeout a stalled socket blocks the whole invocation
    attempts = max_retries + 1  # one initial try plus the retries

    flights_data = None
    status_code, detail = 502, 'no response received'
    for attempt in range(1, attempts + 1):
        try:
            response = requests.get(api_url, params=params, timeout=request_timeout)
        except requests.exceptions.RequestException as exc:
            # Report only the exception type: the full message can embed the
            # request URL, which carries the access key in its query string.
            status_code, detail = 502, type(exc).__name__
        else:
            if response.status_code == 200:
                try:
                    # `or []` also covers an explicit JSON null in "data".
                    flights_data = response.json().get('data') or []
                    break  # Exit the loop if the request was successful
                except ValueError:
                    status_code, detail = 502, 'response body is not valid JSON'
            else:
                status_code, detail = response.status_code, response.text
        if attempt < attempts:
            time.sleep(retry_delay)  # Wait before retrying

    if flights_data is None:
        return {
            'statusCode': status_code,
            'body': json.dumps(
                f"Failed to fetch data after {attempts} attempts: {status_code}, {detail}")
        }

    # Step 2: Prepare data for batch insert
    insert_data = [
        (
            'AviationStack API',  # Source
            json.dumps(flight),   # JSON string of flight data
        )
        for flight in flights_data
    ]
    # Log the row count only; dumping every payload floods the logs.
    print(f"Prepared {len(insert_data)} flight rows for insert")

    insert_query = """
        INSERT INTO flight_ods.flights_data_raw (source, data, load_timestamp)
        VALUES (
            %s, -- source
            %s, -- data
            CONVERT_TIMEZONE('UTC', 'Europe/Paris', CURRENT_TIMESTAMP()) -- load_timestamp
        )
    """

    # Step 3: Connect to Snowflake and insert all data at once
    try:
        conn = snowflake.connector.connect(
            user='PENGFEIQIAO',
            password='***',
            account='***',
            database='flight',
            schema='flight_ods',
            role='dev'
        )
        # The nested try/finally blocks release the cursor and the connection
        # even when the insert or commit raises.
        try:
            cur = conn.cursor()
            try:
                if insert_data:  # nothing to send when the API returned no flights
                    cur.executemany(insert_query, insert_data)
                conn.commit()
            finally:
                cur.close()
        finally:
            conn.close()
    except Exception as e:
        # Top-level boundary: turn any Snowflake failure into a 500 response.
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error in Snowflake operation: {e}")
        }

    return {
        'statusCode': 200,
        'body': json.dumps("Data loaded into Snowflake ODS successfully.")
    }
dbt code
dbt project github link
Snowflake
-- Prepare environment: warehouse, DEV role, and the privileges DEV gets on FLIGHT.ODS.
-- NOTE(review): the role switch to ACCOUNTADMIN appears only halfway down;
-- presumably the statements above it also need an admin role — confirm run order.
CREATE WAREHOUSE IF NOT EXISTS transforming;
CREATE ROLE IF NOT EXISTS DEV;
GRANT USAGE ON DATABASE FLIGHT TO ROLE dev;
GRANT USAGE ON SCHEMA FLIGHT.ODS TO ROLE dev;
GRANT SELECT ON ALL TABLES IN SCHEMA FLIGHT.ODS TO ROLE DEV;
GRANT INSERT, UPDATE, DELETE, SELECT ON ALL TABLES IN SCHEMA FLIGHT.ODS TO ROLE dev;
GRANT ROLE dev TO USER PENGFEIQIAO;
-- Grant create table permission on the schema
GRANT CREATE TABLE ON SCHEMA flight.ODS TO ROLE dev;
GRANT USAGE ON WAREHOUSE transforming TO ROLE dev;
GRANT CREATE VIEW ON SCHEMA flight.ODS TO ROLE dev;
-- Ad-hoc check of the ranking model output
SELECT * FROM flight_dwh.arrival_country_ranking;
-- Switch to an appropriate role
USE ROLE ACCOUNTADMIN;
-- Grant create table permission on the schema
GRANT CREATE TABLE ON SCHEMA flight.ODS TO ROLE dev;
-- Inspect what the role and the user currently hold
SHOW GRANTS TO ROLE DEV;
SHOW GRANTS TO USER pengfeiqiao;
GRANT ROLE DEV TO USER pengfeiqiao;
GRANT USAGE ON WAREHOUSE transforming TO ROLE dev;
-- Create table: raw landing table, one row per flight record.
-- CREATE OR REPLACE drops any existing table of the same name together with its rows.
CREATE OR REPLACE TABLE flight_ods.flights_data_raw (
    source VARCHAR,          -- label of the system the record came from
    data VARCHAR,            -- flight record stored as JSON text (parsed later with PARSE_JSON)
    load_timestamp TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP  -- filled automatically when omitted on insert
);
-- Flatten the raw JSON text into typed columns.
-- The JSON is parsed once in the CTE and the individual paths are read from it.
WITH parsed AS (
    SELECT PARSE_JSON(data) AS flight
    FROM ods.flights_data_raw
)
SELECT
    flight:flight_date::STRING AS flight_date,
    flight:flight_status::STRING AS flight_status,
    flight:departure:airport::STRING AS departure_airport,
    flight:departure:delay::STRING AS departure_delay,
    flight:departure:icao::STRING AS departure_icao,
    flight:departure:iata::STRING AS departure_iata,
    flight:arrival:airport::STRING AS arrival_airport,
    flight:arrival:delay::STRING AS arrival_delay,
    flight:arrival:icao::STRING AS arrival_icao,
    flight:arrival:iata::STRING AS arrival_iata,
    flight:airline:name::STRING AS airline_name,
    CURRENT_TIMESTAMP() AS load_timestamp
FROM
    parsed;
ALTER SESSION SET TIMEZONE = 'Europe/Amsterdam';
-- Create stage: external stage pointing at the S3 prefix that receives the mart exports.
-- NOTE(review): AWS keys are embedded in the statement; consider a storage integration instead.
CREATE OR REPLACE STAGE snowflake_s3_stage
    URL = 's3://snowflake-flight/mart/'
    CREDENTIALS = (AWS_KEY_ID = '***' AWS_SECRET_KEY = '***');
-- Show the files currently under the stage location
LIST @snowflake_s3_stage;
-- Stream on the mart ranking table; the task below polls it for pending changes.
CREATE OR REPLACE STREAM arrival_country_ranking_stream ON TABLE FLIGHT.FLIGHT_MART.ARRIVAL_COUNTRY_RANKING;
-- Task: every minute, if the stream has data, export the rows for the latest
-- flight_date found in the stream to the S3 stage as Parquet, overwriting
-- existing files of the same name.
CREATE OR REPLACE TASK arrival_country_ranking_stream_to_s3_task
    WAREHOUSE = transforming
    SCHEDULE = '1 MINUTE' -- Adjust schedule as needed
    WHEN SYSTEM$STREAM_HAS_DATA('arrival_country_ranking_stream')
AS
    COPY INTO '@snowflake_s3_stage/arrival_country_ranking/'
    FROM (
        SELECT ARRIVAL_COUNTRY_NAME, FLIGHT_DATE, cn
        FROM arrival_country_ranking_stream
        WHERE arrival_country_name IS NOT NULL
          AND flight_date IN (SELECT MAX(flight_date) FROM arrival_country_ranking_stream)
    )
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE;
-- The RESUME below starts the schedule for the task created above.
ALTER TASK arrival_country_ranking_stream_to_s3_task RESUME;