Project author: groupon

Project description:
A Luigi-powered analytics/warehouse stack
Primary language: Python
Project URL: git://github.com/groupon/luigi-warehouse.git
Created: 2017-01-25T23:43:38Z
Project community: https://github.com/groupon/luigi-warehouse

License: Other

Luigi-Warehouse

A boilerplate implementation of Luigi at Groupon


  • Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more

  • Luigi-Warehouse adds

    • example workflows (e.g. replicating PostgreSQL tables to Redshift)
    • more data sources
    • variable data sources that do not rely on default luigi behavior/configs (e.g. VariableS3Client)

Install / Setup

  • Install python3 - This repo has been tested against python 3.4+
Simple

python setup.py install

Developers - if you want to modify or extend the workflows with your custom logic
  • Clone this repo
  • pip3 install -r requirements.txt if you want full functionality of all data sources
Post-Install
  • mkdir your-path-to/data
  • Put your credentials and settings in luigi.cfg. luigi.cfg-example shows some possible options. You can also $ export LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python... (a small sketch of reading these settings from Python follows this list)
  • You’re ready to replicate or move data around…
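  • Settings in luigi.cfg can then be read from task code through Luigi's configuration API. A minimal sketch; the [redshift] section and option are purely illustrative, while the slackbots key is the one used in the Notifications example below:

  import luigi

  config = luigi.configuration.get_config()
  redshift_host = config.get('redshift', 'host')  # hypothetical [redshift] section and option
  slack_token = config.get('slackbots', 'BOWSER_SLACK_API_KEY')  # key shown in the Notifications section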

Getting Started

  • Some example workflows are included. Assumptions, args & comments are documented in each file.
  • gsheet_to_redshift.py - replicates all data from a Google Sheet to a Redshift table (full copy/replace). Main class: Run
  • gsheet_to_hadoop.py - replicates all data from a Google Sheet to a Hadoop Hive table via Spark (full copy/replace). Main class: main
  • postgres_to_redshift.py - replicates Postgres tables to Redshift (incrementally or full copy/replace). Main classes: Run - PerformIncrementalImport, PerformFullImport
  • postgres_to_hadoop.py - Spark app that replicates Postgres tables to Hadoop (Hive) (incrementally or copy/replace). Main classes: Run - RunIncremental, RunFromScratch
  • salesforce_to_redshift.py - replicates a Salesforce report or SOQL to a Redshift table (full copy/replace). Main classes: SOQLtoRedshift, ReporttoRedshift
  • teradata_to_redshift.py - replicates given Teradata SQL to a Redshift table (incrementally or full copy/replace). Main class: Run
  • typeform_to_redshift.py - replicates all data from Typeform responses to a Redshift table (full copy/replace). Main class: Run
  • zendesk_to_redshift.py - extracts users, orgs, tickets, ticket_events from Zendesk to Redshift (partially incremental). Main class: Run
  • zendesk_to_hadoop.py - generic class to extract from the Zendesk API and load to Hadoop Hive via Spark (incrementally or full copy/replace). Main class: ZendeskSpark
  • Example to start the luigi scheduler daemon

    $ ./start_luigi_server.bash
  • Example to run a workflow with multiple workers in parallel (a generic Luigi task sketch follows these examples)

    $ LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg python3 luigi_warehouse/postgres_to_redshift.py Run --params here --workers 50
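  • The main classes listed above (e.g. Run) are ordinary Luigi tasks. For orientation, here is a minimal, generic two-task pipeline sketch; it is not taken from this repo, and the task names and file paths are illustrative:

  import luigi

  class Extract(luigi.Task):
      date = luigi.DateParameter()

      def output(self):
          return luigi.LocalTarget('data/extract_%s.csv' % self.date)

      def run(self):
          # placeholder extract step
          with self.output().open('w') as f:
              f.write('id,value\n')

  class Load(luigi.Task):
      date = luigi.DateParameter()

      def requires(self):
          return Extract(date=self.date)

      def output(self):
          return luigi.LocalTarget('data/load_%s.done' % self.date)

      def run(self):
          # placeholder load step
          with self.input().open('r') as f:
              rows = f.read()
          with self.output().open('w') as f:
              f.write('loaded %d bytes\n' % len(rows))

  if __name__ == '__main__':
      luigi.run()  # e.g. python3 example_pipeline.py Load --date 2017-01-25 --local-scheduler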

Data Sources

Dependent python packages required & API reference

Luigi - Spotify/Luigi
Postgres / Redshift - psycopg2 (see the connection sketch after this list)
MySQL - pymysql
Adwords - googleads : API Reference
Googlesheets - gspread : API Reference
Slack - slackclient : API Reference
Five9 - suds : API Reference
Twilio - twilio : API Reference
Livechat - API Reference
Zendesk - zdesk : API Reference
Shiftplanning - API Reference
Kochava - API Reference
Teradata - teradata
  • requires some configuration to install. We typically have to do:

    1. $ mv ~/.odbc.ini ~/.odbc.ini.orig
    2. $ cp /opt/teradata/client/15.10/odbc_64/odbcinst.ini ~/.odbcinst.ini
    3. $ cp /opt/teradata/client/15.10/odbc_64/odbc.ini ~/.odbc.ini
OnboardIQ - API Reference
AppBoy - API Reference
Salesforce - simple-salesforce : API Reference
  • Props to cghall for the capability to query salesforce reports directly using the analytics API

  • Also available are SalesforceBulk and SalesforceBulkJob classes which use the Salesforce bulk API

Braintree - braintree : API Reference
Typeform - API Reference
Checkr - API Reference
AWS - boto : boto3
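  • For example, the Postgres / Redshift sources above use psycopg2 to talk to the cluster. A minimal hedged connection sketch; the connection values are placeholders and this is not the repo's actual helper:

  import psycopg2

  # placeholder connection values; in practice they would come from luigi.cfg
  conn = psycopg2.connect(host='your-cluster.redshift.amazonaws.com',
                          port=5439,
                          dbname='warehouse',
                          user='etl_user',
                          password='XXXX')
  with conn.cursor() as cur:
      cur.execute('SELECT COUNT(*) FROM public.test')  # public.test is the example table used later in this README
      print(cur.fetchone())
  conn.close()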

Notifications

  • We currently use Slack or email for job status notifications; either can easily be added to a workflow

  • luigi-slack

  import luigi
  from luigi_slack import SlackBot, notify

  slack_channel = 'luigi-status-messages'
  ...
  ...
  ...
  if __name__ == '__main__':
      slack_channel = 'luigi-status-messages'
      slacker = SlackBot(token=luigi.configuration.get_config().get('slackbots', 'BOWSER_SLACK_API_KEY'),
                         channels=[slack_channel])
      with notify(slacker):
          luigi.run()
  • or by email via AWS SES using boto3

  import boto3

  class Email:
      def __init__(self, region, aws_access_key_id, aws_secret_access_key):
          self.client = boto3.client('ses',
                                     region_name=region,
                                     aws_access_key_id=aws_access_key_id,
                                     aws_secret_access_key=aws_secret_access_key)

      def send(self, from_, to_list, subject, body):
          return self.client.send_email(Source=from_,
                                        Destination={'ToAddresses': to_list},
                                        Message={'Subject': {'Data': subject},
                                                 'Body': {'Text': {'Data': body},
                                                          'Html': {'Data': ' '}}})
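  • A hedged usage sketch of the Email helper above (region, credentials, and addresses are placeholders):

  # placeholder region, credentials, and addresses
  mailer = Email(region='us-east-1',
                 aws_access_key_id='XXXX',
                 aws_secret_access_key='XXXX')
  mailer.send(from_='etl@example.com',
              to_list=['team@example.com'],
              subject='luigi job status',
              body='postgres_to_redshift.Run finished successfully')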

Data Validation

  • Targeted towards ensuring successful replication of data to Redshift (see modules/validation.py)
Structure
  • checks whether the csv has the same number of columns as the target table
  • checks whether the columns have the same datatypes in the same order (VARCHAR is acceptable for any python datatype)
    • uses python_redshift_dtypes to convert
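  • The column-count part of the checks above can be approximated as follows, given a psycopg2 connection; this is a hedged sketch with a hypothetical helper name, not the module's actual implementation:

  import csv

  def same_column_count(conn, local_file, schema, table):  # hypothetical helper
      # number of columns in the csv's first row
      with open(local_file) as f:
          csv_cols = len(next(csv.reader(f)))
      # number of columns in the target Redshift table
      with conn.cursor() as cur:
          cur.execute("""SELECT COUNT(*) FROM information_schema.columns
                         WHERE table_schema = %s AND table_name = %s""", (schema, table))
          table_cols = cur.fetchone()[0]
      return csv_cols == table_cols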
LoadError
  • checks for load errors on the target:schema:table provided since the provided load_start timestamp
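  • This check can be approximated with a direct query against Redshift's stl_load_errors system table. A minimal hedged sketch, given a psycopg2 connection (hypothetical helper name, not the module's actual query; narrowing to the specific table is omitted):

  def recent_load_errors(conn, load_start):  # hypothetical helper
      # return load errors recorded since the load_start timestamp
      with conn.cursor() as cur:
          cur.execute("""SELECT starttime, filename, line_number, colname, err_reason
                         FROM stl_load_errors
                         WHERE starttime >= %s
                         ORDER BY starttime DESC""", (load_start,))
          return cur.fetchall()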
RunAnywayTarget
  • Use the wrapper class RunAnywayTarget if you want a simpler interface while we improve each validation scheme
  • pass in the task object (self) with the following attributes (see the sketch after this list)

    • type = ['LoadError', 'Structure']
    • target = Redshift
    • table =
    • schema =
    • local_file = local csv file path
    • load_start = when you started to copy the records from S3
  • calling RunAnywayTarget(self).done() will skip validation

  • calling RunAnywayTarget(self).validation() will run the validation and, if successful, also mark the task as done
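  • A minimal sketch of a task wired up for validation, assuming the attribute names listed above; the task name, table, schema, and file path are hypothetical and this is not the repo's actual code:

  import datetime
  import luigi
  from validation import RunAnywayTarget  # assumed to live in modules/validation.py like the other helpers

  class CopyMyTable(luigi.Task):  # hypothetical task
      # attributes read by the validation wrapper (names from the list above)
      type = ['LoadError', 'Structure']
      target = 'Redshift'                         # Redshift is the documented target
      table = 'my_table'                          # hypothetical
      schema = 'public'                           # hypothetical
      local_file = '/path/to/data/my_table.csv'   # hypothetical

      def run(self):
          self.load_start = datetime.datetime.now()  # when the copy of records from S3 starts
          # ... COPY the csv from S3 into schema.table here ...
          RunAnywayTarget(self).validation()  # validate, and mark the task done if it passes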
OrderedDF
  • Takes the following args
  1. target_cols : a list of columns in the order you want your dataframe structured
  2. df : the dataframe you want restructured
  • example: I want my dataframe to have columns in this order ['one','two','three','four','five','six']

    >>> from validation import OrderedDF
    >>> import pandas as pd
    >>> test = [[None,'',1,7,8],[None,'',2,5,6]]
    >>> test = pd.DataFrame(test,columns=['one','two','four','five','three'])
    >>> test
        one two  four  five  three
    0  None         1     7      8
    1  None         2     5      6
    >>> result = OrderedDF(['one','two','three','four','five','six'],test)
    >>> result.df
        one two  three  four  five   six
    0  None          8     1     7  None
    1  None          6     2     5  None
StructureDynamic
  • This class will attempt to fix the target table for you:
  1. Check for copy errors
  2. Handle the copy errors
    • Add column(s) if needed
    • Change dtype(s) if needed
  3. Get the original table's schema
  4. Craft the new table's schema with the changes implied by the errors
  5. Make the changes, retry the copy, and remove fully duplicate records
  6. While there are still copy errors
    • handle the errors
    • attempt to fix
    • retry the copy
    • remove fully duplicate records
  • To run, use

    StructureDynamic(target_schema= ,  # redshift schema your table is in
                     target_table=     # your table
                     ).run(add_cols= ,       # True or False for whether you want columns added when attempting to fix
                           change_dtypes= ,  # True or False for whether you want column data types changed when attempting to fix
                           copy= ,           # copy command you attempted
                           load_start=       # when you started the copy command, '%Y-%m-%d %H:%M:%S'
                           )
  • Example usage:

    • sql prep: create the table

      CREATE TABLE public.test(id INT, col VARCHAR);
      INSERT INTO test VALUES (1,'2');
      INSERT INTO test VALUES (2, 'two');
    • test.csv: create the csv you want to attempt to copy

      1,2
      two,2
      3,4
      5,6
      ab,test
    • we attempt to copy normally but we get load errors because one of the columns isn't right

      COPY public.test FROM 's3://luigi-godata/test.csv'
      CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
      CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;
    • we run StructureDynamic

      from validation import StructureDynamic

      copy = '''COPY public.test FROM 's3://luigi-godata/test.csv'
                CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
                CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;'''
      StructureDynamic(target_schema='public', target_table='test').run(add_cols=True, change_dtypes=True, copy=copy, load_start='2016-10-6 10:15:00')
    • our table is fixed and still named public.test
    • our original table is kept as public.test_orig_backup
    • stdout lists the stl_load_errors encountered
    • the changes made to the table's DDL are printed to stdout