Data Quality Monitoring Tool
Data quality monitoring tool for Big Data, implemented using Apache Spark.
Include the dependency.

SBT:

"com.github.piotr-kalanski" % "data-quality-monitoring_2.11" % "0.3.2"

or Maven:

<dependency>
    <groupId>com.github.piotr-kalanski</groupId>
    <artifactId>data-quality-monitoring_2.11</artifactId>
    <version>0.3.2</version>
</dependency>
The data quality monitoring process consists of the following steps:
1. Load the configuration (table locations and validation rules).
2. Validate the data according to the configured rules.
3. Log the validation results.
4. Send alerts about detected problems.

Configuration can be loaded from:
- a file (FileSingleTableConfigurationLoader, FileMultipleTablesConfigurationLoader)
- a directory (DirectoryConfigurationLoader)
- a database (DatabaseConfigurationLoader)

Additionally, there are plans to support further configuration sources.

Example configuration:
tablesConfiguration = [
  {
    location = {type = Hive, table = clients}, // location of the first table that should be validated
    rules = { // validation rules
      rowRules = [ // validation rules working on single row level
        {
          field = client_id, // name of field that should be validated
          rules = [
            {type = NotNull}, // this field shouldn't be null
            {type = min, value = 0} // minimum value for this field is 0
          ]
        },
        {
          field = client_name,
          rules = [
            {type = NotNull} // this field shouldn't be null
          ]
        }
      ]
    }
  },
  {
    location = {type = Hive, table = companies}, // location of the second table that should be validated
    rules = {
      rowRules = [
        {
          field = company_id, // name of field that should be validated
          rules = [
            {type = NotNull}, // this field shouldn't be null
            {type = max, value = 100} // maximum value for this field is 100
          ]
        },
        {
          field = company_name, // name of field that should be validated
          rules = [
            {type = NotNull} // this field shouldn't be null
          ]
        }
      ]
    }
  }
]
To load configuration from a file, use the class FileSingleTableConfigurationLoader (one table per file) or FileMultipleTablesConfigurationLoader (multiple tables per file).
Example:
import com.datawizards.dqm.configuration.loader.FileMultipleTablesConfigurationLoader
val configurationLoader = new FileMultipleTablesConfigurationLoader("configuration.conf")
configurationLoader.loadConfiguration()
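For a configuration file describing a single table, a minimal sketch, assuming FileSingleTableConfigurationLoader is constructed with a file path and exposes the same loadConfiguration() method as the multiple-tables loader (the exact signature may differ):

import com.datawizards.dqm.configuration.loader.FileSingleTableConfigurationLoader

// hypothetical file describing a single table (TableConfiguration)
val singleTableLoader = new FileSingleTableConfigurationLoader("clients.conf")
val configuration = singleTableLoader.loadConfiguration()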
To load configuration from a directory, use the class DirectoryConfigurationLoader. Each file in the directory should contain the configuration for one table (TableConfiguration).
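A minimal sketch, assuming the loader is constructed with a directory path and exposes the same loadConfiguration() method as the file loaders (the exact signature may differ):

import com.datawizards.dqm.configuration.loader.DirectoryConfigurationLoader

// hypothetical directory containing one .conf file per table
val directoryLoader = new DirectoryConfigurationLoader("conf/tables")
val configuration = directoryLoader.loadConfiguration()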
To load configuration from a database, use the class DatabaseConfigurationLoader. Each table row should contain the configuration for one table (TableConfiguration).
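A sketch under stated assumptions: the constructor arguments below (JDBC driver, connection string, connection properties and the name of the configuration table) are guesses made by analogy with DatabaseValidationResultLogger; check the actual signature before use.

import com.datawizards.dqm.configuration.loader.DatabaseConfigurationLoader

// all constructor arguments below are hypothetical placeholders, by analogy with DatabaseValidationResultLogger
val databaseLoader = new DatabaseConfigurationLoader(
  "org.h2.Driver",            // JDBC driver class name (assumption)
  "jdbc:h2:mem:dqm",          // DB connection string (assumption)
  new java.util.Properties(), // JDBC connection properties (assumption)
  "DQM_CONFIGURATION"         // table with one TableConfiguration per row (assumption)
)
val configuration = databaseLoader.loadConfiguration()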
Currently supported categories of data validation rules:
- field rules
- group rules
- table trend rules

Field rules should be defined in the section rules.rowRules:
tablesConfiguration = [
  {
    location = [...],
    rules = {
      rowRules = [
        {
          field = Field name,
          rules = [...]
        }
      ]
    }
  }
]
Supported field validation rules:
- not null: {type = NotNull}
- dictionary: {type = dict, values = [1,2,3]}
- regex: {type = regex, value = """\s.*"""}
- min value: {type = min, value = 0}
- max value: {type = max, value = 100}
Group rules should be defined in the section groups.rules:
tablesConfiguration = [
  {
    location = [...],
    rules = [...],
    groups = [
      {
        name = Group name,
        field = Group by field name,
        rules = [
          {
            type = NotEmptyGroups,
            expectedGroups = [c1, c2, c3, c4]
          }
        ]
      }
    ]
  }
]
Supported group validation rules:
- not empty groups: {type = NotEmptyGroups, expectedGroups = [c1,c2,c3,c4]}
Table trend rules should be defined in the section rules.tableTrendRules:
tablesConfiguration = [
  {
    location = [...],
    rules = {
      rowRules = [...],
      tableTrendRules = [
        {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}
      ]
    }
  }
]
Supported table trend validation rules:
- current vs previous day row count: {type = CurrentVsPreviousDayRowCountIncrease, tresholdPercentage = 20}
Validation results can be logged to:
- Elasticsearch, using the class ElasticsearchValidationResultLogger
- a relational database (RDBMS), using the class DatabaseValidationResultLogger

Elasticsearch, using the class ElasticsearchValidationResultLogger:

import com.datawizards.dqm.logger.ElasticsearchValidationResultLogger

val logger = new ElasticsearchValidationResultLogger(
  esUrl = "http://localhost:9200", // Elasticsearch URL
  invalidRecordsIndexName = "invalid_records", // index name where to store invalid records
  tableStatisticsIndexName = "table_statistics", // index name where to store table statistics
  columnStatisticsIndexName = "column_statistics", // index name where to store column statistics
  groupsStatisticsIndexName = "group_statistics", // index name where to store group statistics
  invalidGroupsIndexName = "invalid_groups" // index name where to store invalid groups
)
RDBMS, using the class DatabaseValidationResultLogger:

import java.util.Properties
import com.datawizards.dqm.logger.DatabaseValidationResultLogger // assuming the same package as ElasticsearchValidationResultLogger

val connectionString = "jdbc:h2:mem:dqm" // example JDBC connection string (H2 in-memory database)
val logger = new DatabaseValidationResultLogger(
  driverClassName = "org.h2.Driver", // JDBC driver class name
  dbUrl = connectionString, // DB connection string
  connectionProperties = new Properties(), // JDBC connection properties, especially user and password
  invalidRecordsTableName = "INVALID_RECORDS", // name of table where to insert invalid records
  tableStatisticsTableName = "TABLE_STATISTICS", // name of table where to insert table statistics records
  columnStatisticsTableName = "COLUMN_STATISTICS", // name of table where to insert column statistics records
  groupsStatisticsTableName = "GROUP_STATISTICS", // name of table where to insert group by statistics records
  invalidGroupsTableName = "INVALID_GROUPS" // name of table where to insert invalid groups
)
Alerts can be sent to Slack, using the class SlackAlertSender.
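For example, using the constructor arguments shown in the complete example below (placeholders for the webhook URL, channel and user name):

import com.datawizards.dqm.alert.SlackAlertSender

// replace the placeholders with your Slack incoming webhook URL, channel and user name
val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")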
Additionally, there are plans to support further alert channels.
Complete example:

import com.datawizards.dqm.configuration.loader.FileMultipleTablesConfigurationLoader
import com.datawizards.dqm.logger.ElasticsearchValidationResultLogger
import com.datawizards.dqm.alert.SlackAlertSender
import com.datawizards.dqm.DataQualityMonitor
val configurationLoader = new FileMultipleTablesConfigurationLoader("configuration.conf")
val esUrl = "http://localhost:9200"
val invalidRecordsIndexName = "invalid_records"
val tableStatisticsIndexName = "table_statistics"
val columnStatisticsIndexName = "column_statistics"
val groupsStatisticsIndexName = "group_statistics"
val invalidGroupsIndexName = "invalid_groups"
val logger = new ElasticsearchValidationResultLogger(esUrl, invalidRecordsIndexName, tableStatisticsIndexName, columnStatisticsIndexName, groupsStatisticsIndexName, invalidGroupsIndexName)
val alertSender = new SlackAlertSender("webhook url", "Slack channel", "Slack user name")
val processingDate = new java.util.Date()
DataQualityMonitor.run(processingDate, configurationLoader, logger, alertSender)
configuration.conf:
tablesConfiguration = [
  {
    location = {type = Hive, table = clients},
    rules = {
      rowRules = [
        {
          field = client_id,
          rules = [
            {type = NotNull}
          ]
        }
      ]
    }
  }
]