Atlas now supports Snowflake tasks and pipes as first-class resources, including task DAGs, session parameters, and continuous data loading pipelines.
Tasks and pipes can be defined, diffed, and migrated alongside other schema objects like tables, stages, and views.
## Tasks
The `task` block defines a Snowflake task with a schedule, a warehouse, an optional condition, and the SQL statement to execute:
```hcl
task "root_task" {
  schema    = schema.PUBLIC
  schedule  = "60 MINUTE"
  warehouse = "COMPUTE_WH"
  condition = "SYSTEM$STREAM_HAS_DATA('my_stream')"
  comment   = "root task"
  as        = "INSERT INTO target SELECT * FROM source"
}
```
## Task DAGs
Child tasks use the `after` attribute to form a directed acyclic graph (DAG). Atlas resolves task dependencies and generates statements in the correct order:
```hcl
task "child_task" {
  schema    = schema.PUBLIC
  warehouse = "COMPUTE_WH"
  as        = "CALL refresh_summary()"
  after     = [task.root_task]
}
```
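Since `after` is a list, a task can also fan in from several parents. The block below is a sketch: the `merge_task` name and its SQL are illustrative, not from the original example:

```hcl
# Hypothetical fan-in task: runs only after both predecessors complete.
task "merge_task" {
  schema    = schema.PUBLIC
  warehouse = "COMPUTE_WH"
  as        = "INSERT INTO merged SELECT * FROM a JOIN b USING (id)"
  after     = [task.root_task, task.child_task]
}
```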
## Session Parameters
Tasks support session parameters such as `user_task_timeout_ms`, `suspend_task_after_num_failures`, and `task_auto_retry_attempts`. Atlas diffs these parameters and generates the appropriate `ALTER TASK SET` or `UNSET` statements:
```hcl
task "etl_task" {
  schema                          = schema.PUBLIC
  schedule                        = "120 MINUTE"
  warehouse                       = "COMPUTE_WH"
  user_task_timeout_ms            = 30000
  suspend_task_after_num_failures = 5
  task_auto_retry_attempts        = 3
  as                              = "INSERT INTO t1 SELECT * FROM t2"
}
```
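Changing a parameter's value diffs to a `SET`, while removing it from the definition diffs to an `UNSET`, restoring the Snowflake default. A sketch of the kind of statements this produces (the new values are illustrative, and the exact formatting of Atlas's output may differ):

```sql
-- Modify task "etl_task": raise one parameter, return another to its default.
ALTER TASK "etl_task" SET SUSPEND_TASK_AFTER_NUM_FAILURES = 10;
ALTER TASK "etl_task" UNSET TASK_AUTO_RETRY_ATTEMPTS;
```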
## Pipes
The `pipe` block defines a Snowflake pipe for continuous data loading. A pipe's body is a `COPY INTO` statement that loads from a stage:
```hcl
stage "data_stage" {
  schema = schema.PUBLIC
}

pipe "ingest_pipe" {
  schema  = schema.PUBLIC
  as      = "COPY INTO target_table FROM @data_stage FILE_FORMAT=(TYPE='JSON')"
  comment = "continuous ingestion pipe"
}
```
## Generated SQL
Atlas generates the appropriate CREATE, ALTER, and DROP statements for both tasks and pipes:
```sql
-- Create task "root_task"
CREATE TASK "root_task"
  WAREHOUSE = "COMPUTE_WH"
  SCHEDULE = '60 MINUTE'
  COMMENT = 'root task'
  WHEN SYSTEM$STREAM_HAS_DATA('my_stream')
  AS INSERT INTO target SELECT * FROM source;

-- Create task "child_task"
CREATE TASK "child_task"
  WAREHOUSE = "COMPUTE_WH"
  AFTER "PUBLIC"."root_task"
  AS CALL refresh_summary();
```
```sql
-- Create pipe "ingest_pipe"
CREATE PIPE "ingest_pipe"
  COMMENT = 'continuous ingestion pipe'
  AS COPY INTO target_table FROM @data_stage FILE_FORMAT=(TYPE='JSON');

-- Modify pipe "ingest_pipe"
ALTER PIPE "ingest_pipe" SET COMMENT = 'updated comment';
```
Atlas fully manages the lifecycle of tasks and pipes: creating, altering, dropping, and diffing them alongside other schema objects. Task dependency ordering is handled automatically, ensuring child tasks are dropped before their parents and created after them.
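For the DAG above, that ordering means a teardown plan removes the leaf first. A sketch of the generated drop statements (formatting may differ):

```sql
-- Drop task "child_task" first, so no remaining task references a dropped parent.
DROP TASK "child_task";
-- Drop task "root_task" once its children are gone.
DROP TASK "root_task";
```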