-- Contents: manual CDC, streams and tasks, dynamic tables

-- Manual CDC (change data capture)

-- Demo setup: (re)create an empty schema for the manual CDC example.
-- NOTE(review): the unqualified table names below are presumably resolved
-- against the schema created here -- confirm the session context.
create database if not exists data_pipelines;
create or replace schema data_pipelines.manual_cdc;

-- Pipeline: source (table) --> target (table), applied by a MERGE in a stored proc.
-- source carries one extra column, del, alongside the id/name pair kept in target.
create table source (
    del  boolean,
    id   int,
    name string
);

create table target (
    id   int,
    name string
);

-- Apply the change rows in source to target, keyed on id:
--   * id not in target, del false --> insert the row
--   * id in target,     del true  --> delete the target row
--   * id in target,     del false --> overwrite the target name
merge into target t
using source s
    on t.id = s.id
when not matched and not s.del then
    insert (id, name) values (s.id, s.name)
when matched and s.del then
    delete
when matched and not s.del then
    update set t.name = s.name;

-- cdc(): apply the change rows currently in source to target with one MERGE.
--   * id not in target, del false --> insert
--   * id in target,     del true  --> delete
--   * id in target,     del false --> update name
-- Returns: SQLROWCOUNT as set by the MERGE (rows affected by that statement).
-- The body is an explicit Snowflake Scripting block so that the declared INT
-- is produced by a RETURN statement.
-- The procedure does not clear source; the caller decides when to empty it.
create or replace procedure cdc()
  returns int
  language sql
as
$$
begin
  merge into target t using source s on t.id = s.id
    when not matched and not s.del then insert (id, name) values (s.id, s.name)
    when matched and s.del then delete
    when matched and not s.del then update set t.name = s.name;
  return sqlrowcount;
end;
$$;

-- Batch 1: 3 x INSERT --> ids 1, 2 and 3 are new, so target gains three rows.
insert into source (del, id, name)
values
    (false, 1, 'John'),
    (false, 2, 'Mary'),
    (false, 3, 'George');
call cdc();
-- the batch has been applied; empty source so it is not applied again
truncate table source;
select id, name from target order by id;

-- Batch 2: UPDATE + DELETE --> id 1 is renamed to 'Mark', id 2 is removed.
insert into source (del, id, name)
values
    (false, 1, 'Mark'),
    (true, 2, null);
call cdc();
truncate table source;
select id, name from target order by id;

-- Streams and tasks

-- Demo setup: (re)create an empty schema for the streams-and-tasks example.
create database if not exists data_pipelines;
create or replace schema data_pipelines.streams_and_tasks;

-- Pipeline: source (table) --> mystream (stream) --> target (table)
create table source (
    id   int,
    name string
);

create table target (
    id   int,
    name string
);

-- mystream is the stream defined on the source table.
create stream mystream
    on table source;

-- mytask: task on the mystream data stream, applying its change rows to target
-- with a MERGE keyed on id.
-- Scheduled every minute on compute_wh; the WHEN guard checks
-- system$stream_has_data('mystream') before the MERGE is run.
-- metadata$isupdate is used as a boolean predicate rather than compared to the
-- strings 'TRUE'/'FALSE', which avoids an implicit string-to-boolean cast.
create or replace task mytask
  warehouse = compute_wh
  schedule = '1 minute'
  when system$stream_has_data('mystream')
as
  merge into target t using mystream s on t.id = s.id
  -- DELETE row that is not part of an update: remove the target row
  when matched
    and s.metadata$action = 'DELETE'
    and not s.metadata$isupdate
    then delete
  -- INSERT row that is part of an update: carry the new name over
  when matched
    and s.metadata$action = 'INSERT'
    and s.metadata$isupdate
    then update set t.name = s.name
  -- INSERT row whose id is not in target yet: add it
  when not matched
    and s.metadata$action = 'INSERT'
    then insert (id, name) values (s.id, s.name);

-- insert 3 rows in the source table, checking the stream before and after
select system$stream_has_data('mystream');
insert into source (id, name)
values
    (1, 'John'),
    (2, 'Mary'),
    (3, 'George');
select system$stream_has_data('mystream');

-- resume the task, trigger one run manually, and look at its execution
-- (most recent run first)
alter task mytask resume;
execute task mytask;
select *
  from table(information_schema.task_history(task_name => 'mytask'))
  order by run_id desc;

select id, name from target order by id;

-- update+delete existing source rows --> target should make in-place changes
update source set name = 'Mark' where id = 1;
delete from source where id = 2;
select system$stream_has_data('mystream');

-- trigger another run instead of waiting for the next scheduled one
-- NOTE(review): EXECUTE TASK returns before the run finishes -- re-run the
-- select below if target still shows the old rows.
execute task mytask;
select id, name from target order by id;

-- stop the schedule once the demo is done, so the task is not left resumed
alter task mytask suspend;

-- Dynamic tables

-- Demo setup: (re)create an empty schema for the dynamic-table example.
create database if not exists data_pipelines;
create or replace schema data_pipelines.dynamic_tables;

-- Pipeline: source (table) --> target (dynamic table)
create table source (
    id   int,
    name string
);

-- target: dynamic table defined by a query selecting id and name from source,
-- using compute_wh as its warehouse and a target lag of one minute.
create dynamic table target
    target_lag = '1 minute'
    warehouse  = compute_wh
as
    select
        id,
        name
    from source;

-- insert 3 rows in the source table
insert into source (id, name)
values
    (1, 'John'),
    (2, 'Mary'),
    (3, 'George');
-- refresh manually so the select does not depend on the target lag elapsing
alter dynamic table target refresh;
select id, name from target order by id;

-- update+delete existing source rows --> dynamic table should reflect in-place changes
update source set name = 'Mark' where id = 1;
delete from source where id = 2;
alter dynamic table target refresh;
select id, name from target order by id;

-- stop further refreshes once the demo is done
alter dynamic table target suspend;