Workflows, Schedules, & Partitions
The modern data warehouse at scale is populated, edited, and analyzed via a set of data workflows (or pipelines). Without these, data warehouses would be static and unable to keep up with the ever-changing needs of the business. A data pipeline or workflow is a sequence of data processing steps, e.g. fetching data from external sources, transforming data, and loading data into other systems. To keep downstream systems up to date, data pipelines are often scheduled to run at regular intervals. Furthermore, to limit the amount of data that needs to be processed, data is often partitioned by time at a specific grain (degree of time granularity).
Traditional data orchestration systems force the developer to explicitly declare the dependencies between tasks, i.e. the order in which tasks are executed. You may have seen this with the `>>` operator in Apache Airflow. This is tedious and error-prone, especially when tasks are scheduled to run at regular intervals.
SDF automatically infers dependencies between tasks' queries. That’s right, the beautiful SQL you’ve been writing for years contains the dependencies within it. Combined with a simple schedule declaration in the table block of an `sdf.yml`, SDF is able to orchestrate your entire data workflow on the specified cadence, with full respect paid to partitions.
SDF infers the DAG (directed acyclic graph) of your data pipeline from the SQL statements in your workspace. Combined with a simple schedule declaration, your entire SDF workspace is automatically orchestrated with ease.
Workflows, Schedules, and Time Partitioning
Workflows, schedules, and time-partitioned tables work together to automate and orchestrate recurring data management tasks: everything from fetching data from external locations, to performing data validation and transformation, to loading data into other systems. We first discuss their relationship and then look at an example in SDF.
Terminology
- A workflow is a collection of tasks executed in a particular dependency-based order. In SDF, a task builds a table, and the order of tasks is defined by the use-define relationships between tables. The goal of a workflow in SDF is to build one or more tables of a workspace. Suppose table `A` is defined by a `select` statement that references table `B` in its `from` clause. SDF turns `A` and `B` into tasks with the default goal of building both. Thanks to SDF’s handy SQL static analysis, it understands the use-define relationship and concludes that `B` must be built before `A` (remind you of a build system yet?).
- SDF schedules allow workflows or tasks to recur at particular intervals. SDF uses cron expressions to specify intervals. Each time-partitioned table must have an associated cron expression that describes exactly when that table is built. For instance, attaching the cron expression `"0 0 * * *"` to a table `T` means that `T` is scheduled for execution every day at midnight, or (reading the entries of the cron expression from left to right) SDF builds the table at minute 0, hour 0, every day of the month, every month, every day of the week. Once a table’s task is scheduled for execution, it first waits for all the upstream tasks it depends on to complete and then begins executing.
- In SDF, time-partitioned tables are specified by simply declaring one or more time-related columns as partition columns. Each time partition column also has a format string. Suppose a partition column named `dt` should only contain strings of the form `YYYY-MM-DD`. You can specify this with the format string `%Y-%m-%d`. The syntax of these format expressions is taken from the widely popular strftime library.
- The grain of a time-partitioned table is the timespan between two successively scheduled iterations. For example, a daily partitioned table has a daily grain. Grains are often used to summarize or aggregate data; for instance, a daily grain might summarize a set of hourly grains. To schedule time-partitioned tasks properly, grains must align, which we discuss in more detail below.
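To make format strings concrete, here is a small Python sketch; Python's `datetime.strftime` uses the same strftime syntax that partition formats are based on, and the timestamps and formats mirror the ones used later in this example:

```python
from datetime import datetime

# A partition column `dt` with format "%Y-%m-%d" holds values like:
d = datetime(2022, 1, 2)
print(d.strftime("%Y-%m-%d"))  # 2022-01-02

# The same instant under the "%Y%m%d" and "%H" formats used below:
ts = datetime(2022, 1, 2, 18, 0)
print(ts.strftime("%Y%m%d"), ts.strftime("%H"))  # 20220102 18
```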
Example
Let’s bring it all together in the form of an example. Suppose we have a data warehouse that tracks the activity of users. We want to build a table `dau` that contains the number of unique users that have performed some action on a given day.
Workspace File
(1) Let’s start by installing the sample `dau` (for daily activity of users):
sdf new --sample dau
(2) The workspace file specifies where to find source SQL files as well as `sdf.yml` configuration files and resource files.
cat workspace.sdf.yml
workspace:
edition: "1.3"
includes:
- path: models/
resources:
- path: events/
(3) The resource files in this example are partitioned CSV files providing data for an external events table. Every row in this table has an action (column `action`) and a user id (column `uid`). Event table partitions are assumed to land every six hours, each partition containing the data for the corresponding six-hour interval. The interval is encoded in the two partition columns `dt` and `hr`, containing the date and the hour, respectively. The events table’s schema is defined in the following `.sql` file.
cat src/events.sql
create external table events(
dt varchar,
hr varchar,
action varchar,
uid bigint
) stored as csv partitioned by (dt, hr) location 'events/';
(4) The SQL DDL language is not rich enough to capture all the semantics of this table; e.g., the above SQL statement does not define the temporal frequency of partitions (every six hours) or which formats should be used for `dt` and `hr` values. All of this is specified in the configuration file `events.sdf.yml`. The `schedule` field uses a cron expression to tell SDF that data lands every day at hours 0, 6, 12, and 18. The `format` specifications for `dt` and `hr` tell SDF how to format their values. Finally, `events.sdf.yml` specifies that the table is external via the `materialization` property.
cat src/events.sdf.yml
table:
name: events
schedule: "0 */6 * * *"
partitioned-by:
- name: "dt"
format: "%Y%m%d"
- name: "hr"
format: "%H"
materialization: external
On disk, the `events` table for two concrete days is laid out like this:
tree events
events
├── dt=20220102
│   ├── hr=00
│   │   └── data.csv
│   ├── hr=06
│   │   └── data.csv
│   ├── hr=12
│   │   └── data.csv
│   └── hr=18
│       └── data.csv
└── dt=20220103
    ├── hr=00
    │   └── data.csv
    ├── hr=06
    │   └── data.csv
    ├── hr=12
    │   └── data.csv
    └── hr=18
        └── data.csv
This example demonstrates an “external” table defined by local files. We can also configure an external table to point to remote files — for example, in an S3 bucket.
Variables and Partitions on the Same Grain
Now that we understand our workspace, let’s look at how partitions are injected into our queries.
First, let’s try counting all actions per user by partition and represent the result as yet another time-partitioned table. The query `hau.sql`, together with its metadata description `hau.sdf.yml`, does exactly that.
cat src/hau.sql
select
dt, hr, uid, count(action) action_count_total
from
events
where
dt = @dt and hr = @hr
group by
dt,hr,uid;
As you can see from the `where` clause, the query `hau.sql` is parametric. The variables `@dt` and `@hr` are implicitly created for the partition columns `dt` and `hr` of this table (as defined in `hau.sdf.yml`). When a task runs for a particular six-hour interval, the SDF scheduler substitutes the corresponding date and time values for these variables.
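Conceptually, the substitution behaves like the following simplified Python sketch (an illustration only, not SDF's actual implementation; the quoting of substituted values is an assumption):

```python
from datetime import datetime

QUERY = ("select dt, hr, uid, count(action) action_count_total "
         "from events where dt = '@dt' and hr = '@hr' group by dt, hr, uid")

def bind_partition_vars(query: str, ts: datetime) -> str:
    """Substitute partition variables using the table's format strings."""
    return (query.replace("@dt", ts.strftime("%Y%m%d"))
                 .replace("@hr", ts.strftime("%H")))

bound = bind_partition_vars(QUERY, datetime(2022, 1, 2, 6, 0))
print(bound)  # ... where dt = '20220102' and hr = '06' ...
```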
cat src/hau.sdf.yml
table:
name: hau
schedule: "0 */6 * * *"
partitioned-by:
- name: "dt"
format: "%Y%m%d"
- name: "hr"
format: "%H"
The `hau.sdf.yml` looks exactly like `events.sdf.yml`. They share the same schedule and as such run at the same time. However, note that `hau.sdf.yml` specifies no `materialization`; hence, by default, `hau` will be cached.
Building for a Particular Time
To build a time-partitioned table for a particular time, you need a binding for `@dt` and `@hr`. In SDF, you provide the binding for partition variables via the `--date` or, as we’ll see later, the `--from` and `--to` parameters. For instance, to count all actions per user for a particular date and time, call `sdf build` like so:
sdf build src/hau.sql --date 2022-01-02T00:00 --no-show
Note: the `--no-show` option causes SDF to omit printing a fragment of the resulting table to STDOUT.
Building dau.pub.events (./src/events.sql)
Written sdftarget/dau/pub/events/dt=20220102/hr=00/
Building dau.pub.hau (./src/hau.sql)
Written sdftarget/dau/pub/hau/dt=20220102/hr=00/
Written sdftarget/assembly.sdf.yml
Finished build in 0.341 secs
The provided date parameter (which must be a prefix of an RFC 3339 timestamp) is mapped to partition variables using the specified format strings.
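As an illustrative sketch (plain Python, not SDF internals), the date `2022-01-02T00:00` maps to the partition values `dt=20220102` and `hr=00` under the format strings above:

```python
from datetime import datetime

date_arg = "2022-01-02T00:00"          # the value passed to --date
ts = datetime.fromisoformat(date_arg)  # parses this RFC 3339 prefix

# Apply each partition column's format string from hau.sdf.yml.
partition = {"dt": ts.strftime("%Y%m%d"), "hr": ts.strftime("%H")}
print(partition)  # {'dt': '20220102', 'hr': '00'}
```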
We see the result of the execution materialized in `sdftarget`.
tree sdftarget
sdftarget
├── assembly.sdf.yml
└── dau
    └── pub
        ├── events
        │   └── dt=20220102
        │       └── hr=00
        │           └── part-0.parquet
        └── hau
            └── dt=20220102
                └── hr=00
                    └── part-0.parquet
Calling `sdf build` with a date for which there is no data results in an empty table.
Building for a Time Range
To build tables for a time range, provide the flags `--from` and `--to` with valid RFC 3339 dates. When providing a time range to SDF via the command line, `--from` is inclusive and `--to` is exclusive.
sdf build src/hau.sql --from 2022-01-02 --to 2022-01-03
Running dau.pub.events dau.pub.hau --date 2022-01-02T00:00
Building dau.pub.events (./src/events.sql)
Building dau.pub.hau (./src/hau.sql)
---
Running dau.pub.events dau.pub.hau --date 2022-01-02T06:00
Building dau.pub.events (./src/events.sql)
Building dau.pub.hau (./src/hau.sql)
---
Running dau.pub.events dau.pub.hau --date 2022-01-02T12:00
Building dau.pub.events (./src/events.sql)
Building dau.pub.hau (./src/hau.sql)
---
Running dau.pub.events dau.pub.hau --date 2022-01-02T18:00
Building dau.pub.events (./src/events.sql)
Building dau.pub.hau (./src/hau.sql)
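The four runs above can be modeled as one run per schedule tick in the half-open range. A minimal Python sketch, assuming a six-hour grain and the inclusive/exclusive semantics of `--from`/`--to`:

```python
from datetime import datetime, timedelta

def backfill_dates(start: str, end: str, grain_hours: int = 6):
    """Enumerate one --date value per schedule tick in [start, end)."""
    ts, stop = datetime.fromisoformat(start), datetime.fromisoformat(end)
    runs = []
    while ts < stop:
        runs.append(ts.strftime("%Y-%m-%dT%H:%M"))
        ts += timedelta(hours=grain_hours)
    return runs

print(backfill_dates("2022-01-02", "2022-01-03"))
# ['2022-01-02T00:00', '2022-01-02T06:00', '2022-01-02T12:00', '2022-01-02T18:00']
```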
As you can see from the output above, SDF just ran a backfill for this data warehouse. Orchestrators like Apache Airflow run into the notorious backfill problem, where complex programs are required to propagate new schema changes back in time through partitions. In SDF, this comes fully out of the box: simply provide a time range and `sdf build` will do the rest.
Another aspect worth noting is that SDF’s rebuilds are smart. Suppose you decide to rebuild an even larger time range, like `--from 2022-01-02 --to 2022-01-04`. Then simply say so; the system will leverage any previous work already done.
sdf build src/hau.sql --from 2022-01-02 --to 2022-01-04
Running dau.pub.hau dau.pub.events --date 2022-01-02T00:00
Reusing dau.pub.events, dau.pub.hau
--
Running dau.pub.hau dau.pub.events --date 2022-01-02T06:00
Reusing dau.pub.events, dau.pub.hau
--
Running dau.pub.hau dau.pub.events --date 2022-01-02T12:00
Reusing dau.pub.hau, dau.pub.events
--
Running dau.pub.hau dau.pub.events --date 2022-01-02T18:00
Reusing dau.pub.hau, dau.pub.events
--
Running dau.pub.hau dau.pub.events --date 2022-01-03T00:00
Building dau.pub.events (./src/events.sql)
Building dau.pub.hau (./src/hau.sql)
--
Running dau.pub.hau dau.pub.events --date 2022-01-03T06:00
Building dau.pub.events (./src/events.sql)
Building dau.pub.hau (./src/hau.sql)
--
Running dau.pub.hau dau.pub.events --date 2022-01-03T12:00
Building dau.pub.events (./src/events.sql)
Building dau.pub.hau (./src/hau.sql)
--
Running dau.pub.hau dau.pub.events --date 2022-01-03T18:00
Building dau.pub.events (./src/events.sql)
Building dau.pub.hau (./src/hau.sql)
We see that all partitions that have been built before are simply reused. In fact, SDF only builds what is necessary. For more cache details, see our deep dive on the SDF Cache.
Building Partitions on Different Time Grains
The `dau` table (where `dau` stands for daily active users) summarizes the six-hourly `hau` (hourly active users) table.
cat src/dau.sql
select
dt,uid,sum(action_count_total) as action_count_total
from
hau
where
dt = @dt
group by
dt,uid
While `dau` references `hau`, it specifies its partition predicate as just `dt = @dt`. As such, `dau` summarizes all partitions of `hau` from the same day.
Since `dau` summarizes a time range, you should only build `dau` with `--from` and `--to`. This guarantees that the proper `hau` partitions have landed before `dau` runs. Let’s see it in action.
sdf build src/dau.sql --from 2022-01-02 --to 2022-01-03
Running dau.pub.hau dau.pub.events --date 2022-01-02T00:00
Reusing dau.pub.events, dau.pub.hau
Running dau.pub.hau dau.pub.events --date 2022-01-02T06:00
Reusing dau.pub.hau, dau.pub.events
Running dau.pub.hau dau.pub.events --date 2022-01-02T12:00
Reusing dau.pub.events, dau.pub.hau
Running dau.pub.hau dau.pub.events --date 2022-01-02T18:00
Reusing dau.pub.hau, dau.pub.events
Running dau.pub.dau --date 2022-01-02
Reusing dau.pub.hau, dau.pub.events
Building dau.pub.dau (./src/dau.sql)
The console output shows that all hourly tables are already available. As a result, the only table that needs to be built is `dau`.
You might be asking yourself: why wasn’t `dau` built earlier on? That’s because SDF was clever enough to figure out that `dau` requires all hourly partitions to be built before it can build; otherwise, the data would be inaccurate.
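That dependency can be sketched as follows: a daily `dau` partition waits on every six-hourly `hau` partition of the same day (illustrative Python, assuming the formats from the yml files above):

```python
from datetime import datetime, timedelta

def upstream_partitions(day: datetime, upstream_grain: timedelta):
    """Six-hourly partitions a daily partition depends on, as (dt, hr) pairs."""
    parts, ts = [], day
    while ts < day + timedelta(days=1):
        parts.append((ts.strftime("%Y%m%d"), ts.strftime("%H")))
        ts += upstream_grain
    return parts

deps = upstream_partitions(datetime(2022, 1, 2), timedelta(hours=6))
print(deps)  # [('20220102', '00'), ('20220102', '06'), ('20220102', '12'), ('20220102', '18')]
```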
Recursive Tables
Ever wanted to implement recursion in SQL? We did, and we did it in a way that is both easy to use and efficient. Let’s see how it works.
Non-recursive accumulation
To compute the total number of events per user over the whole lifetime of a service, we could write the following query, called `dau_slow`.
cat src/dau_slow.sql
create table dau_slow as
select
  @dt as dt, uid, sum(action_count_total) as action_count_total
from
  dau
where
  dt <= @dt
group by
  uid;
Initially, `dau_slow` is actually not that slow (haha), but over time non-recursive accumulation becomes slower and slower. This is because the query always has to recompute everything from the beginning of time. If only we had a build system to figure out what had already been done…
Recursive accumulation
An alternative way to author such accumulating tables is to define them recursively. Below you’ll see a more efficient version of this accumulating table, which we call `dau_fast`. This new query establishes the invariant that each `@dt` partition simply stores `dau_slow`’s result for that day. Leveraging this invariant, `dau_fast` computes today’s partition by reading yesterday’s partition and composing it with today’s `dau` partition.
cat src/dau_fast.sql
create recursive table dau_fast as
with
  today as (select * from dau where dt = @dt),
  yesterday as (select * from dau_fast where to_timestamp(dt) + interval '1' day = @dt)
select
  @dt as dt,
  coalesce(today.uid, yesterday.uid) as uid,
  coalesce(today.action_count_total, 0) + coalesce(yesterday.action_count_total, 0) as action_count_total
from today
full outer join yesterday
using (uid);
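To see how the recursion composes, here is a tiny Python model of the same pattern (sample data and names are hypothetical): each day's partition is built from yesterday's partition plus today's `dau`, mirroring the full outer join with coalesced counts.

```python
# Hypothetical per-day dau partitions: uid -> action_count_total.
dau = {
    "2022-01-02": {"alice": 3, "bob": 1},
    "2022-01-03": {"alice": 2, "carol": 5},
}

def dau_fast(days: list) -> dict:
    """Today's partition = yesterday's partition + today's dau partition."""
    if not days:
        return {}
    yesterday = dau_fast(days[:-1])     # read yesterday's partition
    today = dau.get(days[-1], {})       # today's dau partition
    uids = set(today) | set(yesterday)  # full outer join on uid
    return {u: today.get(u, 0) + yesterday.get(u, 0) for u in uids}

print(sorted(dau_fast(["2022-01-02", "2022-01-03"]).items()))
# [('alice', 5), ('bob', 1), ('carol', 5)]
```

In this toy model yesterday's partition is recomputed on every call; in SDF it is read back from the materialized partition, which is what makes the recursive form fast.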
Before we can execute the above query, we have to specify `dau_fast`’s partition scheme and its `recursive` materialization.
cat src/dau_fast.sdf.yml
table:
name: dau_fast
schedule: "0 0 * * *"
partitioned-by:
- name: "dt"
format: "%Y-%m-%d"
materialization: recursive
Now we can execute recursive queries like any other query:
sdf build src/dau_fast.sql --from 2022-01-02 --to 2022-01-04
...TBD...
Again we see that all intermediate results, like `dau`, have been cached and reused. Oh, the power of build systems…
Current Implementation Restrictions
The current implementation has the following restrictions:
- Time partition columns must be of type `varchar`.
- Each time-partitioned query can only write to one partition at a time. That partition value must be the same as the value of a variable.
- Time partition filters can currently only use equality predicates and can only appear in conjunctions.
- The schedules of time-partitioned tables that reference each other must be synchronized. If `A` and `B` have the same grain, they have to start at the same time. If `A` and `B` have different grains, then `A` and `B` must still have a common time at which they can be scheduled, and one grain must be a multiple of the other.
Feedback
The SDF team will address these shortcomings in short order. Let us know via email what requirements and priorities you have so that your work can be prioritized.