A data pipeline moving data from a relational database management system (RDBMS) to the Hadoop Distributed File System (HDFS).
This project aims to move data from a relational database management system (RDBMS) to the Hadoop Distributed File System (HDFS). Here is the technology stack we will use in this project:
- docker: every component runs in its own container, and you can adjust the provided docker-compose file based on your use cases; docker lets the application run in both traditional and distributed environments.
- sqoop, hive, and spark: the Hadoop ecosystem tools used for ingestion, storage, and transformation.
- linux/amd64: the architecture the images target (see the note about arm64 further down).
I provide a docker-compose file so that you can run the whole application with the following command:
docker-compose up -d
Then you can access the Airflow webserver UI through port 8080.
Please feel free to turn on the DAG toggle for the hands_on_test DAG. It sets start_date to days_ago(1) and is scheduled to run on a daily basis.
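For reference, here is a minimal sketch of what such a DAG definition can look like; the settings mirror what is described in this README, while the placeholder task is purely illustrative:

```python
# Minimal sketch of the DAG settings described above (Airflow 1.10-style imports);
# the real hands_on_test DAG in ./airflow/dags defines the actual tasks.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "retries": 1,  # each task retries once, as noted later in this README
}

with DAG(
    dag_id="hands_on_test",
    default_args=default_args,
    start_date=days_ago(1),      # start_date = days_ago(1)
    schedule_interval="@daily",  # run on a daily basis
    catchup=False,
) as dag:
    placeholder = BashOperator(
        task_id="placeholder_task",  # illustrative only
        bash_command="echo 'replace with the real pipeline tasks'",
    )
```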
Once the pipeline has run completely, you can verify the result on each component as follows.
# show table in database
docker exec postgres-db psql -U postgres -d lineman_wongnai -c \\dt
# describe table
docker exec postgres-db psql -U postgres -d lineman_wongnai -c "
SELECT
table_name,
column_name,
data_type
FROM
information_schema.columns
WHERE
table_name = '<<TARGET TABLE>>';
"
# change <<TARGET TABLE>> to your table name e.g., 'order_detail', 'restaurant_detail'
# sample data
docker exec postgres-db psql -U postgres -d lineman_wongnai -c "SELECT * FROM <<TARGET TABLE>> LIMIT 5;"
# change <<TARGET TABLE>> to your table name e.g., 'order_detail', 'restaurant_detail'
# show tables
docker exec hive-server beeline -u jdbc:hive2://localhost:10000/default -e "SHOW TABLES;"
# describe table
docker exec hive-server beeline -u jdbc:hive2://localhost:10000/default -e "SHOW CREATE TABLE <<TARGET TABLE>>;"
# change <<TARGET TABLE>> to your table name e.g., 'order_detail', 'restaurant_detail'
# sample data
docker exec hive-server beeline -u jdbc:hive2://localhost:10000/default -e "SELECT * FROM <<TARGET TABLE>> LIMIT 5;"
# change <<TARGET TABLE>> to your table name e.g., 'order_detail', 'restaurant_detail'
# check partitioned parquet
docker exec hive-server hdfs dfs -ls /user/spark/transformed_order_detail
docker exec hive-server hdfs dfs -ls /user/spark/transformed_restaurant_detail
# the source locations of the external tables are defined in the ./airflow/scripts/hql scripts
For the SQL requirements, the result CSV files will be placed in the ./sql_result folder when the DAG has completed.
After you finish the test, you can shut down the whole application with:
docker-compose down -v
If a port used by this project is already taken on your machine, you can edit the docker-compose.yaml file and change the mapping to another open port, e.g., '10000:10000' to '10001:10000'.
If you use the arm64 architecture to run this project, the import_sqoop task will fail. Please switch to running it on the linux/amd64 architecture.
Create two tables in the postgres database with the above given column types, using the order_detail.csv and restaurant_detail.csv files as the source data. I load the CSVs with the COPY command in airflow/dags/script/sql_queries.py.
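For illustration, here is a hedged sketch of the style of statements that file can contain; the column list and the CSV path inside the container are assumptions, not the project's actual definitions:

```python
# Hypothetical excerpt in the style of sql_queries.py; the real file defines the
# full column lists for order_detail and restaurant_detail.
CREATE_ORDER_DETAIL = """
CREATE TABLE IF NOT EXISTS order_detail (
    order_id                VARCHAR,    -- illustrative subset of columns
    order_created_timestamp TIMESTAMP,
    discount                NUMERIC
);
"""

# COPY bulk-loads the CSV shipped with the project into postgres;
# the path below is a placeholder for wherever the file is mounted.
COPY_ORDER_DETAIL = """
COPY order_detail
FROM '/data/order_detail.csv'
DELIMITER ','
CSV HEADER;
"""
```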
Once we have these two tables in the postgres DB, ETL the same tables to Hive with the same names and the corresponding Hive data types, using the guidelines below:
- Both tables are external tables stored in the parquet file format.
- The restaurant_detail table is partitioned by a column dt (type string) with a static value latest.
- The order_detail table is partitioned by a column dt (type string) extracted from order_created_timestamp in the format YYYYMMDD.

You can verify the result with the DESCRIBE TABLE and SAMPLE DATA commands in the previous HIVE section.

After creating the above tables in Hive, create two new tables order_detail_new and restaurant_detail_new with their respective columns and partitions, and add one new column to each table as explained below:
- discount_no_null (order_detail_new) - replace all the NULL values of the discount column with 0.
- cooking_bin (restaurant_detail_new) - derived from the esimated_cooking_time column using the logic below; values not covered by the bins end up as a null value.

Check the SAMPLE DATA commands in the previous HIVE section. You can edit the query to test the requirement, for example SELECT COUNT(*) FROM order_detail_new WHERE discount_no_null IS NULL. The expected result would be 0.
|estimated_cooking_time |cooking_bin|
|---|---|
|10-40 |1 |
|41-80 |2 |
|81-120 |3 |
|> 120 |4 |
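A hedged PySpark sketch of these transformations, assuming the sqoop output is readable at hypothetical /user/sqoop/... paths; the real logic lives in ./airflow/dags/scripts/spark:

```python
# Sketch only: the column logic follows the rules above, but input paths, session
# config, and whether this runs as one job or several are assumptions.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("transform_details").getOrCreate()

# order_detail: dt from order_created_timestamp (YYYYMMDD), discount_no_null from discount
orders = spark.read.parquet("/user/sqoop/order_detail")  # hypothetical input path
orders_new = (
    orders
    .withColumn("dt", F.date_format("order_created_timestamp", "yyyyMMdd"))
    .withColumn("discount_no_null", F.coalesce(F.col("discount"), F.lit(0)))
)
orders_new.write.mode("overwrite").partitionBy("dt") \
    .parquet("/user/spark/transformed_order_detail")

# restaurant_detail: static dt = 'latest', cooking_bin from esimated_cooking_time
restaurants = spark.read.parquet("/user/sqoop/restaurant_detail")  # hypothetical input path
restaurants_new = (
    restaurants
    .withColumn("dt", F.lit("latest"))
    .withColumn(
        "cooking_bin",
        F.when(F.col("esimated_cooking_time").between(10, 40), 1)
         .when(F.col("esimated_cooking_time").between(41, 80), 2)
         .when(F.col("esimated_cooking_time").between(81, 120), 3)
         .when(F.col("esimated_cooking_time") > 120, 4),  # anything else stays null
    )
)
restaurants_new.write.mode("overwrite").partitionBy("dt") \
    .parquet("/user/spark/transformed_restaurant_detail")
```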
The final column count of each table (including the partition column) can be checked with the DESCRIBE TABLE command in the previous HIVE section.

SQL requirements & CSV output requirements
The SQL requirement does not specify whether to use the discount or discount_no_null column, so I calculate it both ways; the two could lead to different business interpretations. The resulting CSV files are written to ./sql_result as discount.csv and cooking.csv.
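As a rough illustration only, one way the CSVs could be produced is to run the query with spark and dump the result; the aggregation below is a placeholder, not the actual requirement query:

```python
# Hypothetical sketch: the real queries and export mechanism are part of the project.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("sql_result")
         .enableHiveSupport()   # so the Hive tables created earlier are visible
         .getOrCreate())

# Placeholder aggregation over discount_no_null; swap in the real requirement query.
result = spark.sql("SELECT AVG(discount_no_null) AS avg_discount FROM order_detail_new")

# Small result sets can be collected and written as a single CSV file.
result.toPandas().to_csv("./sql_result/discount.csv", index=False)
```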
The whole pipeline is orchestrated with airflow. It runs on a daily basis, and each task has 1 retry.
I use the PostgresOperator to submit the SQL script provided in airflow/scripts/sql_queries.py to create the postgres tables, and a custom PostgresDataQualityOperator to check that the number of rows in each table matches the source data in the ./data folder.
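Since PostgresDataQualityOperator is a custom operator, here is a hedged sketch of the kind of row-count check it performs; the real implementation shipped with this project may differ:

```python
# Sketch of a row-count data-quality operator (Airflow 1.10-style imports);
# argument names and the exact failure behaviour are assumptions.
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator


class PostgresDataQualityOperator(BaseOperator):
    """Fail the task if a postgres table's row count differs from the expected count."""

    def __init__(self, table, expected_rows, postgres_conn_id="postgres_default", **kwargs):
        super().__init__(**kwargs)
        self.table = table
        self.expected_rows = expected_rows
        self.postgres_conn_id = postgres_conn_id

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        count = hook.get_first(f"SELECT COUNT(*) FROM {self.table}")[0]
        if count != self.expected_rows:
            raise ValueError(
                f"Data quality check failed: {self.table} has {count} rows, "
                f"expected {self.expected_rows}"
            )
```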
The data is ingested from postgres to HDFS with sqoop. Unfortunately, with the best understanding I have of the Hadoop ecosystem, I could not set up a dedicated sqoop container to isolate this operation, so I work around it by installing the sqoop component in the hive-server container. I then use the BashOperator to run the ingestion from a shell script. The data is placed in the hive-server HDFS filesystem under the /user/sqoop/ folder. The source code and the required components for sqoop can be found in ./airflow/othres/sqoop.
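A hedged sketch of what that ingestion task might look like; the connection string, credentials, flags, and target directory are illustrative, and the real script lives under ./airflow/othres/sqoop:

```python
# Hypothetical sketch: run sqoop inside the hive-server container via docker exec;
# credentials and paths below are placeholders, not the project's real values.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG("sqoop_import_sketch", start_date=days_ago(1), schedule_interval=None) as dag:
    import_order_detail = BashOperator(
        task_id="import_sqoop_order_detail",
        bash_command=(
            "docker exec hive-server sqoop import "
            "--connect jdbc:postgresql://postgres-db:5432/lineman_wongnai "
            "--username postgres --password postgres "   # placeholder credentials
            "--table order_detail "
            "--target-dir /user/sqoop/order_detail "     # lands in hive-server HDFS
            "--as-parquetfile -m 1"
        ),
    )
```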
I was also not able to use the SparkSubmitOperator and HiveOperator from the airflow containers. So I work around it by modifying the airflow docker image to be able to use the docker command within the container. You can find the script to build that image in the dockerfile and requirements.txt files. I have already pushed the modified image to Docker Hub, so you don't need to build it yourself.
The spark and hive jobs are instead triggered through the docker exec command, using the BashOperator to run those scripts. For the spark scripts, you can find the source code in ./airflow/dags/scripts/spark; for the hive scripts, in ./airflow/dags/scripts/hive.
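For illustration, here is a hedged sketch of how such tasks can shell out through docker exec; the container name for spark and the script paths inside the containers are assumptions:

```python
# Hypothetical sketch of triggering the spark and hive scripts from the airflow
# container with docker exec; real commands and paths live in the project scripts.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG("docker_exec_sketch", start_date=days_ago(1), schedule_interval=None) as dag:
    spark_transform = BashOperator(
        task_id="spark_transform",
        bash_command=(
            "docker exec spark-master spark-submit "   # assumed spark container name
            "/scripts/spark/transform_details.py"      # assumed script path
        ),
    )

    hive_create_tables = BashOperator(
        task_id="hive_create_tables",
        bash_command=(
            "docker exec hive-server beeline "
            "-u jdbc:hive2://localhost:10000/default "
            "-f /scripts/hive/create_tables.hql"       # assumed script path
        ),
    )

    spark_transform >> hive_create_tables
```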
To keep reruns idempotent, for both Postgres and Hive I use the drop-create style, and for the spark output the default write mode is overwrite. The query result CSVs are simply replaced in the ./sql_result folder.
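To make the drop-create idea concrete, here is a hedged sketch of the kind of statement one of the Hive scripts could contain, written as a Python string in the same spirit as sql_queries.py; the column subset and LOCATION are illustrative:

```python
# Hypothetical drop-create DDL for the external order_detail table; dropping an
# external table only removes the metadata, so the parquet files are untouched.
ORDER_DETAIL_DDL = """
DROP TABLE IF EXISTS order_detail;

CREATE EXTERNAL TABLE order_detail (
    order_id                STRING,     -- illustrative subset of columns
    order_created_timestamp TIMESTAMP,
    discount                DOUBLE
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION '/user/spark/transformed_order_detail';

MSCK REPAIR TABLE order_detail;         -- register the dt partitions
"""
```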