Project author: christopherbantle

Project description:
Create an S3 data lake containing Shopify cart and checkout events.
Language: Python
Project URL: git://github.com/christopherbantle/Shopify-Data-Lake.git
Created: 2019-01-20T19:11:22Z
Project community: https://github.com/christopherbantle/Shopify-Data-Lake

License: MIT License


Shopify Data Lake

An AWS SAM application that allows you to capture Shopify cart and checkout events via webhooks, and save them to S3.

1. Use Case

Given that, on average, two thirds of carts will be abandoned before being converted, there should be roughly three
times more data points on what customers are adding to their carts relative to the number of data points on what
customers are actually purchasing. Thus, with cart data at your disposal, you can more quickly, and with greater
confidence, identify changes in customer behaviour than if you were to rely solely on purchase data. This is especially
helpful for merchants with smaller volumes of sales.
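
The "three times" figure follows directly from the abandonment rate. A minimal sketch of the arithmetic, assuming every purchase originates from exactly one cart (the function name is illustrative and not part of the project):

```python
from fractions import Fraction


def carts_per_purchase(abandonment_rate: Fraction) -> Fraction:
    """Number of carts created for every cart that converts into a purchase."""
    if not 0 <= abandonment_rate < 1:
        raise ValueError("abandonment_rate must be in [0, 1)")
    # If a fraction r of carts is abandoned, a fraction (1 - r) converts,
    # so there are 1 / (1 - r) carts per purchase.
    return 1 / (1 - abandonment_rate)


ratio = carts_per_purchase(Fraction(2, 3))  # two thirds of carts abandoned
```

With an abandonment rate of two thirds, `ratio` is exactly 3.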

2. Architecture

Diagram

Shopify sends webhook notifications (specifically, for the carts/create, carts/update, checkouts/create, and
checkouts/update events) to an API Gateway endpoint. API Gateway invokes a Lambda function asynchronously and
returns a response to Shopify without waiting for the result of the invocation. The Lambda function computes an
HMAC digest of the request and, if it matches the digest supplied in the request, puts the data into a Kinesis
Firehose. Finally, the Kinesis Firehose saves the data to S3 in batches. There is one Lambda function and one
Kinesis Firehose for the cart data, and another of each for the checkout data.
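
The digest check can be sketched as follows. Shopify signs each webhook with a base64-encoded HMAC-SHA256 of the raw request body, keyed with the shared secret, and sends it in the X-Shopify-Hmac-Sha256 header. The function name and the way the body and header reach the function are assumptions; the project's actual handler may be structured differently.

```python
import base64
import hashlib
import hmac


def digest_matches(raw_body: bytes, shared_secret: str, supplied_digest: str) -> bool:
    """Return True if the base64 HMAC-SHA256 of raw_body equals supplied_digest."""
    computed = base64.b64encode(
        hmac.new(shared_secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    )
    # compare_digest runs in constant time, so a mismatch does not reveal
    # how many leading characters of the digest were correct.
    return hmac.compare_digest(computed, supplied_digest.encode("utf-8"))
```

The digest must be computed over the bytes exactly as received. Re-serializing the parsed JSON can change whitespace or key order and make an authentic request fail the check.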

Using a Kinesis Firehose to write data to S3, instead of having the data-receiving Lambda function write directly to S3,
will significantly reduce the number of put object operations. In addition, this will provide the option to leverage
the Firehose for data transformations, or data format conversion.
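
One detail matters when records are batched this way: Firehose concatenates records into each S3 object without adding a delimiter, while the JSON SerDe used by Athena expects one JSON object per line. A minimal sketch of a record builder that accounts for this is shown here. The function name is hypothetical, and whether the project's handler appends the newline in the same way is an assumption.

```python
import json


def to_firehose_record(event: dict) -> dict:
    """Serialize one webhook payload as a newline-terminated JSON record."""
    # Compact separators keep the payload on a single line; the trailing
    # newline separates it from the next record in the batched S3 object.
    line = json.dumps(event, separators=(",", ":")) + "\n"
    return {"Data": line.encode("utf-8")}
```

With boto3, the result could be passed as the `Record` argument of the Firehose client's `put_record` call, together with `DeliveryStreamName`.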

3. Deployment

Note that when you deploy the application, you must have already created the S3 bucket that will be used to store your
data. Note also that the Kinesis Firehose is not configured to encrypt the data that it puts in S3, and so, in
order to have this data encrypted, you should enable default encryption on the bucket, or modify the configuration of the Firehose.

Build Lambda Artifacts

    sam build --base-dir lambda_code

Upload Artifacts to S3

    sam package --s3-bucket <deployment bucket> --output-template-file .deployment/template.yml --s3-prefix shopify_data_lake

Create Stack

    sam deploy --template-file .deployment/template.yml --stack-name <stack name> --capabilities CAPABILITY_NAMED_IAM --parameter-overrides $(cat .deployment/parameters)

Note that .deployment/parameters should be of the format:

    <parameter key>=<parameter value>
    ...
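
Because the file is expanded with an unquoted $(cat ...), the shell splits its contents on whitespace, so each key=value pair must be a single token with no spaces in the value. The following sketch checks a parameters file before deployment; the helper name is illustrative and the parameter keys in the usage line are made up.

```python
def parse_parameters(text: str) -> dict:
    """Split text the way unquoted shell expansion would and validate each token."""
    params = {}
    for token in text.split():
        key, sep, value = token.partition("=")
        if not sep or not key:
            raise ValueError(f"expected <parameter key>=<parameter value>, got {token!r}")
        params[key] = value
    return params


# Hypothetical keys, for illustration only.
example = parse_parameters("FirstKey=first-value\nSecondKey=second-value\n")
```

A value that contains a space would be split into two tokens, and the second one would be rejected here for lacking an equals sign.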

Configure Webhooks

Configure the webhooks for the carts/create, carts/update, checkouts/create, and checkouts/update events, using
either the admin section of your Shopify store or the Shopify API. You can find the URL of your HTTP endpoint
in the API Gateway console. The base URL should have the format
https://<random hash>.execute-api.<region>.amazonaws.com/prod, where /cart is appended to form the URL for
cart notifications, and /checkout is appended to form the URL for checkout notifications.
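
The mapping from webhook topic to endpoint URL can be sketched as a small helper. The function name and the example API ID and region are placeholders; only the URL format and the /cart and /checkout suffixes come from the description above.

```python
def webhook_addresses(api_id: str, region: str, stage: str = "prod") -> dict:
    """Map each Shopify webhook topic to the endpoint URL it should be sent to."""
    base = f"https://{api_id}.execute-api.{region}.amazonaws.com/{stage}"
    return {
        "carts/create": f"{base}/cart",
        "carts/update": f"{base}/cart",
        "checkouts/create": f"{base}/checkout",
        "checkouts/update": f"{base}/checkout",
    }


# Placeholder API ID and region.
addresses = webhook_addresses("abc123", "us-east-1")
```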

4. Test Locally

Create Test Events and Environment Variable Files

The environment variables file should map each function's logical resource ID (e.g., 'HandleCartEventFunction')
to a mapping from environment variable name to value. For example:

    {
        "HandleCartEventFunction": {
            "KINESIS_FIREHOSE": "<name of Kinesis Firehose for cart events>",
            "SHOPIFY_AUTHENTICATION_KEY": "<key provided by Shopify to authenticate requests>"
        },
        "HandleCheckoutEventFunction": {
            "KINESIS_FIREHOSE": "<name of Kinesis Firehose for checkout events>",
            "SHOPIFY_AUTHENTICATION_KEY": "<key provided by Shopify to authenticate requests>"
        }
    }

Build Artifacts

    sam build --base-dir lambda_code

Single Invocation

    sam local invoke --event .test/events/authentic_request.json --env-vars .test/env_vars.json HandleCartEventFunction

Multiple Invocations

    sam local start-lambda --env-vars .test/env_vars.json
    aws lambda invoke --function-name HandleCartEventFunction --endpoint-url http://127.0.0.1:3001 --no-verify-ssl --payload "$(cat .test/events/authentic_request.json)" /dev/null
    aws lambda invoke --function-name HandleCartEventFunction --endpoint-url http://127.0.0.1:3001 --no-verify-ssl --payload "$(cat .test/events/digest_does_not_match.json)" /dev/null
    aws lambda invoke --function-name HandleCartEventFunction --endpoint-url http://127.0.0.1:3001 --no-verify-ssl --payload "$(cat .test/events/invalid_request.json)" /dev/null

The logs for the invocations will show up in the terminal session where the local Lambda container was started.

5. Query Data with Athena

Note that, for simplicity, a number of the event attributes are omitted from the table definitions below. You can
find all of the event attributes in the Shopify documentation.

Create Database for Shopify Events

    CREATE DATABASE shopify_events;

Create Table for Cart Events

    CREATE EXTERNAL TABLE shopify_events.carts (
        token string,
        updated_at string,
        created_at string,
        line_items array<struct<
            quantity:int,
            title:string,
            price:string
        >>
    )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    LOCATION 's3://<data bucket>/cart/';

Create View For Cart Line Items

    -- DECIMAL without an explicit precision and scale defaults to scale 0 in Athena, which would
    -- drop the fractional part of the price, so DECIMAL(10, 2) is used; adjust it to your currency.
    CREATE OR REPLACE VIEW shopify_events.cart_line_items AS
    SELECT
        token,
        cast(from_iso8601_timestamp(updated_at) AS TIMESTAMP) AS update_time,
        cast(from_iso8601_timestamp(created_at) AS TIMESTAMP) AS creation_time,
        line_item.quantity AS quantity,
        line_item.title AS title,
        cast(line_item.price AS DECIMAL(10, 2)) AS price
    FROM shopify_events.carts
    CROSS JOIN UNNEST(line_items) AS t(line_item);
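
To make the effect of CROSS JOIN UNNEST concrete, here is a minimal Python sketch of the same flattening applied to a single cart event. The function name and sample payload are made up for illustration, and timestamps are left as strings to keep the example short.

```python
from decimal import Decimal


def explode_line_items(cart: dict) -> list:
    """Return one row per line item, mirroring CROSS JOIN UNNEST(line_items)."""
    return [
        {
            "token": cart["token"],
            "updated_at": cart["updated_at"],
            "quantity": item["quantity"],
            "title": item["title"],
            # Prices arrive as strings; Decimal keeps the fractional part exact.
            "price": Decimal(item["price"]),
        }
        for item in cart.get("line_items") or []
    ]


# Made-up sample event.
sample_cart = {
    "token": "abc",
    "updated_at": "2019-01-20T19:11:22Z",
    "line_items": [
        {"quantity": 1, "title": "Mug", "price": "19.99"},
        {"quantity": 2, "title": "Poster", "price": "5.00"},
    ],
}
rows = explode_line_items(sample_cart)
```

As with the SQL view, a cart event whose line_items array is empty produces no rows at all, so emptied carts do not appear in the flattened data.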

Get Most Recent State of Carts

For each cart in its most recent state, there will be one row per line item.

    SELECT a.*
    FROM shopify_events.cart_line_items a
    INNER JOIN (
        SELECT token, max(update_time) AS last_update_time
        FROM shopify_events.cart_line_items
        GROUP BY token
    ) b ON a.token = b.token AND a.update_time = b.last_update_time;
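
The self-join above keeps only the rows whose update time equals the latest update time for their cart. The same logic in Python, as a sketch over rows shaped like the view's output (the function name is illustrative, and update_time may be any comparable value such as a datetime):

```python
def latest_state(rows: list) -> list:
    """Keep only the line-item rows from each cart's most recent update."""
    last_update = {}
    for row in rows:
        token = row["token"]
        # Equivalent to: SELECT token, max(update_time) ... GROUP BY token
        if token not in last_update or row["update_time"] > last_update[token]:
            last_update[token] = row["update_time"]
    # Equivalent to the INNER JOIN on token and update_time.
    return [row for row in rows if row["update_time"] == last_update[row["token"]]]
```

If the same notification is stored more than once, for example after a redelivered webhook, its rows will appear more than once in the result, in both the Python sketch and the SQL query.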

Get All Items Added to Carts

    SELECT token, title, max(quantity) AS max_quantity
    FROM shopify_events.cart_line_items
    GROUP BY token, title;
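
Because this query groups over every recorded state of a cart rather than only the latest one, it also captures items that were added and later removed. A small Python sketch of the same aggregation, with an illustrative function name:

```python
def max_quantities(rows: list) -> dict:
    """Largest quantity ever seen for each (token, title) pair."""
    result = {}
    for row in rows:
        key = (row["token"], row["title"])
        if key not in result or row["quantity"] > result[key]:
            result[key] = row["quantity"]
    return result
```

For example, an item that appears with quantity 3 in one update and is missing from the next update is still reported with a maximum quantity of 3.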

Create Table for Checkout Events

    CREATE EXTERNAL TABLE shopify_events.checkouts (
        token string,
        cart_token string,
        updated_at string,
        created_at string,
        completed_at string,
        line_items array<struct<
            quantity:int,
            title:string,
            price:string
        >>,
        customer struct<
            id:string
        >
    )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    LOCATION 's3://<data bucket>/checkout/';

Create View for Checkout Line Items

    -- DECIMAL without an explicit precision and scale defaults to scale 0 in Athena, which would
    -- drop the fractional part of the price, so DECIMAL(10, 2) is used; adjust it to your currency.
    CREATE OR REPLACE VIEW shopify_events.checkout_line_items AS
    SELECT
        token AS checkout_token,
        cart_token,
        cast(from_iso8601_timestamp(updated_at) AS TIMESTAMP) AS update_time,
        cast(from_iso8601_timestamp(created_at) AS TIMESTAMP) AS creation_time,
        cast(from_iso8601_timestamp(completed_at) AS TIMESTAMP) AS completion_time,
        line_item.quantity AS quantity,
        line_item.title AS title,
        cast(line_item.price AS DECIMAL(10, 2)) AS price,
        customer.id AS customer_id
    FROM shopify_events.checkouts
    CROSS JOIN UNNEST(line_items) AS t(line_item);

Get Most Recent State of Checkouts

    SELECT DISTINCT a.*
    FROM shopify_events.checkout_line_items a
    INNER JOIN (
        SELECT checkout_token, max(update_time) AS last_update_time
        FROM shopify_events.checkout_line_items
        GROUP BY checkout_token
    ) b ON a.checkout_token = b.checkout_token AND a.update_time = b.last_update_time;