Use Airflow-like macros in dbt
I used to create BigQuery tables with Apache Airflow. These days, I am migrating the queries to dbt, but I still use Airflow to schedule the dbt jobs. One of the obstacles to the migration is Airflow-specific Jinja2 macros, such as ds and ts. So, I implemented a dbt package that provides Airflow-like macros.
The macros are very useful for specifying the data to be processed in a WHERE clause, especially when a table is very large and is partitioned by date or time. Consider a case where we calculate the total transaction amounts on a certain day and insert the results into a partition of another table. The ds macro enables us to specify the date as below.
SELECT
DATE("{{ ds }}") AS transaction_date
, user_id
, SUM(amount) AS total_amount_per_day
FROM transactions
WHERE DATE(created_at) = "{{ ds }}"
GROUP BY 1, 2
The open-source dbt doesn’t ship with such macros at the time of writing, as the list of macros in the official documentation shows. With the macros I implemented, the query above can be rewritten as below.
SELECT
DATE("{{ dbt_airflow_macros.ds() }}") AS transaction_date
, user_id
, SUM(amount) AS total_amount_per_day
FROM transactions
WHERE DATE(created_at) = "{{ dbt_airflow_macros.ds()}}"
GROUP BY 1, 2
One of the differences from Airflow is that we have to pass an environment variable for the “execution date” when we run the dbt CLI. If it is not set, the system local time is used as the “execution date”, regardless of the timezone.
EXECUTION_DATE="2020-01-01T01:23:45" dbt run
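To illustrate this behavior, here is a minimal sketch of how such a ds macro could be written with dbt’s Jinja context. This is not the actual implementation of dbt_airflow_macros, and the EXECUTION_DATE parsing format is an assumption based on the example above.

{% macro ds() %}
  {# Sketch only: read the EXECUTION_DATE environment variable if it is set #}
  {% set raw = env_var("EXECUTION_DATE", "") %}
  {% if raw %}
    {% set dt = modules.datetime.datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S") %}
  {% else %}
    {# Fall back to the system local time, whatever the timezone is #}
    {% set dt = modules.datetime.datetime.now() %}
  {% endif %}
  {{ return(dt.strftime("%Y-%m-%d")) }}
{% endmacro %}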
As you might know if you are an Airflow user, we can also pass execution_date to some operators, such as Python operators and Kubernetes pod operators, via the context. So if we want to keep using Airflow to schedule dbt jobs, we can easily leverage the Airflow-like macros, as the example below shows. The macros solved my issue, and I hope they will be useful to you too.
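For example, here is a sketch of an Airflow task that forwards the execution date to the dbt CLI through the EXECUTION_DATE environment variable. The DAG name, schedule, and task id are illustrative assumptions, not part of the package.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical daily DAG that runs dbt; all names here are assumptions.
with DAG(
    dag_id="dbt_daily",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
) as dag:
    run_dbt = BashOperator(
        task_id="run_dbt",
        # bash_command is templated, so Airflow renders the execution date
        # at runtime in the same format as the EXECUTION_DATE example above.
        bash_command="EXECUTION_DATE={{ execution_date.strftime('%Y-%m-%dT%H:%M:%S') }} dbt run",
    )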