The modern data warehouse at scale is populated, edited, and analyzed via a set of data workflows (or pipelines). Without these, data warehouses would be static and unable to keep up with the ever-changing needs of the business. A data pipeline or workflow is a sequence of data processing steps, e.g. fetching data from external sources, transforming data, and loading data into other systems. To keep downstream systems up to date, data pipelines are often scheduled to run at regular intervals. Furthermore, to limit the amount of data that needs to be processed, data is often partitioned by time at a specific grain (degree of time granularity).

Traditional data orchestration systems force the developer to explicitly declare the dependencies between tasks, i.e. the order in which tasks are executed. You may have seen this with the >> operator in Apache Airflow. This is tedious and error-prone, especially when tasks are scheduled to run at regular intervals. SDF automatically infers dependencies between tasks from their queries. That’s right, the beautiful SQL you’ve been writing for years contains the dependencies within it. Combined with a simple schedule declaration in the table block of an sdf.yml, SDF can orchestrate your entire data workflow on the specified cadence, with extra respect paid to partitions.

SDF infers the DAG (directed acyclic graph) of your data pipeline from the SQL statements in your workspace. Combined with a simple schedule declaration, your entire SDF workspace is automatically orchestrated with ease.

Workflows, Schedules, and Time Partitioning

Workflows, schedules, and time-partitioned tables work together to automate and orchestrate recurring data management tasks: everything from fetching data from external locations to performing data validation and transformation to loading data into other systems. We first discuss their relationship and then look at an example in SDF.

Terminology

  • A workflow is a collection of tasks executed in a particular dependency-based order. In SDF, a task builds a table, and the order of tasks is defined by the use-define relationships between tables. The goal of a workflow in SDF is to build one or more tables of a workspace. Suppose table A is defined by a select statement that references table B in its from clause. SDF turns A and B into tasks with the default goal of building both. Thanks to SDF’s handy SQL static analysis, it understands the use-define relationship and concludes that B must be built before A (remind you of a build system yet?).

  • SDF schedules allow workflows or tasks to recur at particular intervals. SDF uses cron expressions to specify intervals. Each time-partitioned table must have an associated cron expression that describes exactly when that table is built. For instance, attaching the cron expression “0 0 * * *” to a table T means that T is scheduled for execution every day at midnight, or (reading the entries of the cron expression from left to right) SDF builds the table at minute 0, hour 0, every day of the month, every month, every day of the week. Once a table’s task is scheduled for execution, it first waits for all upstream tasks it depends on to complete and then begins executing.

  • In SDF, time-partitioned tables are specified by simply declaring one or more time-related columns as partition columns.
    Each time partition column also has a format string. Suppose a partition column named dt should only contain strings of the form YYYY-MM-DD. You can specify this with the format string %Y-%m-%d. The syntax of these format expressions is taken from the widely used strftime library.

  • The grain of a time-partitioned table is the timespan between two successively scheduled iterations. For example, a daily partitioned table has a daily grain. Grains are often used to summarize or aggregate data. For instance, a daily grain might summarize a set of hourly grains. To schedule time-partitioned tasks properly, grains must align, which we will discuss in more detail below. A minimal configuration sketch tying these terms together follows this list.
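
To make these terms concrete, here is a minimal, hypothetical sketch; the table names a and b and all values below are illustrative and are not part of the sample used in the next section. Table a is defined by a select over table b, so SDF infers that b must be built before a:

select dt, uid, count(*) as event_count
from b
group by dt, uid;

A table block for a in an sdf.yml would then schedule it daily and declare dt as its time partition column:

table:
  name: a
  schedule: "0 0 * * *"
  partitioned-by:
    - name: "dt"
      format: "%Y-%m-%d"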

Example

Let’s bring it all together in the form of an example. Suppose we have a data warehouse that tracks the activity of users. We want to build a table dau that contains the number of unique users that have performed some action on a given day.

Workspace File

(1) Let’s start by installing the sample dau (for daily active users).

sdf new --sample dau

(2) The workspace file specifies where to find source sql files as well as sdf.yml configuration files and resource files.

cat workspace.sdf.yml

workspace:
  edition: "1.3"
  includes: 
    - path: src/
  resources:
    - path: events/

(3) The resource files in this example are partitioned CSV files providing data for an external events table. Every row in this table has an action (column action) and a user id (column uid). Event table partitions are assumed to land every six hours, each partition containing data for the corresponding six-hour interval. The interval is encoded in the two partition columns dt and hr, containing the date and the hour respectively.

The event table’s schema is defined in the following .sql file.

cat src/events.sql

create external table events(
    dt varchar, 
    hr varchar, 
    action varchar, 
    uid bigint
) stored as csv  partitioned by (dt, hr) location 'events/';

(4) The SQL DDL language is not rich enough to capture all the semantics of this table; e.g. the above SQL statement defines neither the temporal frequency of partitions (every 6 hours) nor which format should be used for dt and hr values. All of this is specified in the configuration file events.sdf.yml. The schedule field uses a cron expression to tell SDF that data lands every day at hours 0, 6, 12, and 18. The format specifications for dt and hr tell SDF how to format their values. Finally, events.sdf.yml specifies that the table is external via the materialization property.

cat src/events.sdf.yml

table:
  name: events
  schedule: "0 */6 * * *"
  partitioned-by:
    - name: "dt"
      format: "%Y%m%d"
    - name: "hr"
      format: "%H"
  materialization: external

On disk, the events table for two concrete days is laid out like this:

tree events

events
├── dt=20220102
│   ├── hr=00
│   │   └── data.csv
│   ├── hr=06
│   │   └── data.csv
│   ├── hr=12
│   │   └── data.csv
│   └── hr=18
│       └── data.csv
└── dt=20220103
    ├── hr=00
    │   └── data.csv
    ├── hr=06
    │   └── data.csv
    ├── hr=12
    │   └── data.csv
    └── hr=18
        └── data.csv

This example demonstrates an “external” table defined by local files. We can also configure an external table to point to remote files — for example, in an S3 bucket.
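
As a sketch, assuming a hypothetical bucket and prefix, the same DDL could point at remote storage by changing only the location clause:

create external table events(
    dt varchar,
    hr varchar,
    action varchar,
    uid bigint
) stored as csv partitioned by (dt, hr) location 's3://my-bucket/events/';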

Variables and Partitions on the Same Grain

Now that we understand our workspace, let’s look at how partitions are injected into our queries.

First, let’s try counting all actions per user by partition and represent the result as yet another time-partitioned table. The query hau.sql together with its metadata description hau.sdf.yml does so.

cat src/hau.sql

select
    dt, hr, uid, count(action) action_count_total
from
    events
where
    dt = @dt and hr = @hr
group by
    dt,hr,uid;

As you can see from the where clause, the query hau.sql is parametric. The variables @dt and @hr are implicitly created for the partition columns dt and hr of this table (as defined in hau.sdf.yml). When a task runs for a particular six-hour interval, the SDF scheduler substitutes the corresponding date and time values for these variables.

cat src/hau.sdf.yml

table:
  name: hau
  schedule: "0 */6 * * *"
  partitioned-by:
    - name: "dt"
      format: "%Y%m%d"
    - name: "hr"
      format: "%H" 

The file hau.sdf.yml looks exactly like events.sdf.yml. They share the same schedule and thus run at the same time. However, note that hau.sdf.yml omits the materialization property; by default, hau will therefore be cached.
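
For intuition, when the scheduler runs hau for the 2022-01-02T00:00 interval (as we will do in the next section), @dt and @hr are rendered with the %Y%m%d and %H formats, so the executed query is roughly equivalent to the following sketch:

select
    dt, hr, uid, count(action) action_count_total
from
    events
where
    dt = '20220102' and hr = '00'
group by
    dt,hr,uid;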

Building for a Particular Time

To build a time-partitioned table for a particular time, you need a binding for @dt and @hr. In SDF, you provide the binding for partition variables via the --date parameter or, as we’ll see later, the --from and --to parameters.

For instance, to count all actions per user for a particular date and time, call sdf build like so:

sdf build src/hau.sql --date 2022-01-02T00:00 --no-show

Note: the --no-show option causes SDF to skip printing a fragment of the resulting table to STDOUT.

   Building dau.pub.events (./src/events.sql)
    Written sdftarget/dau/pub/events/dt=20220102/hr=00/
   Building dau.pub.hau (./src/hau.sql)
    Written sdftarget/dau/pub/hau/dt=20220102/hr=00/
    Written sdftarget/assembly.sdf.yml
   Finished build in 0.341 secs

The provided date parameter (which must be a prefix of an RFC 3339 timestamp) is mapped to the partition variables using the specified format strings. For example, 2022-01-02T00:00 becomes dt=20220102 and hr=00 under the %Y%m%d and %H formats.

We see the result of the execution materialized in sdftarget.

tree sdftarget

sdftarget
├── assembly.sdf.yml
└── dau
    └── pub
        ├── events
        │   └── dt=20220102
        │       └── hr=00
        │           └── part-0.parquet
        └── hau
            └── dt=20220102
                └── hr=00
                    └── part-0.parquet

Calling sdf build with a date for which there is no data results in an empty table.

Building for a Time Range

To build tables for a time range, provide the flags --from and --to with valid RFC 3339 dates.

Note that --from is inclusive and --to is exclusive when providing a time range to SDF via the command line.

sdf build src/hau.sql --from 2022-01-02 --to 2022-01-03

    Running dau.pub.events dau.pub.hau --date 2022-01-02T00:00
   Building dau.pub.events (./src/events.sql)
   Building dau.pub.hau (./src/hau.sql)
   ---
    Running dau.pub.events dau.pub.hau --date 2022-01-02T06:00
   Building dau.pub.events (./src/events.sql)
   Building dau.pub.hau (./src/hau.sql)
   ---
    Running dau.pub.events dau.pub.hau --date 2022-01-02T12:00
   Building dau.pub.events (./src/events.sql)
   Building dau.pub.hau (./src/hau.sql)
   ---
    Running dau.pub.events dau.pub.hau --date 2022-01-02T18:00
   Building dau.pub.events (./src/events.sql)
   Building dau.pub.hau (./src/hau.sql)

As you can see from the output above, SDF just ran a backfill for this data warehouse. Orchestrators like Apache Airflow run into the notorious backfill problem, where complex programs are required to propagate schema changes back in time through partitions. In SDF, this comes out of the box: simply provide a time range and sdf build will do the rest.

Another aspect worth noting is that SDF’s rebuilds are smart. Suppose you decide to rebuild a larger time range, like --from 2022-01-02 --to 2022-01-04. Simply say so; the system will leverage any previous work it has already done.

sdf build src/hau.sql --from 2022-01-02 --to 2022-01-04

    Running dau.pub.hau dau.pub.events --date 2022-01-02T00:00
    Reusing dau.pub.events, dau.pub.hau
    --
    Running dau.pub.hau dau.pub.events --date 2022-01-02T06:00
    Reusing dau.pub.events, dau.pub.hau
    --
    Running dau.pub.hau dau.pub.events --date 2022-01-02T12:00
    Reusing dau.pub.hau, dau.pub.events
    --
    Running dau.pub.hau dau.pub.events --date 2022-01-02T18:00
    Reusing dau.pub.hau, dau.pub.events
    --
    Running dau.pub.hau dau.pub.events --date 2022-01-03T00:00
   Building dau.pub.events (./src/events.sql)
   Building dau.pub.hau (./src/hau.sql)
    --
    Running dau.pub.hau dau.pub.events --date 2022-01-03T06:00
   Building dau.pub.events (./src/events.sql)
   Building dau.pub.hau (./src/hau.sql)
    --
    Running dau.pub.hau dau.pub.events --date 2022-01-03T12:00
   Building dau.pub.events (./src/events.sql)
   Building dau.pub.hau (./src/hau.sql)
    --
    Running dau.pub.hau dau.pub.events --date 2022-01-03T18:00
   Building dau.pub.events (./src/events.sql)
   Building dau.pub.hau (./src/hau.sql)

We see that all partitions that have been built before are simply reused. In fact, SDF only builds what is necessary. For more cache details, see our deep dive on the SDF Cache.

Building Partitions on Different Time Grains

The dau table (where dau stands for daily active users) summarizes the 6-hourly hau (hourly active users) table.

cat src/dau.sql

select
    dt,uid,sum(action_count_total) as action_count_total
from
    hau
where
    dt = @dt
group by
    dt,uid

While dau references hau, it specifies its partition predicate as just dt = @dt. As such, dau summarizes all partitions of hau from the same day.
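
Although the sample’s dau.sdf.yml is not shown here, it pairs this query with a daily schedule. A sketch along the following lines would do the job, assuming dau’s dt format matches hau’s so that the dt = @dt predicate lines up (the actual file may differ in its details):

table:
  name: dau
  schedule: "0 0 * * *"
  partitioned-by:
    - name: "dt"
      format: "%Y%m%d"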

Since dau summarizes a time range, you should only build dau with --from and --to. This guarantees that the proper hau partitions have landed before dau runs. Let’s see it in action.

sdf build src/dau.sql --from 2022-01-02 --to 2022-01-03

    Running dau.pub.hau dau.pub.events --date 2022-01-02T00:00
    Reusing dau.pub.events, dau.pub.hau
    Running dau.pub.hau dau.pub.events --date 2022-01-02T06:00
    Reusing dau.pub.hau, dau.pub.events
    Running dau.pub.hau dau.pub.events --date 2022-01-02T12:00
    Reusing dau.pub.events, dau.pub.hau
    Running dau.pub.hau dau.pub.events --date 2022-01-02T18:00
    Reusing dau.pub.hau, dau.pub.events
    Running dau.pub.dau --date 2022-01-02
    Reusing dau.pub.hau, dau.pub.events
   Building dau.pub.dau (./src/dau.sql)

The console output shows that all hourly tables are already available; as a result, the only table that needs to be built is dau.

You might be asking yourself, why wasn’t dau built earlier on? That’s because SDF was clever enough to figure out that dau requires all hourly partitions to be built before it can be built itself; otherwise, the data would be inaccurate.

Recursive tables

Ever wanted to implement recursion in SQL? We did, and we did it in a way that is both easy to use and efficient. Let’s see how it works.

Non-recursive accumulation

To compute the total number of events per user over the whole lifetime of a service, we could write the following query, called dau_slow.

cat src/dau_slow.sql

create table dau_slow as
select
    @dt as dt, uid, sum(action_count_total) as action_count_total
from
    dau
where
    dt <= @dt
group by
    uid

Initially dau_slow is actually not that slow (haha), but over time non-recursive accumulation becomes slower and slower. This is because the query always has to recompute everything from the beginning of time. If only we had a build system to figure out what had already been done…

Recursive accumulation

An alternative way to author such accumulating tables is to define them recursively. Below you’ll see a more efficient version of this accumulating table, which we call dau_fast. This new query establishes the invariant that each @dt partition simply stores dau_slow’s result for that day. Leveraging this invariant, dau_fast computes today’s partition by reading yesterday’s partition and combining it with today’s dau partition.

cat src/dau_fast.sql

create recursive table dau_fast as
with
    today as (select * from dau where dt = @dt),
    yesterday as (select * from dau_fast where to_timestamp(dt) + interval '1' day = to_timestamp(@dt))
select
    @dt as dt,
    coalesce(today.uid, yesterday.uid) as uid,
    coalesce(today.action_count_total, 0) + coalesce(yesterday.action_count_total, 0) as action_count_total
from today
full outer join yesterday
using (uid)

Before we can execute the above query, we have to specify dau_fast’s partition scheme and recursive materialization.

cat src/dau_fast.sdf.yml

table:
  name: dau_fast
  schedule: "0 0 * * *"
  partitioned-by:
    - name: "dt"
      format: "%Y-%m-%d"
  materialization: recursive

Now we can execute recursive queries like any other query:

sdf build src/dau_fast.sql --from 2022-01-02 --to 2022-01-04

...TBD...

Again we see that all intermediate results, like dau, have been cached and reused. Oh, the power of build systems…

Current Implementation Restrictions

The current implementation has the following restrictions:

  • Time partitioned columns must be of type varchar
  • Each time partitioned query can only write to one partition at a time. That partition value must be the same as the value of a variable.
  • Time partition filters can currently only use equality predicates and can only appear in conjunctions.
  • The schedules of time-partitioned tables that reference each other must be synchronized. So if A and B have the same grain, they have to start at the same time. If A and B have different grains, then A and B must still have a common time at which both can be scheduled, and one grain must be a multiple of the other (see the example below).
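
To illustrate the last restriction, consider the purely illustrative schedules below: a six-hourly table and a daily table align because they share the midnight run and one day is a multiple of six hours, whereas a daily table offset to 03:00 would never coincide with the six-hourly runs.

schedule: "0 */6 * * *"   # runs at 00:00, 06:00, 12:00, 18:00
schedule: "0 0 * * *"     # aligned with the above: shares the 00:00 run; one day is a multiple of six hours
schedule: "0 3 * * *"     # not aligned with the six-hourly schedule: the two never run at the same time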

Feedback

The SDF team will address these shortcomings in short order. Let them know via email what requirements and priorities you have so that they can prioritize accordingly.