A luigi-powered analytics / warehouse stack
Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, failure handling, command line integration, and much more.
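For readers new to Luigi, here is a minimal sketch of the task model (hypothetical task names and file paths, not code from this repo): each task declares what it depends on (requires), what it produces (output), and how to produce it (run), and Luigi resolves the dependency graph.

```python
import luigi


class Extract(luigi.Task):
    """Hypothetical upstream task that writes a raw CSV."""

    def output(self):
        return luigi.LocalTarget('data/raw.csv')

    def run(self):
        with self.output().open('w') as f:
            f.write('id,value\n1,foo\n')


class Load(luigi.Task):
    """Hypothetical downstream task; requires() makes Luigi run Extract first."""

    def requires(self):
        return Extract()

    def output(self):
        return luigi.LocalTarget('data/loaded.marker')

    def run(self):
        with self.input().open('r') as f:
            rows = f.readlines()
        with self.output().open('w') as f:
            f.write('loaded %d rows\n' % (len(rows) - 1))


if __name__ == '__main__':
    luigi.run()  # e.g. python pipeline.py Load --local-scheduler
```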
Luigi-Warehouse adds:
- VariableS3Client
$ python setup.py install
$ pip3 install -r requirements.txt (if you want full functionality of all data sources)
$ mkdir your-path-to/data
Set up your luigi.cfg file. luigi.cfg-example shows some possible options. You can also $ export LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python...
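As an illustration only, a luigi.cfg is a plain INI file read by luigi.configuration. The sketch below is not a copy of luigi.cfg-example: the [core] keys are standard Luigi settings, the [slackbots] key matches the one read in the notification snippet further down, and the [redshift] section is purely hypothetical.

```ini
# luigi.cfg (sketch)
[core]
default-scheduler-host=localhost
default-scheduler-port=8082

[slackbots]
# token read by the SlackBot example below
BOWSER_SLACK_API_KEY=xoxb-your-token

[redshift]
# hypothetical section; see luigi.cfg-example for the real option names
host=your-cluster.redshift.amazonaws.com
port=5439
database=analytics
user=etl_user
password=change-me
```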
File | Description | Main Class(es)
---|---|---
gsheet_to_redshift.py | replicates all data from a google sheet to a redshift table (full copy/replace) | Run
gsheet_to_hadoop.py | replicates all data from a google sheet to a hadoop hive table via spark (full copy/replace) | main
postgres_to_redshift.py | replicates postgres tables to redshift (incrementally or full copy/replace) | Run - PerformIncrementalImport, PerformFullImport
postgres_to_hadoop.py | spark app that replicates postgres tables to hadoop (hive) (incrementally or copy/replace) | Run - RunIncremental, RunFromScratch
salesforce_to_redshift.py | replicates a salesforce report or SOQL to a redshift table (full copy/replace) | SOQLtoRedshift, ReporttoRedshift
teradata_to_redshift.py | replicates given teradata SQL to a redshift table (incrementally or full copy/replace) | Run
typeform_to_redshift.py | replicates all data from typeform responses to a redshift table (full copy/replace) | Run
zendesk_to_redshift.py | extracts users, orgs, tickets, ticket_events from zendesk to redshift (partially incremental) | Run
zendesk_to_hadoop.py | generic class to extract from zendesk API and load to hadoop hive via spark (incrementally or full copy/replace) | ZendeskSpark
Example to start the luigi scheduler daemon
$ ./start_luigi_server.bash
Example to run a workflow with multiple workers in parallel
$ LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg python3 luigi_warehouse/postgres_to_redshift.py Run --params here --workers 50
The Teradata ODBC driver requires some configuring to install. We typically have to do:
$ mv ~/.odbc.ini ~/.odbc.ini.orig
$ cp /opt/teradata/client/15.10/odbc_64/odbcinst.ini ~/.odbcinst.ini
$ cp /opt/teradata/client/15.10/odbc_64/odbc.ini ~/.odbc.ini
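For reference, a DSN entry in the resulting ~/.odbc.ini generally looks like the sketch below; the DSN name, host, and credentials are placeholders, and the driver path should be checked against the odbcinst.ini copied above.

```ini
[ODBC Data Sources]
teradata_prod=Teradata

[teradata_prod]
; confirm this path against the copied odbcinst.ini
Driver=/opt/teradata/client/15.10/odbc_64/lib/tdata.so
DBCName=your-teradata-host.example.com
UID=your_user
PWD=your_password
```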
Props to cghall for the capability to query salesforce reports directly using the analytics API.
Also available are SalesforceBulk and SalesforceBulkJob classes, which use the Salesforce bulk API.
We currently use Slack or email for job status notifications, which can easily be added:
import luigi
from luigi_slack import SlackBot, notify

slack_channel = 'luigi-status-messages'
...
...
...
if __name__ == '__main__':
    slack_channel = 'luigi-status-messages'
    slacker = SlackBot(token=luigi.configuration.get_config().get('slackbots', 'BOWSER_SLACK_API_KEY'),
                       channels=[slack_channel])
    with notify(slacker):
        luigi.run()
import boto3


class Email:
    """Send job status notifications via AWS SES."""

    def __init__(self, region, aws_access_key_id, aws_secret_access_key):
        self.client = boto3.client('ses',
                                   region_name=region,
                                   aws_access_key_id=aws_access_key_id,
                                   aws_secret_access_key=aws_secret_access_key)

    def send(self, from_, to_list, subject, body):
        # to_list is a list of recipient addresses; body is sent as plain text
        return self.client.send_email(Source=from_,
                                      Destination={'ToAddresses': to_list},
                                      Message={'Subject': {'Data': subject},
                                               'Body': {'Text': {'Data': body},
                                                        'Html': {'Data': ' '}}})
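A possible way to call this helper, with placeholder addresses and credentials:

```python
# Hypothetical usage of the Email class above; all values are placeholders
notifier = Email(region='us-east-1',
                 aws_access_key_id='XXXX',
                 aws_secret_access_key='XXXX')
notifier.send(from_='etl@example.com',
              to_list=['data-team@example.com'],
              subject='postgres_to_redshift Run finished',
              body='All tasks completed successfully.')
```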
modules/validation.py
(VARCHAR is acceptable for any python datatype)

Pass in the taskobj with the following attributes:
- type = ['LoadError', 'Structure']
- target = Redshift
- table =
- schema =
- local_file = local csv file path
- load_start = when you started to copy the records from S3

Doing RunAnywayTarget(self).done() will not do validation.
RunAnywayTarget(self).validation() will do the validation and, if successful, also say we're done the task.
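A rough sketch of how a load task might set these attributes and then trigger validation; the task body, table/schema names, and the import path for RunAnywayTarget are assumptions rather than code from this repo.

```python
import luigi
# Assumption: the RunAnywayTarget used here is the one provided by this repo's
# validation module, not luigi.contrib.simulate.RunAnywayTarget.
from validation import RunAnywayTarget


class LoadMyTable(luigi.Task):
    """Hypothetical load task illustrating the attributes validation expects."""

    def run(self):
        # ... stage the local CSV to S3 and COPY it into Redshift here ...
        self.type = ['LoadError', 'Structure']           # which validations to run
        self.target = 'Redshift'                         # assumed form of the 'target' attribute
        self.table = 'my_table'                          # placeholder table
        self.schema = 'public'                           # placeholder schema
        self.local_file = '/path/to/data/my_table.csv'   # local csv file path
        self.load_start = '2016-10-06 10:15:00'          # when the COPY from S3 started
        # RunAnywayTarget(self).done() would skip validation;
        # .validation() runs it and, if successful, marks the task done.
        RunAnywayTarget(self).validation()
```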
OrderedDF takes:
- target_cols: a list of columns ordered for how you want your dataframe to be structured, e.g. ['one','two','three','four','five','six']
- df: the dataframe you want restructured
>>> from validation import OrderedDF
>>> import pandas as pd
>>> test = [[None, '', 1, 7, 8], [None, '', 2, 5, 6]]
>>> test = pd.DataFrame(test, columns=['one', 'two', 'four', 'five', 'three'])
>>> test
    one two  four  five  three
0  None         1     7      8
1  None         2     5      6
>>> result = OrderedDF(['one', 'two', 'three', 'four', 'five', 'six'], test)
>>> result.df
    one two  three  four  five   six
0  None          8     1     7  None
1  None          6     2     5  None
To run, use:
StructureDynamic(target_schema= ,  # redshift schema your table is in
                 target_table=     # your table
                 ).run(
                       add_cols= ,       # True or False if you want columns added in attempting to fix
                       change_dtypes= ,  # True or False if you want column data types changed in attempting to fix
                       copy= ,           # copy command you attempted
                       load_start=       # when you started the copy command, '%Y-%m-%d %H:%M:%S'
                       )
Example usage:
CREATE TABLE public.test(id INT, col VARCHAR);
INSERT INTO test VALUES (1,'2');
INSERT INTO test VALUES (2, 'two');
s3://luigi-godata/test.csv contains:
1,2
two,2
3,4
5,6
ab,test
COPY public.test FROM 's3://luigi-godata/test.csv'
CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;
from validation import StructureDynamic
copy = '''COPY public.test FROM 's3://luigi-godata/test.csv'
CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
CSV DELIMITER ',' COMPUPDATE ON MAXERROR 0;'''
StructureDynamic(target_schema='public',target_table='test').run(add_cols=True,change_dtypes=True,copy=copy,load_start='2016-10-6 10:15:00')
This results in two tables: public.test (the fixed table) and public.test_orig_backup (a backup of the original table).