Project author: piotr-kalanski

Project description:
Data Quality Monitoring Tool
Language: Scala
Repository: git://github.com/piotr-kalanski/data-quality-monitoring.git
Created: 2017-08-30T11:48:51Z
Project community: https://github.com/piotr-kalanski/data-quality-monitoring

License: Apache License 2.0


data-quality-monitoring

Data Quality Monitoring Tool for Big Data implemented using Spark



Goals

  • Validate data using provided business rules
  • Log result
  • Send alerts

Getting started

Include the dependency (SBT):

    "com.github.piotr-kalanski" % "data-quality-monitoring_2.11" % "0.3.2"

or

    <dependency>
        <groupId>com.github.piotr-kalanski</groupId>
        <artifactId>data-quality-monitoring_2.11</artifactId>
        <version>0.3.2</version>
    </dependency>

Data quality monitoring process

The data quality monitoring process consists of the following steps:

  • Load configuration with business rules
  • Run data validation
  • Log validation results
  • Send alerts

Load configuration

Configuration can be loaded from:

  • a file
  • a directory
  • an RDBMS

Additionally, there are plans to support:

  • DynamoDB

Example configuration

    tablesConfiguration = [
        {
            location = {type = Hive, table = clients}, // location of first table that should be validated
            rules = { // validation rules
                rowRules = [ // validation rules working on single row level
                    {
                        field = client_id, // name of field that should be validated
                        rules = [
                            {type = NotNull}, // this field shouldn't be null
                            {type = min, value = 0} // minimum value for this field is 0
                        ]
                    },
                    {
                        field = client_name,
                        rules = [
                            {type = NotNull} // this field shouldn't be null
                        ]
                    }
                ]
            }
        },
        {
            location = {type = Hive, table = companies}, // location of second table that should be validated
            rules = {
                rowRules = [
                    {
                        field = company_id, // name of field that should be validated
                        rules = [
                            {type = NotNull}, // this field shouldn't be null
                            {type = max, value = 100} // maximum value for this field is 100
                        ]
                    },
                    {
                        field = company_name, // name of field that should be validated
                        rules = [
                            {type = NotNull} // this field shouldn't be null
                        ]
                    }
                ]
            }
        }
    ]

Load configuration from file

Use class: FileSingleTableConfigurationLoader or FileMultipleTablesConfigurationLoader.

Example:

    import com.datawizards.dqm.configuration.loader.FileMultipleTablesConfigurationLoader

    val configurationLoader = new FileMultipleTablesConfigurationLoader("configuration.conf")
    configurationLoader.loadConfiguration()
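
For a file holding a single table configuration, FileSingleTableConfigurationLoader is used analogously. A minimal sketch, assuming its constructor also takes the path to the configuration file (the exact signature is not shown in this README; the file name is illustrative):

    import com.datawizards.dqm.configuration.loader.FileSingleTableConfigurationLoader

    // Assumption: like the multiple-tables loader, this loader is constructed
    // from a path to a file holding one TableConfiguration.
    val singleTableLoader = new FileSingleTableConfigurationLoader("table_configuration.conf")
    singleTableLoader.loadConfiguration()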

Load configuration from directory

Use class: DirectoryConfigurationLoader.

Each file should contain the configuration for exactly one table (TableConfiguration).
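
A minimal sketch, assuming the constructor takes the directory path (the exact signature is not shown in this README; the directory name is illustrative):

    import com.datawizards.dqm.configuration.loader.DirectoryConfigurationLoader

    // Assumption: the loader is constructed from a directory path; each file
    // inside holds the configuration for exactly one table.
    val configurationLoader = new DirectoryConfigurationLoader("conf/tables")
    configurationLoader.loadConfiguration()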

Load configuration from database

Use class: DatabaseConfigurationLoader.

Each table row should contain the configuration for exactly one table (TableConfiguration).
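
A minimal sketch, assuming a JDBC-style constructor analogous to DatabaseValidationResultLogger shown later; the parameter names and the TABLES_CONFIGURATION table name are assumptions for illustration, not the library's documented API:

    import java.util.Properties

    import com.datawizards.dqm.configuration.loader.DatabaseConfigurationLoader

    // Assumption: JDBC-style parameters mirroring DatabaseValidationResultLogger;
    // TABLES_CONFIGURATION is a hypothetical table where each row holds one
    // table's configuration.
    val configurationLoader = new DatabaseConfigurationLoader(
      driverClassName = "org.h2.Driver",
      dbUrl = "jdbc:h2:mem:dqm",
      connectionProperties = new Properties(),
      configurationTableName = "TABLES_CONFIGURATION"
    )
    configurationLoader.loadConfiguration()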

Validation rules

Currently supported categories of data validation rules:

  • field rules - validate the value of a single field, e.g. not null, min value, max value
  • group rules - validate the result of a group-by expression, e.g. expected groups (countries, types)
  • table trend rules - validate trends between table snapshots, e.g. comparing the current day's row count with the previous day's

Field rules

Field rules should be defined in section rules.rowRules:

    tablesConfiguration = [
        {
            location = {...},
            rules = {
                rowRules = [
                    {
                        field = Field name,
                        rules = [...]
                    }
                ]
            }
        }
    ]

Supported field validation rules (a combined example follows this list):

  • not null

    {type = NotNull}

  • dictionary

    {type = dict, values = [1,2,3]}

  • regex

    {type = regex, value = """\s.*"""}

  • min value

    {type = min, value = 0}

  • max value

    {type = max, value = 100}
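
Several rules can be attached to the same field, as in the example configuration above. A combined sketch (the field name, dictionary values, and regex are illustrative):

    rowRules = [
        {
            field = country_code, // illustrative field name
            rules = [
                {type = NotNull},
                {type = dict, values = [PL, DE, FR]},
                {type = regex, value = """[A-Z]{2}"""}
            ]
        }
    ]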

Group rules

Group rules should be defined in section groups.rules:

    tablesConfiguration = [
        {
            location = {...},
            rules = {...},
            groups = [
                {
                    name = Group name,
                    field = Group by field name,
                    rules = [
                        {
                            type = NotEmptyGroups,
                            expectedGroups = [c1,c2,c3,c4]
                        }
                    ]
                }
            ]
        }
    ]

Supported group validation rules:

  • not empty groups

    {type = NotEmptyGroups, expectedGroups = [c1,c2,c3,c4]}

Table trend rules

Table trend rules should be defined in section rules.tableTrendRules:

    tablesConfiguration = [
        {
            location = {...},
            rules = {
                rowRules = [...],
                tableTrendRules = [
                    {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}
                ]
            }
        }
    ]

Supported table trend validation rules:

  • current vs previous day row count

    {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}

Log validation results

Validation results can be logged to:

  • Elasticsearch, using class ElasticsearchValidationResultLogger

        val logger = new ElasticsearchValidationResultLogger(
          esUrl = "http://localhost:9200", // Elasticsearch URL
          invalidRecordsIndexName = "invalid_records", // index name where to store invalid records
          tableStatisticsIndexName = "table_statistics", // index name where to store table statistics
          columnStatisticsIndexName = "column_statistics", // index name where to store column statistics
          groupsStatisticsIndexName = "group_statistics", // index name where to store group statistics
          invalidGroupsIndexName = "invalid_groups" // index name where to store invalid groups
        )
  • RDBMS, using class DatabaseValidationResultLogger

        import java.util.Properties

        val logger = new DatabaseValidationResultLogger(
          driverClassName = "org.h2.Driver", // JDBC driver class name
          dbUrl = "jdbc:h2:mem:dqm", // DB connection string (example H2 URL)
          connectionProperties = new Properties(), // JDBC connection properties, especially user and password
          invalidRecordsTableName = "INVALID_RECORDS", // table where invalid records are inserted
          tableStatisticsTableName = "TABLE_STATISTICS", // table where table statistics records are inserted
          columnStatisticsTableName = "COLUMN_STATISTICS", // table where column statistics records are inserted
          groupsStatisticsTableName = "GROUP_STATISTICS", // table where group-by statistics records are inserted
          invalidGroupsTableName = "INVALID_GROUPS" // table where invalid groups are inserted
        )

Send alerts

Alerts can be sent to:

  • Slack, using class SlackAlertSender (see the sketch below)
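
A minimal sketch mirroring the constructor used in the full example at the end of this README; the webhook URL, channel, and user name are placeholders:

    import com.datawizards.dqm.alert.SlackAlertSender

    // Placeholders: supply your own incoming-webhook URL, channel and user name.
    val alertSender = new SlackAlertSender(
      "https://hooks.slack.com/services/...", // webhook url
      "#data-quality", // Slack channel
      "dqm-bot" // Slack user name
    )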

Additionally, there are plans to support:

  • email

Full example

    import com.datawizards.dqm.configuration.loader.FileConfigurationLoader
    import com.datawizards.dqm.logger.ElasticsearchValidationResultLogger
    import com.datawizards.dqm.alert.SlackAlertSender
    import com.datawizards.dqm.DataQualityMonitor

    val configurationLoader = new FileConfigurationLoader("configuration.conf")

    val esUrl = "http://localhost:9200"
    val invalidRecordsIndexName = "invalid_records"
    val tableStatisticsIndexName = "table_statistics"
    val columnStatisticsIndexName = "column_statistics"
    val groupsStatisticsIndexName = "group_statistics"
    val invalidGroupsIndexName = "invalid_groups"
    val logger = new ElasticsearchValidationResultLogger(esUrl, invalidRecordsIndexName, tableStatisticsIndexName, columnStatisticsIndexName, groupsStatisticsIndexName, invalidGroupsIndexName)

    val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")

    val processingDate = new java.util.Date()
    DataQualityMonitor.run(processingDate, configurationLoader, logger, alertSender)
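
Given the configuration below, DataQualityMonitor.run executes the whole pipeline for the supplied processing date: it loads the validation rules, validates the clients table (here, client_id must not be null), logs the results to the Elasticsearch indexes configured above, and sends alerts to Slack.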

configuration.conf:

    tablesConfiguration = [
        {
            location = {type = Hive, table = clients},
            rules = {
                rowRules = [
                    {
                        field = client_id,
                        rules = [
                            {type = NotNull}
                        ]
                    }
                ]
            }
        }
    ]