Project author: anthelix

Project description:
Build an Airflow pipeline from Amazon S3 to Amazon Redshift

Language: Python
Repository: git://github.com/anthelix/udacity_project5.git
Created: 2020-04-07T19:34:16Z
Project community: https://github.com/anthelix/udacity_project5

License:



Udacity Data Engineering Nanodegree







Project: Data Pipelines

An ETL pipeline for a data lake hosted on S3, orchestrated with Apache Airflow.


About The Project

A music streaming company, Sparkify, has decided that it is time to introduce more automation and monitoring to its data warehouse ETL pipelines, and has concluded that the best tool to achieve this is Apache Airflow.

Purpose

Sparkify's streaming activity generates a large number of different events that need to be collected, transformed, reconciled and analyzed in order to fully understand customer needs and improve Sparkify's services. Tools such as Amazon S3 and Redshift solve most of the scaling problems.
Airflow makes it possible to define, schedule, execute, monitor and manage workflows in pure Python code, while providing the tools and a user interface to manage these workflow operations.
The data pipeline should be:

  • dynamic
  • built from reusable tasks
  • easy to monitor
  • able to backfill past runs
  • able to run data quality tests after the ETL steps have been executed.

Getting Started

Dataset

The two datasets reside in S3:

  • Song data: s3://udacity-dend/song_data
  • Log data: s3://udacity-dend/log_data
Song Dataset

The first dataset is a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song’s track ID.

  1. song_data/A/B/C/TRABCEI128F424C983.json
  2. song_data/A/A/B/TRAABJL12903CDCF1A.json

And below is an example of what a single song file, TRAABJL12903CDCF1A.json, looks like.

  1. {"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
Log Dataset

The second dataset consists of log files in JSON format generated by this event simulator based on the songs in the dataset above. These simulate app activity logs from an imaginary music streaming app based on configuration settings.
The log files in the dataset you’ll be working with are partitioned by year and month.

  1. log_data/2018/11/2018-11-12-events.json
  2. log_data/2018/11/2018-11-13-events.json

And below is an example of what the data in a log file, 2018-11-12-events.json, looks like.
log dataset image
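To see which log files exist for a given month, you can list the corresponding prefix (again a minimal sketch assuming boto3 and read access to the bucket):

  import boto3

  # List the November 2018 log files under the log_data prefix
  s3 = boto3.client("s3")
  resp = s3.list_objects_v2(Bucket="udacity-dend", Prefix="log_data/2018/11/")
  for item in resp.get("Contents", []):
      print(item["Key"])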

Bypass Setup

If you don’t need the Docker setup, the Redshift scripts or the Makefile, go to Last word to set up the connections in the Airflow web UI.

Setup

To set up, I first needed to create the infrastructure to run Airflow and AWS. It looks like this:

  • Docker to build the Postgres and Airflow containers
  • Python scripts to launch and stop the AWS Redshift cluster and access AWS S3, in a Conda environment named ‘psyco’
  • A Makefile so that a couple of commands set everything up
  • First, git clone this project.

Docker

  • Assuming you have Docker installed, I use puckel/docker-airflow from Docker Hub.
  • Build 2 containers, Airflow and Postgres for the Airflow metadata database
  • You can visit the Docker folder to check the config
  • run: systemctl start docker

Python scripts to setup AWS

  • I create a Conda environment named psyco:
    1. conda create --quiet --yes --name psyco psycopg2 boto3 configparser numpy pandas tabulate botocore
    2. conda install --yes -c conda-forge ipython-sql
  • To use it:
    1. conda activate psyco # run
    2. conda deactivate # close
    3. conda remove --name psyco --all # remove
    4. conda info --envs # check psyco setup
    Then, set up the ./redshift/dwh.cfg file with your AWS credentials. The mycluster.py Python script creates an IAM role (roleArn), a Redshift cluster, and opens access through the default VPC security group.
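For readers who skip the provided script, the sketch below shows the kind of boto3 calls mycluster.py relies on. The role name, region, node type and cluster identifier are illustrative assumptions; the real values come from ./redshift/dwh.cfg, and opening port 5439 on the default VPC security group is omitted here:

  import json
  import boto3

  iam = boto3.client("iam", region_name="us-west-2")
  redshift = boto3.client("redshift", region_name="us-west-2")

  # Create a role that Redshift can assume, with read-only access to S3
  assume_policy = json.dumps({
      "Version": "2012-10-17",
      "Statement": [{"Effect": "Allow",
                     "Principal": {"Service": "redshift.amazonaws.com"},
                     "Action": "sts:AssumeRole"}],
  })
  iam.create_role(RoleName="myRedshiftRole", AssumeRolePolicyDocument=assume_policy)
  iam.attach_role_policy(RoleName="myRedshiftRole",
                         PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
  role_arn = iam.get_role(RoleName="myRedshiftRole")["Role"]["Arn"]

  # Launch a small Redshift cluster that can read from S3 with that role
  redshift.create_cluster(
      ClusterType="multi-node",
      NodeType="dc2.large",
      NumberOfNodes=2,
      DBName="dev",
      ClusterIdentifier="sparkify-cluster",
      MasterUsername="awsuser",
      MasterUserPassword="Passw0rd",
      IamRoles=[role_arn],
  )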

Makefile

At this point we have the “psyco” Conda environment and the Docker setup in place. The Makefile wraps the commands I use most often into simple targets.

  • Run make to get the help for this Makefile
  • In the root folder of this project, where the Makefile lives:
    • conda activate psyco
    • make redshift
    • make config
    • set up the ./airflow-secret.yaml file with your credentials. It’s a temporary file which is deleted by the make run command just after it has been copied to the new container.
    • make run
    • Once you do that, Airflow is running on your machine and you can visit the UI at http://localhost:8080/admin/

At the end

  • make clean # bring Docker down and remove files
  • make stop # shut down Redshift

Workflow: useful cmd

  1. sudo systemctl start docker # start Docker
  2. docker images # list Docker images
  3. docker rmi -f $(docker images -qa) # delete images
  4. docker stop $(docker ps -a -q) # stop running containers
  5. docker rm $(docker ps -a -q) # delete containers
  6. docker-compose up -d # run Docker Compose
  7. docker-compose up -d --build # run Docker Compose and build images
  8. docker-compose logs # display log output
  9. docker-compose ps # list containers

If you want to run airflow sub-commands, you can do so like this:

  1. docker-compose run --rm webserver airflow list_dags # List dags
  2. docker-compose run --rm webserver airflow test [DAG_ID] [TASK_ID] [EXECUTION_DATE] # Test specific task

If you want to run or test a Python script, you can do so like this:

  1. docker-compose run --rm webserver python /usr/local/airflow/dags/[PYTHON-FILE].py # test a Python script
  2. docker exec -i -t $(docker ps -q --filter ancestor=puckel/docker-airflow) /bin/bash # run a bash shell in the Airflow container, e.g. make tty
  3. docker exec -i -t $(docker ps -q --filter ancestor=postgres:9.6) psql -U airflow # open psql in the Postgres container to check the Airflow metadata database, e.g. make psql

Last word

  • If you don’t want to use the Docker setup or the Redshift scripts:
    • Install and setup Airflow
    • Launch a Redshift cluster and get its ENDPOINT
    • Make sure you’ve configured the connections: in the Airflow web UI, go to Admin -> Connections and edit
      • AWS connection
        1. id: aws_credential
        2. type: aws
        3. login: <AWS_ACCESS_KEY_ID>
        4. password: <AWS_SECRET_ACCESS_KEY>
      • Redshift connection
        1. id: redshift
        2. type: postgres
        3. host: <CLUSTER ENDPOINT>
        4. schema: dev
        5. login: awsuser
        6. password: Passw0rd
        7. port: 5439
        These connections back the hooks that the DAGs use (see the sketch below).
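As a quick illustration of how those connection IDs are consumed inside a task (a minimal sketch using the stock Airflow 1.10 hooks shipped with the puckel image; the function is a hypothetical task callable, not part of the project):

  from airflow.contrib.hooks.aws_hook import AwsHook
  from airflow.hooks.postgres_hook import PostgresHook

  def count_staging_events(**context):
      aws_hook = AwsHook("aws_credential")                   # the "aws_credential" connection
      credentials = aws_hook.get_credentials()               # key pair used to build COPY statements
      redshift = PostgresHook(postgres_conn_id="redshift")   # the "redshift" connection
      records = redshift.get_records("SELECT COUNT(*) FROM staging_events")
      print(f"staging_events has {records[0][0]} rows")
      return credentials is not None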

Workflow

Creating a DAG to extract files from S3, transform them and load tables into the Redshift database

dag1

Main DAG

DAG: the top-level Python object that defines the organization, structure and schedule of a workflow. Tasks in a specific workflow are attached to it.

DAG run: the run-time version of a DAG with additional context (state, start, end, success).

  import datetime

  from airflow import DAG
  from airflow.operators.bash_operator import BashOperator
  from airflow.operators.dummy_operator import DummyOperator
  from airflow.operators.postgres_operator import PostgresOperator
  from airflow.operators.subdag_operator import SubDagOperator
  from airflow.operators.udacity_plugin import (StageToRedshiftOperator,
                                                LoadFactOperator,
                                                LoadDimensionOperator,
                                                DataQualityOperator,
                                                HasRowsOperator)
  from helpers import CreateTables, SqlQueries
  from subdag import get_s3_to_redshift_subdag, get_dimTables_to_Redshift_subdag

  default_args = {
      'owner': 'Sparkify & Co',
      'depends_on_past': False,
      'catchup': False,
      'start_date': datetime.datetime(2018, 11, 1, 0, 0, 0, 0),
      'end_date': datetime.datetime(2018, 11, 30, 0, 0, 0, 0),
      'email_on_retry': False,
      'retries': 3,
      'provide_context': True,
      'retry_delay': datetime.timedelta(minutes=5),
  }

  dag = DAG(
      'ETL_Sparkify_v3',
      default_args=default_args,
      description='ETL from S3 to Redshift with Airflow',
      schedule_interval='@hourly',  # schedule_interval='0 * * * *'
      max_active_runs=1
  )

This creates a DAG scheduled to run every hour (at the top of the hour) from 2018-11-01 to 2018-11-30. If any of its tasks fails, it is retried 3 times, at 5-minute intervals.
The Airflow UI gives us useful views to follow each task instance.
dag1
dag1

Sub DAG skeleton

Task: a step in the DAG, bound to a specific Operator. Tasks and their dependencies make up the DAG workflow logic.

Task instance: a specific run-time execution of a task in the DAG. It is sent by the scheduler to a worker.

Dependencies: organize the order in which task instances run.

Two subdags perform the basic hourly tasks on the database (see the SubDagOperator sketch at the end of this section for how they are attached to the main DAG).
The first one runs SQL statements to copy data from S3 into the Redshift staging tables, then checks that the copy succeeded.

  • get_s3_to_redshift_subdag()

    def get_s3_to_redshift_subdag(
            parent_dag_name,
            task_id,
            redshift_conn_id,
            aws_credentials_id,
            create_tbl,
            target_table,
            s3_bucket,
            s3_key,
            custom,
            *args, **kwargs):
        dag = DAG(
            f"{parent_dag_name}.{task_id}",
            **kwargs
        )

        templated_command = """
        echo "**************** {{ task.owner }}, {{ task.task_id }}"
        echo "**************** The execution date : {{ ds }}"
        echo "**************** {{ task_instance_key_str }} is running"
        """

        # Operator arguments are elided in this skeleton; each task takes at least
        # a task_id and dag=dag, plus the relevant subdag parameters above.
        info_task = BashOperator()             # logs run context via templated_command
        copy_task = StageToRedshiftOperator()  # COPY from s3_bucket/s3_key into target_table
        check_staging = HasRowsOperator()      # check the staging table is not empty

        info_task >> copy_task
        copy_task >> check_staging

        return dag

    dag1

  • get_dimTables_to_Redshift_subdag()

The second one runs SQL statements to copy data from the staging tables into the dimension tables and checks for null values in the primary key columns.

The SQL code is provided in ./plugins/helpers/sql_queries.py

  user_table_insert = ("""
      SELECT DISTINCT
          userid,
          firstname,
          lastname,
          gender,
          level
      FROM staging_events
      WHERE page = 'NextSong'
  """)

dag1
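For reference, the subdag factories above are attached to the main DAG through SubDagOperator, roughly as below. The task id, target table, S3 key and the CreateTables attribute are illustrative assumptions, not the project’s exact wiring:

  from airflow.operators.subdag_operator import SubDagOperator

  stage_events_task_id = "stage_events_subdag"
  stage_events_subdag = SubDagOperator(
      subdag=get_s3_to_redshift_subdag(
          parent_dag_name="ETL_Sparkify_v3",
          task_id=stage_events_task_id,
          redshift_conn_id="redshift",
          aws_credentials_id="aws_credential",
          create_tbl=CreateTables.create_staging_events,  # hypothetical helper attribute
          target_table="staging_events",
          s3_bucket="udacity-dend",
          s3_key="log_data",
          custom="auto",                                   # illustrative value
          default_args=default_args,
          schedule_interval="@hourly",
      ),
      task_id=stage_events_task_id,                        # must match the subdag's DAG id suffix
      dag=dag,
  )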

Operators

Operator: a Python class that performs a specific operation in a DAG. Here, I use standard operators (PostgresOperator, BashOperator) and define custom operators (StageToRedshiftOperator, DataQualityOperator, HasRowsOperator).

  import logging

  from airflow.hooks.postgres_hook import PostgresHook
  from airflow.models import BaseOperator
  from airflow.utils.decorators import apply_defaults


  class HasRowsOperator(BaseOperator):

      @apply_defaults
      def __init__(self,
                   redshift_conn_id="",
                   target_table="",
                   *args, **kwargs):
          super(HasRowsOperator, self).__init__(*args, **kwargs)
          self.target_table = target_table
          self.redshift_conn_id = redshift_conn_id

      def execute(self, context):
          self.log.info('********** HasRowsOperator is processing')
          redshift_hook = PostgresHook(self.redshift_conn_id)
          records = redshift_hook.get_records(f"SELECT COUNT(*) FROM {self.target_table}")
          self.log.info(f"********** Running for {self.target_table}")
          if len(records) < 1 or len(records[0]) < 1:
              raise ValueError(f"********** Data quality check failed. {self.target_table} returned no results")
          num_records = records[0][0]
          if num_records < 1:
              raise ValueError(f"********** Data quality check failed. {self.target_table} contained 0 rows")
          logging.info(f"********** Data quality on table {self.target_table} check passed with {records[0][0]} records")
          self.log.info(f"********** HasRowsOperator end !!")
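For context, this is how such an operator is typically instantiated in the subdag (a short usage sketch; the task_id and target table are illustrative):

  check_staging_events = HasRowsOperator(
      task_id="check_staging_events_has_rows",
      redshift_conn_id="redshift",     # Airflow connection created earlier
      target_table="staging_events",   # table loaded by StageToRedshiftOperator
      dag=dag,
  )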