Incremental Materialization
Materialize incremental models in Snowflake to save time and compute.
Overview
This guide introduces the incremental materialization of models in Snowflake using SDF. We’ll build on the basic concepts of materializing tables and views, focusing on incremental updates to optimize performance and resource usage. We continue to use datasets from Cybersyn available through Snowflake’s marketplace.
Prerequisites
This guide should be followed after completing the Getting Started with Snowflake and SDF guide and the Basic Materialization with Snowflake guide.
You’ll also need:
- A Snowflake account with this Cybersyn dataset installed.
- Valid Snowflake username / password credentials with write access to at least one database we can materialize tables to.
- Credentials instantiated as described in the previous guide.
When installing the Cybersyn dataset, make sure to grant the Snowflake role you’ll use with SDF read access to TECH__INNOVATION_ESSENTIALS (i.e. the Cybersyn database).
Guide
Create a New SDF Project from the Cybersyn Tech Innovation Sample
If you just completed the Basic Materialization with Snowflake guide, you can skip the next two steps and use the project you created there.
Create a new SDF project using the Cybersyn Tech Innovation sample. This will create a new project in your current working directory with the sample project files.
sdf new --sample cybersyn_tech_innovation cybersyn_tech_innovation_incremental
Compile to Test Credentials
To ensure your credentials are working and have read access to the new Cybersyn database, let’s try compiling one of the models.
sdf compile sdf_snowflake.cybersyn_tech_innovation.active_ai_domains
Working set 3 model files, 1 .sdf file
Downloading TECH__INNOVATION_ESSENTIALS.CYBERSYN.DOMAIN_CHARACTERISTICS (schema)
Compiling sdf_snowflake.cybersyn_tech_innovation.active_ai_domains (./models/sdf_snowflake/cybersyn_tech_innovation/active_ai_domains.sql)
Finished 2 models [1 succeeded, 1 downloaded] in 1.741 secs
Schema sdf_snowflake.cybersyn_tech_innovation.active_ai_domains
┌─────────────┬───────────┬────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ column_name ┆ data_type ┆ classifier ┆ description │
╞═════════════╪═══════════╪════════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════╡
│ DOMAIN_ID ┆ varchar ┆ ┆ Base domain for the website. For example, the domain_id for https://docs.cybersyn.com/ would be 'cybersyn.com' │
└─────────────┴───────────┴────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
If you do not see a successful compilation, please ensure you’ve:
- Followed the Getting Started with Snowflake and SDF guide to authenticate to your Snowflake account.
- Granted the correct role read access to the TECH__INNOVATION_ESSENTIALS database.
Materialize an Incremental Model
Now that we’ve confirmed our credentials are working, let’s materialize an incremental model. Cybersyn offers a table called github_events, which is updated frequently with new events across all repositories on GitHub. We’ll use this table to demonstrate incremental materialization with the append strategy.
Let’s say we want to track push events to the Apache DataFusion repo. We can create a new model that finds all push events to DataFusion. Since the github_events source data is updated frequently, we can use incremental materialization to query only new push events, thereby saving on compute and optimizing our pipeline.
Since we don’t care about updates to existing rows, we can use the append strategy. This strategy appends new rows to the target table without updating existing rows.
Let’s start by creating a file called datafusion_push_events.sql in the directory models/sdf_snowflake/staging/ and adding the following SQL to it:
SELECT
*
FROM
tech__innovation_essentials.cybersyn.github_events
WHERE
{% if builtin.is_incremental_mode %}
-- Only fetch rows that are newer than the newest row in the previous materialization of this table
created_at_timestamp >= (SELECT MAX(created_at_timestamp) FROM sdf_snowflake.staging.datafusion_push_events)
{% else %}
created_at_timestamp >= DATEADD(WEEK, -1, CURRENT_TIMESTAMP())
{% endif %}
AND
type = 'PushEvent'
AND
repo_name = 'apache/datafusion';
Let’s unpack a few things here:
- This relatively simple query fetches events and filters by their type, repository name, and created_at_timestamp.
- The {% if builtin.is_incremental_mode %} block is a Jinja conditional that checks if the model is being materialized incrementally. If it is, we only fetch rows that are newer than the newest row in the previous materialization of this table. If not, we fetch rows from the last week.
In a production scenario, you would likely want a non-incremental run to fetch events from all time, since it acts as a full refresh of the data. We add the one-week filter here to avoid major compute costs in this guide.
You might be wondering how SDF knows when to set builtin.is_incremental_mode to True. SDF sets this variable to True when the model has already been materialized in the cloud.
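The two branches of the conditional can be pictured with a small Python sketch. This is not SDF’s implementation: render_filter and table_exists_remotely are hypothetical names, and the function only imitates what the Jinja conditional in the model produces in each mode, under the assumption that incremental mode is on exactly when the target table already exists remotely.

```python
def render_filter(table_exists_remotely: bool) -> str:
    """Return the timestamp filter the model would contain in each mode.

    Hypothetical stand-in for the Jinja conditional in the model; SDF's
    real rendering logic is not shown in this guide.
    """
    # Assumption: incremental mode is on exactly when the target table
    # has already been materialized remotely.
    is_incremental_mode = table_exists_remotely
    if is_incremental_mode:
        # Later runs: only look at rows near the end of the existing table.
        return (
            "created_at_timestamp >= (SELECT MAX(created_at_timestamp) "
            "FROM sdf_snowflake.staging.datafusion_push_events)"
        )
    # First run: bounded one-week backfill, as in the guide's model.
    return "created_at_timestamp >= DATEADD(WEEK, -1, CURRENT_TIMESTAMP())"


if __name__ == "__main__":
    print("first run: ", render_filter(table_exists_remotely=False))
    print("later runs:", render_filter(table_exists_remotely=True))
```

Calling render_filter(False) corresponds to the first materialization, and render_filter(True) to every run after the table exists.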
Before running this model, we’ll need to tell SDF to override the default materialization for this table. We can do this by adding the following to the workspace.sdf.yml file:
---
table:
  name: sdf_snowflake.staging.datafusion_push_events
  materialization: incremental-table
SDF defaults the incremental strategy to append, so we don’t need to specify it in this YML (we’ll explore this later in the guide).
Now, let’s first compile our workspace with this new model:
sdf compile sdf_snowflake.staging.datafusion_push_events
Working set 4 model files, 1 .sdf file
Downloading SDF_SNOWFLAKE.STAGING.DATAFUSION_PUSH_EVENTS (exists_remotely)
Downloading TECH__INNOVATION_ESSENTIALS.CYBERSYN.GITHUB_EVENTS (schema)
Compiling sdf_snowflake.staging.datafusion_push_events (./models/sdf_snowflake/staging/datafusion_push_events.sql)
Finished 2 models [1 succeeded, 1 downloaded] in 2.334 secs
Schema sdf_snowflake.staging.datafusion_push_events
┌────────────────────────────────┬────────────────┬────────────┬────────────────────────────────────────────────────────────────────────────────────────────────┐
│ column_name ┆ data_type ┆ classifier ┆ description │
╞════════════════════════════════╪════════════════╪════════════╪════════════════════════════════════════════════════════════════════════════════════════════════╡
│ ID ┆ varchar ┆ ┆ Unique identifier for the Github event │
│ CREATED_AT_TIMESTAMP ┆ timestamp ┆ ┆ Datetime the event took place │
│ TYPE ┆ varchar ┆ ┆ Category of event logged │
│ ACTOR_AVATAR_URL ┆ varchar ┆ ┆ Avaatar of the user or actor │
│ ACTOR_DISPLAY_LOGIN ┆ varchar ┆ ┆ Display name of the user or actor involved in the event │
│ ACTOR_GRAVATAR_ID ┆ varchar ┆ ┆ │
│ ACTOR_ID ┆ decimal(38, 0) ┆ ┆ Unique identifier of the user or actor involved in the event │
│ ACTOR_LOGIN ┆ varchar ┆ ┆ Login name of the user or actor involved in the event │
│ ACTOR_URL ┆ varchar ┆ ┆ │
│ REPO_ID ┆ decimal(38, 0) ┆ ┆ Unique identifier for the repository involved in the event, joinable to the github_repos table │
│ REPO_NAME ┆ varchar ┆ ┆ Name of the repository │
│ REPO_URL ┆ varchar ┆ ┆ URL of the repository │
│ ORG_AVATAR_URL ┆ varchar ┆ ┆ Avatar of the organization │
│ ORG_GRAVATAR_ID ┆ varchar ┆ ┆ │
│ ORG_ID ┆ decimal(38, 0) ┆ ┆ Unique identifier for the organization │
│ ORG_LOGIN ┆ varchar ┆ ┆ Name of the organization │
│ ORG_URL ┆ varchar ┆ ┆ URL that contains information about the org, including the Github org link │
│ PAYLOAD ┆ variant ┆ ┆ Request involved in the action │
│ PAYLOAD_ACTION ┆ varchar ┆ ┆ Type of action taken by the request │
│ PAYLOAD_DESCRIPTION ┆ varchar ┆ ┆ │
│ PAYLOAD_COMMENT ┆ varchar ┆ ┆ │
│ PAYLOAD_MASTER_BRANCH ┆ varchar ┆ ┆ │
│ PAYLOAD_PULL_REQUEST ┆ varchar ┆ ┆ │
│ PAYLOAD_PUSHER_TYPE ┆ varchar ┆ ┆ Category of user that made the push │
│ PAYLOAD_PUSH_ID ┆ decimal(38, 0) ┆ ┆ │
│ PAYLOAD_HEAD ┆ varchar ┆ ┆ │
│ PAYLOAD_REF ┆ varchar ┆ ┆ │
│ PAYLOAD_REF_TYPE ┆ varchar ┆ ┆ │
│ PAYLOAD_ISSUE_ID ┆ decimal(38, 0) ┆ ┆ │
│ PAYLOAD_ISSUE ┆ variant ┆ ┆ │
│ PUBLIC ┆ boolean ┆ ┆ │
│ LOAD_DATE ┆ varchar ┆ ┆ Date the event was loaded │
│ CREATED_AT ┆ varchar ┆ ┆ Datetime the event took place │
│ PAYLOAD_BODY ┆ varchar ┆ ┆ │
│ PAYLOAD_COMMIT_ID ┆ varchar ┆ ┆ │
│ PAYLOAD_CREATED_AT ┆ varchar ┆ ┆ │
│ PAYLOAD_USER_ID ┆ decimal(38, 0) ┆ ┆ │
│ PAYLOAD_USER_LOGIN ┆ varchar ┆ ┆ │
│ ISSUE ┆ variant ┆ ┆ │
│ ISSUE_ACTIVE_LOCK_REASON ┆ varchar ┆ ┆ │
│ ISSUE_ASSIGNEE ┆ varchar ┆ ┆ │
│ ISSUE_ASSIGNEES ┆ variant ┆ ┆ │
│ ISSUE_AUTHOR_ASSOCIATION ┆ varchar ┆ ┆ │
│ ISSUE_BODY ┆ varchar ┆ ┆ │
│ ISSUE_CLOSED_AT ┆ varchar ┆ ┆ │
│ ISSUE_COMMENTS ┆ decimal(38, 0) ┆ ┆ │
│ ISSUE_COMMENTS_URL ┆ varchar ┆ ┆ │
│ ISSUE_CREATED_AT ┆ varchar ┆ ┆ │
│ ISSUE_DRAFT ┆ boolean ┆ ┆ │
│ ISSUE_EVENTS_URL ┆ varchar ┆ ┆ │
│ ISSUE_HTML_URL ┆ varchar ┆ ┆ │
│ ISSUE_ID ┆ decimal(38, 0) ┆ ┆ │
│ ISSUE_LABELS ┆ variant ┆ ┆ │
│ ISSUE_LABELS_URL ┆ varchar ┆ ┆ │
│ ISSUE_LOCKED ┆ boolean ┆ ┆ │
│ ISSUE_MILESTONE ┆ varchar ┆ ┆ │
│ ISSUE_NODE_ID ┆ varchar ┆ ┆ │
│ ISSUE_NUMBER ┆ decimal(38, 0) ┆ ┆ │
│ ISSUE_PERFORMED_VIA_GITHUB_APP ┆ varchar ┆ ┆ │
│ ISSUE_PULL_REQUEST ┆ variant ┆ ┆ │
│ ISSUE_REACTIONS ┆ variant ┆ ┆ │
│ ISSUE_REPOSITORY_URL ┆ varchar ┆ ┆ │
│ ISSUE_STATE ┆ varchar ┆ ┆ │
│ ISSUE_TIMELINE_URL ┆ varchar ┆ ┆ │
│ ISSUE_TITLE ┆ varchar ┆ ┆ │
│ ISSUE_UPDATED_AT ┆ varchar ┆ ┆ │
│ ISSUE_URL ┆ varchar ┆ ┆ │
│ ISSUE_USER_ID ┆ decimal(38, 0) ┆ ┆ │
│ ISSUE_USER_LOGIN ┆ varchar ┆ ┆ │
│ ISSUE_USER_TYPE ┆ varchar ┆ ┆ │
│ LANGUAGE ┆ varchar ┆ ┆ Language of the repository. │
└────────────────────────────────┴────────────────┴────────────┴────────────────────────────────────────────────────────────────────────────────────────────────┘
Before running our new incremental model, let’s inspect the compiled query output to see exactly what will be run against Snowflake. To do so, open the file sdftarget/dbg/materialized/sdf_snowflake/staging/datafusion_push_events.sql and inspect the SQL query.
It should show this:
create or replace table sdf_snowflake.staging.datafusion_push_events as (
SELECT
*
FROM
tech__innovation_essentials.cybersyn.github_events
WHERE
created_at_timestamp >= DATEADD(WEEK, -1, CURRENT_TIMESTAMP())
AND
type = 'PushEvent'
AND
repo_name = 'apache/datafusion'
);
As you can see, the query was compiled with builtin.is_incremental_mode set to False. This is because we haven’t materialized the table yet.
SDF outputs all configurations and settings for the compiled query in comments at the bottom of the file. This is useful for debugging and understanding how SDF is compiling your models.
Let’s run the model now to materialize the table in non-incremental mode.
sdf run models/sdf_snowflake/staging/datafusion_push_events.sql
Nice! The model should have successfully been materialized in Snowflake. Next, we’ll try running the model in incremental mode.
All we need to do is run the model again and SDF will automatically detect that the model has already been materialized and set builtin.is_incremental_mode to True.
sdf run models/sdf_snowflake/staging/datafusion_push_events.sql
Notice how the model ran much faster this time? That’s because SDF only fetched new rows from the github_events table that were created after the last materialization.
Furthermore, you might notice something slightly different in the run output, specifically a line that says Preloading, like so:
Preloading sdf_snowflake.staging.datafusion_push_events (schema & last_altered)
This indicates SDF is preloading the schema and last-altered time of the table before running the query. As mentioned previously, this is to set the is_incremental_mode builtin variable.
Lastly, if you inspect the compiled query output again, you should see builtin.is_incremental_mode set to True and the query’s SQL reflecting that.
insert into sdf_snowflake.staging.datafusion_push_events
SELECT
*
FROM
tech__innovation_essentials.cybersyn.github_events
WHERE
-- Only fetch rows that are newer than the newest row in the previous materialization of this table
created_at_timestamp >= (SELECT MAX(created_at_timestamp) FROM sdf_snowflake.staging.datafusion_push_events)
AND
type = 'PushEvent'
AND
repo_name = 'apache/datafusion'
;
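To see what the append strategy does across two runs, here is a minimal sketch that uses Python’s built-in sqlite3 module as a stand-in for Snowflake. The source and target tables, the row ids, and the integer timestamps are invented for illustration. The sketch also surfaces a side effect of the comparison operator: with >=, the row that carries the current maximum timestamp is selected again on the next run and appended a second time.

```python
import sqlite3


def incremental_append(conn: sqlite3.Connection, op: str) -> None:
    """Append source rows whose ts compares (op) against the target's max ts."""
    assert op in (">=", ">")  # only these two comparisons are illustrated
    conn.execute(
        f"INSERT INTO target SELECT * FROM source "
        f"WHERE ts {op} (SELECT MAX(ts) FROM target)"
    )


def run_demo(op: str) -> list:
    """Run a first (full) load and one incremental load; return target ids."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (id TEXT, ts INTEGER)")
    conn.executemany(
        "INSERT INTO source VALUES (?, ?)", [("a", 1), ("b", 2), ("c", 3)]
    )
    # First run (non-incremental): create the target from the full query.
    conn.execute("CREATE TABLE target AS SELECT * FROM source")
    # New events land in the source before the next run.
    conn.executemany("INSERT INTO source VALUES (?, ?)", [("d", 4), ("e", 5)])
    # Second run (incremental): append only the recent rows.
    incremental_append(conn, op)
    rows = conn.execute("SELECT id FROM target ORDER BY ts, id").fetchall()
    conn.close()
    return [r[0] for r in rows]


if __name__ == "__main__":
    print(">= :", run_demo(">="))  # row "c" is appended a second time
    print(">  :", run_demo(">"))   # each row appears once
```

Whether the repeated row matters depends on how the table is used downstream. A strict > avoids re-appending the newest row, at the risk of skipping late-arriving events that share that timestamp.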
Utilize the Merge Incremental Strategy
Now that we’ve successfully materialized an incremental model with the append strategy, let’s explore the merge strategy. The merge strategy is useful when you want to update existing rows in the target table with new data from the source table.
For a full reference of all supported incremental strategies, see the SDF Reference section.
Let’s say we now want to track a running count of the total push events for each unique contributor to the DataFusion repo. Since we want to update the row corresponding to the contributor, or create a new row if the contributor is new, the merge strategy is a perfect fit for this use case.
Let’s start by adding a new SQL file to our workspace called top_datafusion_contributors.sql in the directory models/sdf_snowflake/cybersyn_tech_innovation/ with the following SQL:
SELECT
actor_id,
actor_display_login,
COUNT(DISTINCT ID) as contribution_count
FROM sdf_snowflake.staging.datafusion_push_events
GROUP BY 1,2
ORDER BY contribution_count DESC;
Next, let’s update the workspace.sdf.yml to set materialization to incremental-table for this model.
---
table:
  name: sdf_snowflake.cybersyn_tech_innovation.top_datafusion_contributors
  materialization: incremental-table
  incremental-options:
    strategy: merge
    unique-key: actor_id
    merge-update-columns:
      - contribution_count
In this YML, we’ve set the incremental-options to use the merge strategy. We’ve also specified the unique-key as actor_id and the merge-update-columns as contribution_count.
This tells SDF to update the contribution_count column in the target table with the new count from the source table when it finds two rows that match on their actor_id.
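To make the role of each option concrete, the sketch below assembles a generic SQL merge statement from a unique key and a list of update columns. build_merge_sql is a hypothetical helper written for this illustration, not an SDF API, and the aliases src and dest are arbitrary. The statement SDF actually generates may differ in its details.

```python
def build_merge_sql(target, source_query, unique_key, update_columns, all_columns):
    """Assemble a generic SQL MERGE statement (illustrative helper, not SDF code)."""
    # merge-update-columns: only these are overwritten when the keys match.
    set_clause = ", ".join(f"dest.{c} = src.{c}" for c in update_columns)
    # Rows without a matching key are inserted with every column.
    insert_cols = ", ".join(all_columns)
    insert_vals = ", ".join(f"src.{c}" for c in all_columns)
    return (
        f"merge into {target} as dest\n"
        f"using ({source_query}) as src\n"
        # unique-key: source and target rows are matched on this column.
        f"on (src.{unique_key} = dest.{unique_key})\n"
        f"when matched then update set {set_clause}\n"
        f"when not matched then insert ({insert_cols})\n"
        f"values ({insert_vals});"
    )


if __name__ == "__main__":
    print(
        build_merge_sql(
            target="top_datafusion_contributors",
            source_query="select actor_id, actor_display_login, contribution_count from counts",
            unique_key="actor_id",
            update_columns=["contribution_count"],
            all_columns=["actor_id", "actor_display_login", "contribution_count"],
        )
    )
```

In this sketch the unique key ends up in the on clause and the update columns in the set clause, so a column that is not listed as an update column (here actor_display_login) is left untouched for rows that already exist.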
Let’s compile the workspace with this new model:
sdf compile models/sdf_snowflake/cybersyn_tech_innovation/top_datafusion_contributors.sql
Working set 5 model files, 1 .sdf file
Downloading SDF_SNOWFLAKE.STAGING.DATAFUSION_PUSH_EVENTS (exists_remotely)
Downloading SDF_SNOWFLAKE.CYBERSYN_TECH_INNOVATION.TOP_DATAFUSION_CONTRIBUTORS (exists_remotely)
Downloading TECH__INNOVATION_ESSENTIALS.CYBERSYN.GITHUB_EVENTS (schema)
Compiling sdf_snowflake.staging.datafusion_push_events (./models/sdf_snowflake/staging/datafusion_push_events.sql)
Compiling sdf_snowflake.cybersyn_tech_innovation.top_datafusion_contributors (./models/sdf_snowflake/cybersyn_tech_innovation/top_datafusion_contributors.sql)
Finished 3 models [2 succeeded, 1 downloaded] in 2.265 secs
Schema sdf_snowflake.cybersyn_tech_innovation.top_datafusion_contributors
┌─────────────────────┬────────────────┬────────────┬─────────────┐
│ column_name ┆ data_type ┆ classifier ┆ description │
╞═════════════════════╪════════════════╪════════════╪═════════════╡
│ ACTOR_ID ┆ decimal(38, 0) ┆ ┆ │
│ ACTOR_DISPLAY_LOGIN ┆ varchar ┆ ┆ │
│ CONTRIBUTION_COUNT ┆ decimal(38, 0) ┆ ┆ │
└─────────────────────┴────────────────┴────────────┴─────────────┘
Before running the model, let’s inspect the compiled query output to see exactly what will be run against Snowflake. To do so, open the file sdftarget/dbg/materialized/sdf_snowflake/cybersyn_tech_innovation/top_datafusion_contributors.sql and inspect the SQL query.
It should show this:
create or replace table sdf_snowflake.cybersyn_tech_innovation.top_datafusion_contributors as (
SELECT
actor_id,
actor_display_login,
COUNT(DISTINCT ID) as contribution_count
from sdf_snowflake.staging.datafusion_push_events
GROUP BY 1,2
ORDER BY contribution_count DESC
);
Since this table hasn’t been materialized yet, builtin.is_incremental_mode is set to False and the table is simply being created. Let’s run the model now to materialize the table in non-incremental mode.
sdf run models/sdf_snowflake/cybersyn_tech_innovation/top_datafusion_contributors.sql
The model should have successfully been materialized in Snowflake. Next, let’s try running the model in incremental mode. Like before, we’ll just run the model again and SDF will automatically detect that the model has already been materialized and set builtin.is_incremental_mode to True.
sdf run models/sdf_snowflake/cybersyn_tech_innovation/top_datafusion_contributors.sql
Let’s inspect the compiled query output again to see the SQL query that was run. You should see builtin.is_incremental_mode set to True and the query’s SQL reflecting that.
merge into sdf_snowflake.cybersyn_tech_innovation.top_datafusion_contributors as SDF_DEST
using (
SELECT
actor_id,
actor_display_login,
COUNT(DISTINCT ID) as contribution_count
from sdf_snowflake.staging.datafusion_push_events
GROUP BY 1,2
ORDER BY contribution_count DESC
) as SDF_SRC
on (
SDF_SRC.actor_id = SDF_DEST.actor_id
)
when matched then update set SDF_DEST.contribution_count = SDF_SRC.contribution_count
when not matched then insert ("ACTOR_ID", "ACTOR_DISPLAY_LOGIN", "CONTRIBUTION_COUNT")
values ("ACTOR_ID", "ACTOR_DISPLAY_LOGIN", "CONTRIBUTION_COUNT");
If the file isn’t updated, try reopening it. Sometimes, because of VS Code’s cache, the file appears to be unchanged.
As you can see, the query was compiled with builtin.is_incremental_mode set to True and, as a byproduct, the SQL query is a merge statement that updates the contribution_count column in the target table with the new count from the source table.
The merge strategy is a powerful tool for updating existing rows in the target table with new data from the source table. It’s especially useful when you want to keep a running count of events or other metrics.
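The row-level effect of a merge can be tried locally with a minimal sketch that uses Python’s built-in sqlite3 module. SQLite has no MERGE statement, so the sketch emulates one with an UPDATE followed by an INSERT; the dest and src tables and the sample contributors are invented for illustration and are not taken from the Cybersyn data.

```python
import sqlite3


def merge_counts(conn: sqlite3.Connection) -> None:
    """Emulate a merge on actor_id: update matched rows, insert the rest."""
    # "when matched then update set contribution_count = ..."
    conn.execute(
        "UPDATE dest SET contribution_count = "
        "(SELECT s.contribution_count FROM src s WHERE s.actor_id = dest.actor_id) "
        "WHERE actor_id IN (SELECT actor_id FROM src)"
    )
    # "when not matched then insert (...)"
    conn.execute(
        "INSERT INTO dest SELECT * FROM src "
        "WHERE actor_id NOT IN (SELECT actor_id FROM dest)"
    )


def run_merge_demo() -> list:
    """Merge fresh counts (src) into an existing table (dest); return dest rows."""
    conn = sqlite3.connect(":memory:")
    ddl = "(actor_id INTEGER, actor_display_login TEXT, contribution_count INTEGER)"
    conn.execute(f"CREATE TABLE dest {ddl}")
    conn.execute(f"CREATE TABLE src {ddl}")
    # State after the first materialization.
    conn.executemany(
        "INSERT INTO dest VALUES (?, ?, ?)", [(1, "alice", 3), (2, "bob", 1)]
    )
    # Result of the model's query on the next run.
    conn.executemany(
        "INSERT INTO src VALUES (?, ?, ?)", [(1, "alice", 5), (3, "carol", 2)]
    )
    merge_counts(conn)
    rows = conn.execute("SELECT * FROM dest ORDER BY actor_id").fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    for row in run_merge_demo():
        print(row)
```

In this sketch the existing contributor (actor 1) has its count updated from 3 to 5, the contributor missing from the new result (actor 2) is left as is, and the new contributor (actor 3) is inserted.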
For a full list of our supported incremental options and strategies, see the SDF Reference section.