Apache Airflow integration for dbt
This is a collection of Airflow operators to provide easy integration with dbt. A typical DAG looks like this:
from airflow import DAG
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtSnapshotOperator,
    DbtRunOperator,
    DbtTestOperator
)
from airflow.utils.dates import days_ago

default_args = {
    'dir': '/srv/app/dbt',
    'start_date': days_ago(0)
}

with DAG(dag_id='dbt', default_args=default_args, schedule_interval='@daily') as dag:
    dbt_seed = DbtSeedOperator(
        task_id='dbt_seed',
    )

    dbt_snapshot = DbtSnapshotOperator(
        task_id='dbt_snapshot',
    )

    dbt_run = DbtRunOperator(
        task_id='dbt_run',
    )

    dbt_test = DbtTestOperator(
        task_id='dbt_test',
        retries=0,  # Failing tests would fail the task, and we don't want Airflow to try again
    )

    dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test
Install from PyPI:
pip install airflow-dbt
The operators will also need access to the dbt CLI, which should either be on your PATH or can be set with the dbt_bin argument in each operator.
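For example, if dbt is installed in a virtual environment rather than on your PATH, you can point an operator at the executable directly. The paths below are only illustrative:

from airflow_dbt.operators.dbt_operator import DbtRunOperator

# Inside a DAG context, as in the example above.
dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dbt_bin='/srv/app/.venv/bin/dbt',  # illustrative path to a dbt executable
    dir='/srv/app/dbt',
)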
There are six operators currently implemented:

DbtDocsGenerateOperator: calls dbt docs generate
DbtDepsOperator: calls dbt deps
DbtSeedOperator: calls dbt seed
DbtSnapshotOperator: calls dbt snapshot
DbtRunOperator: calls dbt run
DbtTestOperator: calls dbt test
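The two operators not shown in the earlier example, DbtDepsOperator and DbtDocsGenerateOperator, are used the same way. The sketch below assumes they are importable from the same module as the others and sits inside a DAG context like the first example:

from airflow_dbt.operators.dbt_operator import (
    DbtDepsOperator,
    DbtDocsGenerateOperator,
    DbtRunOperator
)

# Install dbt packages, run the models, then build the documentation site.
dbt_deps = DbtDepsOperator(task_id='dbt_deps')
dbt_run = DbtRunOperator(task_id='dbt_run')
dbt_docs_generate = DbtDocsGenerateOperator(task_id='dbt_docs_generate')

dbt_deps >> dbt_run >> dbt_docs_generate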
Each of the above operators accepts the following arguments:

profiles_dir: If set, passed as the --profiles-dir argument to the dbt command
target: If set, passed as the --target argument to the dbt command
dir: The directory to run the dbt command in
full_refresh: If set to True, passes --full-refresh
vars: If set, passed as the --vars argument to the dbt command. Should be set as a Python dictionary, as it will be passed to the dbt command as YAML
models: If set, passed as the --models argument to the dbt command
exclude: If set, passed as the --exclude argument to the dbt command
select: If set, passed as the --select argument to the dbt command
dbt_bin: The dbt CLI to use. Defaults to dbt, so assumes it's on your PATH
verbose: The operator will log verbosely to the Airflow logs
warn_error: If set to True, passes --warn-error, which treats warnings as errors

Typically you will want to use the DbtRunOperator, followed by the DbtTestOperator, as shown earlier.
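As a sketch of how these arguments map onto dbt CLI flags (all values below are placeholders for your own project, inside a DAG context as before):

dbt_run_marts = DbtRunOperator(
    task_id='dbt_run_marts',
    dir='/srv/app/dbt',                  # run the dbt command in this directory
    profiles_dir='/srv/app/dbt',         # passed as --profiles-dir
    target='prod',                       # passed as --target
    models='marts.*',                    # passed as --models
    full_refresh=True,                   # passes --full-refresh
    vars={'run_date': '{{ ds }}'},       # passed as --vars, rendered as YAML
)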
You can also use the hook directly. Typically this is useful when you need to combine the dbt command with another task in the same operator, for example running dbt docs and uploading the docs to somewhere they can be served from.
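A minimal sketch of that pattern, assuming the hook is exposed as DbtCliHook in airflow_dbt.hooks.dbt_hook with a run_cli method (as used internally by the operators); the path is a placeholder and the upload step is deliberately omitted:

from airflow.operators.python import PythonOperator
from airflow_dbt.hooks.dbt_hook import DbtCliHook

def generate_and_upload_docs():
    # Build the dbt docs site using the same hook the operators use internally.
    DbtCliHook(dir='/srv/app/dbt').run_cli('docs', 'generate')
    # ...then upload the generated target/ directory to wherever the docs are
    # served from (e.g. an object store); the upload step is omitted here.

# Inside a DAG context, as in the earlier example.
dbt_docs_upload = PythonOperator(
    task_id='dbt_docs_upload',
    python_callable=generate_and_upload_docs,
)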
To install from the repository, it's recommended to first create a virtual environment:
python3 -m venv .venv
source .venv/bin/activate
Then install using pip:
pip install .
To run tests locally, first create a virtual environment as described above, then install the dependencies:
pip install . pytest
Run the tests:
pytest tests/
This project uses flake8.
To check your code, first create a virtual environment as described above, then install flake8 and run it:
pip install flake8
flake8 airflow_dbt/ tests/ setup.py
If you use dbt's package manager you should include all dependencies before deploying your dbt project. For Docker users, packages specified in packages.yml should be included as part of your Docker image by calling dbt deps in your Dockerfile.
If you use MWAA, you just need to update the requirements.txt file and add airflow-dbt and dbt to it. Then you can have your dbt code inside a folder {DBT_FOLDER} in the dags folder on S3 and configure the dbt task like below:
dbt_run = DbtRunOperator(
    task_id='dbt_run',
    dbt_bin='/usr/local/airflow/.local/bin/dbt',
    profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
    dir='/usr/local/airflow/dags/{DBT_FOLDER}/'
)
GoCardless ♥ open source. If you do too, come join us.