Snowflake: Task & Pipe Support

Atlas now supports Snowflake tasks and pipes as first-class resources, including task DAGs, session parameters, and continuous data loading pipelines.

Tasks and pipes can be defined, diffed, and migrated alongside other schema objects like tables, stages, and views.

Tasks

The task block defines a Snowflake task with a schedule, warehouse, optional condition, and a SQL statement to execute:

task "root_task" {
  schema    = schema.PUBLIC
  schedule  = "60 MINUTE"
  warehouse = "COMPUTE_WH"
  condition = "SYSTEM$STREAM_HAS_DATA('my_stream')"
  comment   = "root task"
  as        = "INSERT INTO target SELECT * FROM source"
}

Task DAGs

Child tasks use the after attribute to form a directed acyclic graph (DAG). Atlas resolves task dependencies and generates statements in the correct order:

task "child_task" {
  schema    = schema.PUBLIC
  warehouse = "COMPUTE_WH"
  as        = "CALL refresh_summary()"
  after     = [task.root_task]
}

Session Parameters

Tasks support session parameters such as user_task_timeout_ms, suspend_task_after_num_failures, and task_auto_retry_attempts. Atlas diffs these parameters and generates an ALTER TASK ... SET statement for each added or changed value and an ALTER TASK ... UNSET statement for each removed one:

task "etl_task" {
  schema                          = schema.PUBLIC
  schedule                        = "120 MINUTE"
  warehouse                       = "COMPUTE_WH"
  user_task_timeout_ms            = 30000
  suspend_task_after_num_failures = 5
  task_auto_retry_attempts        = 3
  as                              = "INSERT INTO t1 SELECT * FROM t2"
}
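
As a rough illustration of the diff described above, suppose the etl_task definition is later edited so that user_task_timeout_ms becomes 60000 and suspend_task_after_num_failures is removed. Both of those edits are hypothetical, and the exact statements, quoting, and ordering Atlas emits may differ. The sketch only shows the standard Snowflake ALTER TASK forms involved:

```sql
-- Hypothetical migration sketch; not actual Atlas output.

-- user_task_timeout_ms changed from 30000 to 60000
ALTER TASK "etl_task" SET USER_TASK_TIMEOUT_MS = 60000;

-- suspend_task_after_num_failures removed from the task block,
-- so the parameter falls back to its inherited or default value
ALTER TASK "etl_task" UNSET SUSPEND_TASK_AFTER_NUM_FAILURES;
```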

Pipes

The pipe block defines a Snowflake pipe for continuous data loading. A pipe wraps a COPY INTO statement, given in the as attribute, that loads data from a stage:

stage "data_stage" {
  schema = schema.PUBLIC
}

pipe "ingest_pipe" {
  schema  = schema.PUBLIC
  as      = "COPY INTO target_table FROM @data_stage FILE_FORMAT=(TYPE='JSON')"
  comment = "continuous ingestion pipe"
}

Generated SQL

Atlas generates the appropriate CREATE, ALTER, and DROP statements for both tasks and pipes:

-- Create task "root_task"
CREATE TASK "root_task"
  WAREHOUSE = "COMPUTE_WH"
  SCHEDULE = '60 MINUTE'
  COMMENT = 'root task'
  WHEN SYSTEM$STREAM_HAS_DATA('my_stream')
  AS INSERT INTO target SELECT * FROM source;

-- Create task "child_task"
CREATE TASK "child_task"
  WAREHOUSE = "COMPUTE_WH"
  AFTER "PUBLIC"."root_task"
  AS CALL refresh_summary();

-- Create pipe "ingest_pipe"
CREATE PIPE "ingest_pipe"
  COMMENT = 'continuous ingestion pipe'
  AS COPY INTO target_table FROM @data_stage FILE_FORMAT=(TYPE='JSON');

-- Modify pipe "ingest_pipe"
ALTER PIPE "ingest_pipe" SET COMMENT = 'updated comment';

Atlas fully manages the lifecycle of tasks and pipes: creating, altering, dropping, and diffing them alongside other schema objects. Task dependency ordering is handled automatically, ensuring child tasks are dropped before their parents and created after them.
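
To make the ordering guarantee concrete, here is a sketch of what removing both root_task and child_task from the schema could produce. This is an assumed example, not captured Atlas output. The point is only that the child, which names root_task in its AFTER clause, is dropped first:

```sql
-- Hypothetical teardown sketch; not actual Atlas output.

-- Drop the dependent task first
DROP TASK "child_task";

-- Then drop the task it depended on
DROP TASK "root_task";
```

Creation runs in the opposite direction, as in the generated SQL above, where root_task is created before child_task.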
