Project author: manesioz

Project description:
Airflow plug-in that allows you to automate robust Data Quality checks for BigQuery
Language: Python
Repository: git://github.com/manesioz/bq_dq_plugin.git
Created: 2019-12-05T16:22:32Z
Project community: https://github.com/manesioz/bq_dq_plugin

License:


bq_dq_plugin

Airflow plug-in that lets you compare current metrics against aggregated historical metrics using arbitrary thresholds.

Why?

Previously, Airflow's BigQueryIntervalCheckOperator allowed you to compare the current day's metrics with the metrics
of another single day. This plugin extends that functionality and allows you to compare them against metrics aggregated over an arbitrary window.
This is important for more robust data quality checks that are resilient to noise and daily fluctuation.

You could run the following query:

```sql
select count(*) as NumRecords, max(column1) as MaxColumn_1
from table
where date_column = '{{ds}}'
```

which computes the number of rows and the max of column1 for today.
Now, compare it with the same metrics computed 7 days earlier:

```sql
select count(*), max(column1)
from {{table}}
where date_column = timestamp_sub('{{ds}}', interval 7 day)
```
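
Wired up as an Airflow task, the built-in operator runs roughly this pair of queries for you. Below is a minimal sketch using the Airflow 1.10 contrib import path; the table name and connection ID are placeholders, and parameter names may differ in newer provider packages:

```python
from airflow.contrib.operators.bigquery_check_operator import BigQueryIntervalCheckOperator

# Compare today's metrics against the same metrics from 7 days earlier.
interval_check = BigQueryIntervalCheckOperator(
    task_id='interval_check',
    table='project.dataset.table',                        # placeholder table
    metrics_thresholds={'count(*)': 1.5, 'max(column1)': 1.6},
    date_filter_column='date_column',
    days_back=-7,                                         # single reference day
    bigquery_conn_id='google_cloud_default',              # placeholder connection ID
    use_legacy_sql=False,
)
```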

However, if you want more robust data quality checks, you likely want to compare today's metric
with an aggregated value (since that is less susceptible to daily fluctuations/noise).

With this library, you can compare:

```sql
select count(*) as NumRecords, max(column1) as MaxColumn_1
from table
where timestamp_trunc(date_column, Day) = timestamp_trunc('2019-01-01', Day)
```

which computes the number of records and the max of column1 for records that have date_column = '2019-01-01'.
Now, compare it against the same metrics averaged across every day of 2018:

```sql
with data as (
  select count(*) as NumRecords, max(column1) as MaxColumn_1, timestamp_trunc(date_column, Day) as Time
  from table
  where date_column between '2018-01-01' and '2018-12-31'
  group by Time
)
select avg(NumRecords), avg(MaxColumn_1)
from data
```
Like the BigQueryIntervalCheckOperator, you can pass a dict which contains all metrics and their associated thresholds as key/value pairs.
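
For example, a thresholds dict could look like the sketch below. The ratio interpretation in the comments mirrors BigQueryIntervalCheckOperator and is an assumption about this plugin's exact semantics:

```python
# Keys are SQL aggregate expressions; values are the maximum allowed ratio
# between the current value and the aggregated historical value
# (assumed to follow BigQueryIntervalCheckOperator's ratio semantics).
metrics_thresholds = {
    'count(*)': 1.5,       # today's row count vs. the historical average
    'max(column1)': 1.6,   # today's max(column1) vs. the historical average
}
```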

In addition to this functionality, there will be some out-of-the-box checks including:

Numerical

  • num_records
  • percent_null
  • mean
  • std_dev
  • num_zero
  • min
  • median
  • max

Categorical

  • num_records
  • percent_null
  • num_unique
  • top
  • top_freq
  • avg_str_len

This plugin will allow you to create more complex custom checks as well.
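
For intuition, the numerical checks listed above map naturally onto BigQuery standard-SQL aggregate expressions. The mapping below is a hypothetical illustration (the column name and exact expressions are assumptions, not the plugin's actual implementation):

```python
# Hypothetical SQL expressions behind the numerical checks for one column
# (column1); for illustration only, not the plugin's actual implementation.
NUMERICAL_CHECK_EXPRESSIONS = {
    'num_records': 'count(*)',
    'percent_null': '100 * countif(column1 is null) / count(*)',
    'mean': 'avg(column1)',
    'std_dev': 'stddev(column1)',
    'num_zero': 'countif(column1 = 0)',
    'min': 'min(column1)',
    'median': 'approx_quantiles(column1, 2)[offset(1)]',
    'max': 'max(column1)',
}
```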

Example usage (in a DAG)

```python
from datetime import datetime, timedelta

from airflow import DAG
from bq_dq_plugin.operators.big_query_aggregate_check_operator import BigQueryAggregateCheckOperator

# ID of an existing Airflow connection with access to BigQuery.
CONNECTION_ID = 'google_cloud_default'

default_args = {
    'owner': 'Zachary Manesiotis',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 10),
    'email': ['zacl.manesiotis@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

with DAG('Dag_ID',
         schedule_interval='@weekly',
         max_active_runs=15,
         catchup=False,
         default_args=default_args) as dag:

    # Compare the current run's metrics against day-level aggregates
    # computed over the start_date/end_date window.
    data_quality_check = BigQueryAggregateCheckOperator(
        task_id='data_quality_check',
        table='`project.dataset.table`',
        metrics_thresholds={'count(*)': 1.5, 'max(Column1)': 1.6},
        date_filter_column='DateTime',
        agg_time_period='Day',
        start_date='2019-01-01',
        end_date='2019-12-01',
        gcp_conn_id=CONNECTION_ID,
        use_legacy_sql=False,
        dag=dag,
    )

    data_quality_check
```
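
With these arguments the task should, in effect, run the two kinds of queries shown earlier: the single-day metrics for the current run, and the Day-level aggregates over the 2019-01-01 to 2019-12-01 window on the DateTime column, failing when a metric breaches its threshold. That reading follows from the parameters above and the earlier examples; the plugin's exact query construction may differ.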