Build an Airflow pipeline from Amazon S3 to Amazon Redshift
About an ETL pipeline for a data lake hosted on S3.
A music streaming company, Sparkify, has decided that it is time to introduce more automation and monitoring into their data warehouse ETL pipelines, and has concluded that the best tool to achieve this is Apache Airflow.
This activity generates a large number of different events that need to be collected, transformed, reconciled, and analyzed in order to understand customer needs and improve Sparkify's services. Tools such as Amazon S3 and Redshift solve most of the scaling problems.
Airflow lets you define, schedule, execute, monitor, and manage workflows in pure Python code, while providing the tools and user interface to manage these workflow operations.
The data pipeline works with two datasets that reside in S3:
Song data: s3://udacity-dend/song_data
Log data: s3://udacity-dend/log_data
The first dataset is a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song’s track ID.
song_data/A/B/C/TRABCEI128F424C983.json
song_data/A/A/B/TRAABJL12903CDCF1A.json
And below is an example of what a single song file, TRAABJL12903CDCF1A.json, looks like.
{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
The second dataset consists of log files in JSON format generated by an event simulator, based on the songs in the dataset above. They simulate activity logs from an imaginary music streaming app, driven by configuration settings.
The log files in the dataset you’ll be working with are partitioned by year and month.
log_data/2018/11/2018-11-12-events.json
log_data/2018/11/2018-11-13-events.json
The data in a log file such as 2018-11-12-events.json consists of one JSON event per line, with fields including userId, firstName, lastName, gender, level, and page (the fields used by the queries later in this pipeline).
If you don't need the Docker setup, the Redshift scripts, and the Makefile, skip to the "Last word" section to set up the Connections in the Airflow web UI.
To set things up, I first needed to create the infrastructure to run Airflow and AWS. It looked like this:
systemctl start docker
Next, create the "psyco" Conda environment with the required packages:
conda create --quiet --yes --name psyco psycopg2 boto3 configparser numpy pandas tabulate botocore
conda install --yes -c conda-forge ipython-sql
Then, a few commands to manage the environment:
conda activate psyco # run
conda deactivate # close
conda remove --name psyco --all # remove
conda info --envs # check psyco setup
Set up the ./redshift/dwh.cfg file with your AWS credentials. The mycluster.py Python script then creates a role ARN, a Redshift cluster, and a connection with the default VPC security group. At this point, the "psyco" Conda environment and the Docker container are ready, and a Makefile wraps the commands I use most often.
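As a rough idea of what mycluster.py does, here is a minimal boto3 sketch; the identifiers and node sizing below are placeholders, since the real script reads its values from ./redshift/dwh.cfg:
import json
import boto3

# Placeholder values; the real script reads these from ./redshift/dwh.cfg
REGION = "us-west-2"
CLUSTER_ID = "sparkify-redshift"
IAM_ROLE_NAME = "sparkify_redshift_s3_read"

iam = boto3.client("iam", region_name=REGION)
redshift = boto3.client("redshift", region_name=REGION)

# 1. Create an IAM role that Redshift can assume, with read access to S3
iam.create_role(
    RoleName=IAM_ROLE_NAME,
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "sts:AssumeRole"}]}),
    Description="Allows Redshift to read S3 on my behalf")
iam.attach_role_policy(
    RoleName=IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
role_arn = iam.get_role(RoleName=IAM_ROLE_NAME)["Role"]["Arn"]

# 2. Create the Redshift cluster attached to that role
#    (the real script also opens port 5439 on the default VPC security group)
redshift.create_cluster(
    ClusterIdentifier=CLUSTER_ID,
    ClusterType="multi-node",
    NodeType="dc2.large",
    NumberOfNodes=4,
    DBName="dev",
    MasterUsername="awsuser",
    MasterUserPassword="Passw0rd",
    Port=5439,
    IamRoles=[role_arn])
The Makefile targets are the following: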
make            # get help for this Makefile
conda activate psyco
make redshift
make config     # fill ./airflow-secret.yaml with your credentials; this temporary file is deleted by make run right after it has been copied into the new container
make run
make clean      # bring Docker down and remove files
make stop       # bring the Redshift cluster down
sudo systemctl start docker # start Docker
docker images # list docker images
docker rmi -f $(docker images -qa) # delete images
docker stop $(docker ps -a -q) # stop all running containers
docker rm $(docker ps -a -q) # delete all containers
docker-compose up -d # run Docker (detached)
docker-compose up -d --build # run Docker and build the images
docker-compose logs # Displays log output
docker-compose ps # List containers
If you want to run airflow sub-commands, you can do so like this:
docker-compose run --rm webserver airflow list_dags # List dags
docker-compose run --rm webserver airflow test [DAG_ID] [TASK_ID] [EXECUTION_DATE] # Test specific task
If you want to run/test a Python script, you can do so like this:
docker-compose run --rm webserver python /usr/local/airflow/dags/[PYTHON-FILE].py # Test python script
docker exec -i -t $(shell docker ps -q --filter ancestor=puckel/docker-airflow) /bin/bash # open a bash shell in the Airflow container (Makefile recipe, e.g. make tty)
docker exec -i -t $(shell docker ps -q --filter ancestor=postgres:9.6) psql -U airflow # open psql in the Postgres container to inspect the Airflow metadata database (Makefile recipe, e.g. make psql)
Finally, set up the two Connections in the Airflow web UI (Admin > Connections):
id: aws_credential
type: aws
login: <AWS_ACCESS_KEY_ID>
password: <AWS_SECRET_ACCESS_KEY>
These connections create hooks in Airflow that the DAGs can use.
id: redshift
type: postgres
host: <CLUSTER ENDPOINT>
schema: dev
login: awsuser
password: Passw0rd
port: 5439
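If you would rather script this than click through the UI, the same two connections can be written to the Airflow metadata database. This is a minimal sketch, assuming Airflow 1.10 (the version shipped in the puckel/docker-airflow image) and that it runs inside the webserver container:
from airflow import settings
from airflow.models import Connection

session = settings.Session()

# AWS credentials used by the staging (COPY) tasks
session.add(Connection(
    conn_id="aws_credential",
    conn_type="aws",
    login="<AWS_ACCESS_KEY_ID>",
    password="<AWS_SECRET_ACCESS_KEY>"))

# Redshift cluster endpoint used by the Postgres hooks
session.add(Connection(
    conn_id="redshift",
    conn_type="postgres",
    host="<CLUSTER ENDPOINT>",
    schema="dev",
    login="awsuser",
    password="Passw0rd",
    port=5439))

session.commit()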
DAG: the top-level Python object that defines the organization, structure, and schedule of a workflow. The tasks of a given workflow are attached to it.
DAG run: a run-time instance of a DAG with additional context (state, start, end, success).
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.udacity_plugin import (StageToRedshiftOperator,
LoadFactOperator,
LoadDimensionOperator,
DataQualityOperator,
HasRowsOperator)
from helpers import CreateTables, SqlQueries
from subdag import get_s3_to_redshift_subdag, get_dimTables_to_Redshift_subdag
default_args = {
'owner': 'Sparkify & Co',
'depends_on_past': False,
'catchup': False,
'start_date': datetime.datetime(2018, 11, 1, 0, 0, 0, 0),
'end_date' : datetime.datetime(2018, 11, 30, 0, 0, 0, 0),
'email_on_retry': False,
'retries': 3,
'provide_context': True,
'retry_delay': datetime.timedelta(minutes=5),
}
dag = DAG(
'ETL_Sparkify_v3',
default_args=default_args,
description='ETL from S3 to Redshift with Airflow',
schedule_interval='@hourly', # schedule_interval='0 * * * *'
max_active_runs=1
)
This creates a DAG scheduled to run every hour (at the top of the hour) from 2018-11-01 to 2018-11-30. If any of its tasks fails, it will be retried 3 times, at 5-minute intervals.
The Airflow UI gives us useful views to follow each task instance.
Task: a step in the DAG, built from a specific Operator. Tasks and their dependencies make up the DAG workflow logic.
Task instance: a specific run-time execution of a task in the DAG. It is sent by the scheduler to a worker.
Dependencies: the ordering rules between task instances.
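To make these terms concrete, here is a minimal sketch of how tasks attach to the dag object defined above and how a dependency is declared; the task ids are illustrative, not the project's exact ones:
# Two placeholder tasks attached to the DAG defined above
start_operator = DummyOperator(task_id="Begin_execution", dag=dag)
end_operator = DummyOperator(task_id="Stop_execution", dag=dag)

# Dependency: start_operator must succeed before end_operator runs.
# Each scheduled DAG run turns these tasks into task instances.
start_operator >> end_operator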
Two subDAGs perform the basic hourly tasks on the database.
The first one runs SQL statements to copy data from S3 into Redshift staging tables, then checks that the load succeeded.
get_s3_to_redshift_subdag()
def get_s3_to_redshift_subdag(
        parent_dag_name,
        task_id,
        redshift_conn_id,
        aws_credentials_id,
        create_tbl,
        target_table,
        s3_bucket,
        s3_key,
        custom,
        *args, **kwargs):
    dag = DAG(
        f"{parent_dag_name}.{task_id}",
        **kwargs
    )
    templated_command = """
    echo "**************** {{ task.owner }}, {{ task.task_id }}"
    echo "**************** The execution date : {{ ds }}"
    echo "**************** {{ task_instance_key_str }} is running"
    """
    # Log some run-time context (owner, task id, execution date)
    info_task = BashOperator(
        task_id=f"info_{target_table}",
        dag=dag,
        bash_command=templated_command)
    # Copy the JSON files from S3 into the Redshift staging table
    # (argument names below follow the subdag signature; assumed, not verbatim)
    copy_task = StageToRedshiftOperator(
        task_id=f"copy_{target_table}_from_s3",
        dag=dag,
        redshift_conn_id=redshift_conn_id,
        aws_credentials_id=aws_credentials_id,
        create_tbl=create_tbl,
        target_table=target_table,
        s3_bucket=s3_bucket,
        s3_key=s3_key,
        custom=custom)
    # Fail the run if the staging table ends up empty
    check_staging = HasRowsOperator(
        task_id=f"check_{target_table}",
        dag=dag,
        redshift_conn_id=redshift_conn_id,
        target_table=target_table)
    info_task >> copy_task
    copy_task >> check_staging
    return dag
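In the main DAG file, each subDAG factory is wrapped in a SubDagOperator (imported above). A minimal sketch for the log-data staging table follows; the task id, the CreateTables attribute, the value of custom, and the kwargs forwarded to the subDAG are assumptions rather than the project's exact values:
# Illustrative wiring of the staging subDAG into the main DAG
stage_events_task_id = "staging_events_subdag"
stage_events_subdag = SubDagOperator(
    subdag=get_s3_to_redshift_subdag(
        parent_dag_name="ETL_Sparkify_v3",
        task_id=stage_events_task_id,
        redshift_conn_id="redshift",
        aws_credentials_id="aws_credential",
        create_tbl=CreateTables.staging_events_table_create,  # assumed helper attribute
        target_table="staging_events",
        s3_bucket="udacity-dend",
        s3_key="log_data",
        custom=None,  # purpose of this parameter is not shown in the article
        start_date=default_args["start_date"],  # forwarded to DAG(**kwargs)
        schedule_interval="@hourly",            # keep the subDAG aligned with the parent
    ),
    task_id=stage_events_task_id,
    dag=dag,
)
SubDagOperator requires that the subDAG's dag_id equals the parent dag_id plus "." plus the operator's task_id, which is why the same task id is passed to both the factory and the operator.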
get_dimTables_to_Redshift_subdag()
The second one runs SQL statements to copy data from the staging tables into the dimension tables and checks for NULL values in the primary key column.
The SQL code is provided in ./plugins/helpers/sql_queries.py:
user_table_insert = (""" SELECT distinct
userid,
firstname,
lastname,
gender,
level
FROM staging_events
WHERE page='NextSong'
""")
Operator: a Python class that performs a specific operation in a DAG. Here, I use standard operators (PostgresOperator, BashOperator) and define custom operators (StageToRedshiftOperator, DataQualityOperator, HasRowsOperator).
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class HasRowsOperator(BaseOperator):
    """Data quality check: fail if the target table is empty."""

    @apply_defaults
    def __init__(self,
                 redshift_conn_id="",
                 target_table="",
                 *args, **kwargs):
        super(HasRowsOperator, self).__init__(*args, **kwargs)
        self.target_table = target_table
        self.redshift_conn_id = redshift_conn_id

    def execute(self, context):
        self.log.info('********** HasRowsOperator is processing')
        redshift_hook = PostgresHook(self.redshift_conn_id)
        records = redshift_hook.get_records(f"SELECT COUNT(*) FROM {self.target_table}")
        self.log.info(f"********** Running for {self.target_table}")
        # No result set at all: the count query itself returned nothing
        if len(records) < 1 or len(records[0]) < 1:
            raise ValueError(f"********** Data quality check failed. {self.target_table} returned no results")
        num_records = records[0][0]
        # An empty table is also a failure
        if num_records < 1:
            raise ValueError(f"********** Data quality check failed. {self.target_table} contained 0 rows")
        self.log.info(f"********** Data quality on table {self.target_table} check passed with {num_records} records")
        self.log.info("********** HasRowsOperator end !!")
Data Modeling: A Beginner's Guide to Data Engineering, Part II (https://medium.com/@rchang/a-beginners-guide-to-data-engineering-part-ii-47c4e7cbda71)