Apache Airflow as a Data Quality Checker
As a machine learning engineer, I have spent years on the question of how to maintain data quality. I suspect checking data quality bothers everyone who works with data: we all have to inspect whether something is wrong with it. What if we could make that automatic? Apache Airflow has great potential to make this dream come true.
In this article, I would like to turn Airflow into an automatic data quality checker.
tl;dr
- We should make data quality checks more automatic and intuitive.
- Apache Airflow has checker operators, such as `BigQueryValueCheckOperator`.
- Such checkers allow us to check data automatically.
Motivation
Software engineering has long tackled how to improve software quality, and one of its great solutions is unit testing: automated tests let us keep quality high with little manual effort.
How can we transfer this idea to data science? It is often said that we data scientists spend 80% of our time checking and preparing data. That ratio is hard to justify; we should eliminate such unnecessary cost and spend the time on our essential objectives.
Airflow as a Data Quality Checker
As I mentioned, Apache Airflow has great potential as an automatic data quality checker. In addition, Airflow can send messages to Slack and other services. By combining these features, we can implement a data quality checker that works like unit testing for data pipelines.
Now, suppose we are collecting logs in JSON format, like the following, and loading them into BigQuery. Some keys should not be nullable; for example, assume that `event_id` is not allowed to be null or blank.
{"timestamp":1500233640,"user_id":1234,"event_id":"view",...}{"timestamp":1500233641,"user_id":4321,"event_id":"post",...}
In that case, we can inspect the log quality with the BigQuery query below; here I assume each JSON key is loaded as a column of the table. If the result is not equal to 0, there are null or blank values in the log:
SELECT COUNT(*) AS event_id_null_count
FROM event_log.event_log_20170906
WHERE event_id IS NULL OR event_id = ''
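Before wiring the query into Airflow, it can help to sanity-check it by hand. Below is a minimal sketch using the google-cloud-bigquery Python client; the client setup and credentials are my own assumptions, not part of the pipeline described here.

# Quick manual sanity check of the count query, outside Airflow.
# Sketch only: assumes the google-cloud-bigquery package is installed and
# application default credentials point at the project that owns event_log.
from google.cloud import bigquery

client = bigquery.Client()
sql = """
SELECT COUNT(*) AS event_id_null_count
FROM event_log.event_log_20170906
WHERE event_id IS NULL OR event_id = ''
"""
row = list(client.query(sql).result())[0]
print(row.event_id_null_count)  # a healthy day of logs prints 0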
How can we assemble this query into an Airflow DAG (directed acyclic graph)?
- Make a task that checks the data quality with `BigQueryValueCheckOperator`.
- Make a task that sends a message to Slack with `SlackAPIPostOperator` when there is something wrong with the data.
- Define the task dependency between the first task and the second one.
import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_check_operator import BigQueryValueCheckOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.utils.trigger_rule import TriggerRule

# Minimal default arguments; adjust the owner and start_date to your environment.
default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2017, 9, 1),
}

dag = DAG(
    'bq_event_log_checker',
    default_args=default_args,
    schedule_interval='@daily')

# The result should be 0: no null or blank event_id in yesterday's log.
expected = 0
sql = """
SELECT COUNT(*) AS event_id_null_count
FROM event_log.event_log_{{ yesterday_ds_nodash }}
WHERE event_id IS NULL OR event_id = ''
"""

checker = BigQueryValueCheckOperator(
    dag=dag,
    task_id='bq_checker',
    bigquery_conn_id='bq_connection_id',
    sql=sql,
    pass_value=expected)

# When the result is not the expected value, the checker task fails, and
# TriggerRule.ALL_FAILED makes this task run to send a message to Slack.
slack = SlackAPIPostOperator(
    dag=dag,
    task_id='post_error_message_to_slack',
    token=YOUR_SLACK_TOKEN,  # your Slack API token
    channel='#data-quality',
    username='airflow',
    text='event_log on {{ yesterday_ds_nodash }} has record(s) whose event_id is null.',
    trigger_rule=TriggerRule.ALL_FAILED)

# Define the task dependency: the Slack task runs downstream of the checker.
checker.set_downstream(slack)
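The same pattern generalizes to Airflow's other checker operators. For example, `BigQueryCheckOperator` fails a task when the first row of its query contains a false or zero value, so it can express a boolean assertion on the same table. The sketch below is my own illustrative extension of the DAG above, not a check from the original pipeline; the `user_id` rule is an assumption.

# A companion boolean check on the same DAG: assert that user_id is never null.
# Sketch only; the user_id rule is illustrative, not from the original pipeline.
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

user_id_checker = BigQueryCheckOperator(
    dag=dag,
    task_id='bq_user_id_checker',
    bigquery_conn_id='bq_connection_id',
    sql="""
SELECT COUNT(*) = 0
FROM event_log.event_log_{{ yesterday_ds_nodash }}
WHERE user_id IS NULL
""")

user_id_checker.set_downstream(slack)

One design note: once the Slack task has more than one upstream checker, its trigger rule is better set to `TriggerRule.ONE_FAILED`, so that a single failing check is enough to post the alert.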