An Apache Airflow provider for Great Expectations
A set of Airflow operators for Great Expectations, a Python library for testing and validating data.
Due to apply_default decorator removal, this version of the provider requires Airflow 2.1.0+. If your Airflow version is < 2.1.0, and you want to install this provider version, first upgrade Airflow to at least version 2.1.0. Otherwise, your Airflow package version will be upgraded automatically, and you will have to manually run airflow upgrade db to complete the migration.
This package has been most recently unit tested with apache-airflow=2.4.3
and great-expectation=0.15.34
.
Formerly, there was a separate operator for BigQuery, to facilitate the use of GCP stores. This functionality is now baked into the core Great Expectations library, so the generic Operator will work with any back-end and SQL dialect for which you have a working Data Context and Datasources.
Pre-requisites: An environment running great-expectations
and apache-airflow
- these are requirements of this package that will be installed as dependencies.
pip install airflow-provider-great-expectations
Depending on your use-case, you might need to add ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
to your Dockerfile to enable XCOM to pass data between tasks.
The operator requires a DataContext to run which can be specified either as:
Additonally, a Checkpoint may be supplied, which can be specified either as:
Although if no Checkpoint is supplied, a default one will be built.
The operator also enables you to pass in a Python dictionary containing kwargs which will be added/substituted to the Checkpoint at runtime.
Great Expectations Base Operator: A base operator for Great Expectations. Import into your DAG via:
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
The email alert functionality available in version 0.0.7
has been removed, in order to keep the purpose of the operator more narrow and related to running the Great Expectations validations, etc. There is now a validation_failure_callback
parameter to the base operator's constructor, which can be used for any kind of notification upon failure, given that the notification mechanisms provided by the Great Expectations framework itself doesn't suffice.
See the example_dags directory for an example DAG with some sample tasks that demonstrate operator functionality.
The example DAG can be exercised in one of two ways:
With the open-source Astro CLI (recommended):
Initialize a project with the Astro CLI
Copy the example DAG into the dags/
folder of your astro project
Copy the directories in the include
folder of this repository into the include
directory of your Astro project
Copy your GCP credentials.json
file into the base directory of your Astro project
Add the following to your Dockerfile
to install the airflow-provider-great-expectations
package, enable xcom pickling, and add the required Airflow variables and connection to run the example DAG:
RUN pip install --user airflow_provider_great_expectations
ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
ENV GOOGLE_APPLICATION_CREDENTIALS=/usr/local/airflow/credentials.json
ENV AIRFLOW_VAR_MY_PROJECT=<YOUR_GCP_PROJECT_ID>
ENV AIRFLOW_VAR_MY_BUCKET=<YOUR_GCS_BUCKET>
ENV AIRFLOW_VAR_MY_DATASET=<YOUR_BQ_DATASET>
ENV AIRFLOW_VAR_MY_TABLE=<YOUR_BQ_TABLE>
ENV AIRFLOW_CONN_MY_BIGQUERY_CONN_ID='google-cloud-platform://?extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fbigquery&extra__google_cloud_platform__project=bombora-dev&extra__google_cloud_platform__key_path=%2Fusr%2Flocal%2Fairflow%2Fairflow-gcp.bombora-dev.iam.gserviceaccount.com.json'
Run astro dev start
to view the DAG on a local Airflow instance (you will need Docker running)
With a vanilla Airflow installation:
dags/
foldergreat_expectations
and data
directories in include/
available in your environment.data_file
and ge_root_dir
paths in your DAG file to point to the appropriate places.great-expectations/checkpoints/*.yml
to point to the absolute path of your data files.enable_xcom_pickling
to true
in your airflow.cfgastro
CLIAny virtual environment tool can be used, but the simplest approach is likely using the venv
tool included
in the Python standard library.
For example, creating a virtual environment for development against this package can be done with the following
(assuming bash
):
# Create the virtual environment using venv:
$ python -m venv --prompt my-af-ge-venv .venv
# Activate the virtual environment:
$ . .venv/bin/activate
# Install the package and testing dependencies:
(my-af-ge-venv) $ pip install -e '.[tests]'
Once the above is done, running the unit and integration tests can be done with either of the following approaches.
pytest
The pytest
library and CLI is preferred by this project, and many Python developers, because of its
rich API, and the additional control it gives you over things like test output, test markers, etc.
It is included as a dependency in requirements.txt
.
The simple command pytest -p no:warnings
, when run in the virtual environment created with the above
process, provides a concise output when all tests pass, filtering out deprecation warnings that may be
issued by Airflow, and a only as detailed as necessary output when they dont:
(my-af-ge-venv) $ pytest -p no:warnings
=========================================================================================== test session starts ============================================================================================
platform darwin -- Python 3.7.4, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
rootdir: /Users/jpayne/repos-bombora/bombora-airflow-provider-great-expectations, configfile: pytest.ini, testpaths: tests
plugins: anyio-3.3.0
collected 7 items
tests/operators/test_great_expectations.py ....... [100%]
============================================================================================ 7 passed in 11.99s ============================================================================================
Functional testing entails simply running the example DAG using, for instance, one of the approaches outlined above, only with the adjustment that the local development package be installed in the target Airflow environment.
Again, the recommended approach is to use the Astro CLI
**This operator is in early stages of development! Feel free to submit issues, PRs, or join the #integration-airflow channel in the Great Expectations Slack for feedback. Thanks to Pete DeJoy and the Astronomer.io team for the support.