Create an S3 data lake containing Shopify cart and checkout events.
An AWS SAM application that allows you to capture Shopify cart and checkout events via webhooks, and save them to S3.
Given that, on average, about two-thirds of carts are abandoned before being converted, there should be roughly three
times as many data points on what customers add to their carts as on what customers actually purchase. Thus, with
cart data at your disposal, you can identify changes in customer behaviour more quickly, and with greater confidence,
than if you were to rely solely on purchase data. This is especially helpful for merchants with smaller volumes of sales.
Shopify will send webhook notifications (in particular, for the carts/create, carts/update, checkouts/create,
and checkouts/update events) to an API Gateway endpoint. API Gateway will invoke a Lambda function
asynchronously, returning a response to Shopify without waiting for the result of the Lambda invocation. The Lambda
function will then put the data into a Kinesis Firehose (provided that the HMAC digest it computes matches the digest
supplied in the request). Finally, the Kinesis Firehose will save the data to S3 in batches. There is one Lambda function
and one Kinesis Firehose for the cart data, and another pair for the checkout data.
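Shopify signs each webhook with an HMAC-SHA256 digest of the raw request body, keyed with the shared secret, and sends it base64-encoded in the X-Shopify-Hmac-Sha256 header. The check-then-forward step described above can be sketched in Python as follows. This is a minimal sketch under stated assumptions, not the application's actual handler: the function names are hypothetical, and the event is assumed to carry the raw body under "body" and the headers under "headers", which may not match the format API Gateway delivers in this application.

```python
import base64
import hashlib
import hmac


def verify_shopify_hmac(raw_body: bytes, secret: str, supplied_digest: str) -> bool:
    """Return True if supplied_digest is the base64 HMAC-SHA256 of raw_body."""
    computed = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    expected = base64.b64encode(computed).decode("ascii")
    # compare_digest does not short-circuit on the first differing character,
    # so response timing does not reveal a partially correct digest.
    return hmac.compare_digest(expected, supplied_digest)


def handle_event(event: dict, firehose_client, stream_name: str, secret: str) -> bool:
    """Hypothetical handler: forward an authentic body to Firehose, drop the rest.

    ASSUMPTION: the raw body is under "body" and the Shopify header is under
    "headers" with this exact capitalisation; adjust to the real event format.
    """
    raw_body = (event.get("body") or "").encode("utf-8")
    supplied = (event.get("headers") or {}).get("X-Shopify-Hmac-Sha256", "")
    if not verify_shopify_hmac(raw_body, secret, supplied):
        return False
    # Firehose concatenates records without a separator, so a trailing newline
    # keeps one JSON document per line for the JSON SerDe used by Athena.
    firehose_client.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": raw_body + b"\n"},
    )
    return True
```

In a deployed function, the client would typically be boto3.client("firehose"), with the stream name and secret read from the KINESIS_FIREHOSE and SHOPIFY_AUTHENTICATION_KEY environment variables. Passing the client in as a parameter keeps the logic testable without AWS credentials.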
Using a Kinesis Firehose to write data to S3, instead of having the data-receiving Lambda function write directly to S3,
significantly reduces the number of S3 PutObject requests, because the Firehose buffers incoming records and writes them
as larger objects. It also provides the option to use the Firehose for data transformation or data format conversion.
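As a rough illustration of that saving, the sketch below compares one PutObject per event with the number of objects a Firehose writes when it flushes on whichever buffering hint is reached first. The 5 MiB and 300-second values are Firehose's default S3 buffering hints, not necessarily what this application's template configures; the event volume and size are hypothetical, and the estimate assumes events arrive steadily throughout the day.

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60


def firehose_objects_per_day(
    events_per_day: int,
    avg_event_bytes: int,
    buffer_size_mib: int = 5,
    buffer_interval_s: int = 300,
) -> int:
    """Rough estimate of S3 objects written per day by one delivery stream.

    Assumes every interval contains data, so the stream flushes whenever
    either the size hint or the time hint is reached.
    """
    if events_per_day <= 0:
        return 0
    total_bytes = events_per_day * avg_event_bytes
    size_flushes = math.ceil(total_bytes / (buffer_size_mib * 1024 * 1024))
    interval_flushes = SECONDS_PER_DAY // buffer_interval_s
    # The stream cannot write more objects than there are events to fill them.
    return min(events_per_day, max(size_flushes, interval_flushes))


if __name__ == "__main__":
    events, size = 10_000, 5_000  # hypothetical: 10,000 events of ~5 KB each
    print("direct PutObject calls:", events)
    print("Firehose objects (estimate):", firehose_objects_per_day(events, size))
```

With these assumed figures, the time hint dominates: 288 objects per day instead of 10,000 direct writes.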
Note that the S3 bucket used to store your data must already exist when you deploy the application. Note also that the
Kinesis Firehose is not configured to encrypt the data that it puts in S3, so to have this data encrypted, either enable
default encryption on the bucket or modify the configuration of the Firehose.
sam build --base-dir lambda_code
sam package --s3-bucket <deployment bucket> --output-template-file .deployment/template.yml --s3-prefix shopify_data_lake
sam deploy --template-file .deployment/template.yml --stack-name <stack name> --capabilities CAPABILITY_NAMED_IAM --parameter-overrides $(cat .deployment/parameters)
Note that .deployment/parameters should be of the format:
<parameter key>=<parameter value>
...
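Because the file is spliced into the deploy command with an unquoted $(cat ...), the shell splits its contents on whitespace, so a value containing a space would be broken into separate arguments. The following is a small, hypothetical Python check (not part of the application) that flags lines not matching the key=value format before you deploy; it assumes parameter keys are alphanumeric, as CloudFormation requires.

```python
import re

# One parameter per line: an alphanumeric key, "=", and a value with no
# whitespace (whitespace would be split apart by the unquoted $(cat ...)).
LINE_PATTERN = re.compile(r"^[A-Za-z0-9]+=\S*$")


def find_bad_parameter_lines(text: str) -> list[tuple[int, str]]:
    """Return (line number, line) pairs that do not match key=value."""
    bad = []
    for number, line in enumerate(text.splitlines(), start=1):
        stripped = line.strip()
        if stripped == "":
            continue  # blank lines disappear after word splitting
        if not LINE_PATTERN.match(stripped):
            bad.append((number, line))
    return bad
```

For example, calling find_bad_parameter_lines on the contents of .deployment/parameters returns an empty list when every line is well formed, and otherwise reports the offending line numbers.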
Configure the webhooks for the carts/create, carts/update, checkouts/create, and checkouts/update events, using
either the admin section of your Shopify store or the Shopify API. You can find the URL of your HTTP endpoint
using the API Gateway console. The base URL should have the format https://<random hash>.execute-api.<region>.amazonaws.com/prod,
where /cart is appended to form the URL for cart notifications, and /checkout is appended to form the URL for checkout
notifications.
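If you register the webhooks through Shopify's Admin REST API rather than the admin section, each subscription is created by POSTing a body containing a topic, an address, and a format to the webhooks endpoint. The sketch below only builds those request bodies, mapping each topic to its /cart or /checkout URL; the function name and example base URL are hypothetical, and the request shape and API version should be checked against Shopify's current documentation before use.

```python
import json

# Which API Gateway path each Shopify topic should be delivered to.
TOPIC_PATHS = {
    "carts/create": "/cart",
    "carts/update": "/cart",
    "checkouts/create": "/checkout",
    "checkouts/update": "/checkout",
}


def webhook_request_bodies(base_url: str) -> list[dict]:
    """Build one webhook-subscription request body per topic.

    base_url is the API Gateway stage URL ending in /prod.
    """
    base = base_url.rstrip("/")
    return [
        {"webhook": {"topic": topic, "address": base + path, "format": "json"}}
        for topic, path in TOPIC_PATHS.items()
    ]


if __name__ == "__main__":
    example_base = "https://abc123.execute-api.us-east-1.amazonaws.com/prod"  # hypothetical
    for body in webhook_request_bodies(example_base):
        print(json.dumps(body))
```

Keeping the topic-to-path mapping in one table makes it hard to point a checkout topic at the cart endpoint by mistake.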
The environment variables file should contain a mapping from each function's logical resource ID (e.g.,
HandleCartEventFunction) to that function's environment variable names and values. For example:
{
"HandleCartEventFunction": {
"KINESIS_FIREHOSE": "<name of Kinesis Firehose for cart events>",
"SHOPIFY_AUTHENTICATION_KEY": "<key provided by Shopify to authenticate requests>"
},
"HandleCheckoutEventFunction": {
"KINESIS_FIREHOSE": "<name of Kinesis Firehose for checkout events>",
"SHOPIFY_AUTHENTICATION_KEY": "<key provided by Shopify to authenticate requests>"
}
}
sam build --base-dir lambda_code
sam local invoke --event .test/events/authentic_request.json --env-vars .test/env_vars.json HandleCartEventFunction
sam local start-lambda --env-vars .test/env_vars.json
aws lambda invoke --function-name HandleCartEventFunction --endpoint-url http://127.0.0.1:3001 --no-verify-ssl --payload fileb://.test/events/authentic_request.json /dev/null
aws lambda invoke --function-name HandleCartEventFunction --endpoint-url http://127.0.0.1:3001 --no-verify-ssl --payload fileb://.test/events/digest_does_not_match.json /dev/null
aws lambda invoke --function-name HandleCartEventFunction --endpoint-url http://127.0.0.1:3001 --no-verify-ssl --payload fileb://.test/events/invalid_request.json /dev/null
The logs for the invocations will show up in the terminal session where the local Lambda container was started.
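To build additional test events, you need a digest computed with the same key as SHOPIFY_AUTHENTICATION_KEY in .test/env_vars.json. The sketch below produces an authentic event and one whose digest does not match. The function names are hypothetical, and the event layout (raw body under "body", digest under headers["X-Shopify-Hmac-Sha256"]) is an assumption; copy the actual layout from the existing files in .test/events.

```python
import base64
import hashlib
import hmac
import json


def shopify_digest(raw_body: bytes, secret: str) -> str:
    """Base64-encoded HMAC-SHA256 of raw_body, as Shopify computes it."""
    return base64.b64encode(
        hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    ).decode("ascii")


def make_test_events(payload: dict, secret: str) -> dict[str, dict]:
    """Build an authentic event and one with a mismatched digest.

    ASSUMPTION: the event layout below; adjust it to match .test/events.
    """
    body = json.dumps(payload, separators=(",", ":"))
    digest = shopify_digest(body.encode("utf-8"), secret)
    # Signing with a different key gives a well-formed but wrong digest.
    wrong = shopify_digest(body.encode("utf-8"), secret + "-wrong")
    return {
        "authentic_request": {"body": body, "headers": {"X-Shopify-Hmac-Sha256": digest}},
        "digest_does_not_match": {"body": body, "headers": {"X-Shopify-Hmac-Sha256": wrong}},
    }
```

Each returned event can be written with json.dump to a file in .test/events and passed to sam local invoke or aws lambda invoke.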
Note that, for simplicity, a number of the event attributes are omitted from the table definitions. You can
find all of the event attributes in the Shopify documentation.
CREATE DATABASE shopify_events;
CREATE EXTERNAL TABLE shopify_events.carts (
token string,
updated_at string,
created_at string,
line_items array<struct<
quantity:int,
title:string,
price:string
>>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<data bucket>/cart/';
CREATE OR REPLACE VIEW shopify_events.cart_line_items AS
SELECT token, cast(from_iso8601_timestamp(updated_at) AS TIMESTAMP) AS update_time, cast(from_iso8601_timestamp(created_at) AS TIMESTAMP) AS creation_time, line_item.quantity AS quantity, line_item.title AS title, cast(line_item.price AS DECIMAL(12, 2)) AS price
FROM shopify_events.carts
CROSS JOIN UNNEST(line_items) AS t(line_item);
For each cart in its most recent state, there will be one row per line item.
SELECT a.*
FROM shopify_events.cart_line_items a
INNER JOIN (
SELECT token, max(update_time) as last_update_time
FROM shopify_events.cart_line_items
GROUP BY token
) b ON a.token = b.token AND a.update_time = b.last_update_time;
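The view and the query above can be mirrored in Python to make one consequence visible: CROSS JOIN UNNEST produces no rows for a cart update whose line_items array is empty, so a cart that was emptied still reports the contents of its last non-empty update. This sketch mirrors the logic only (it compares timezone-aware datetimes rather than Athena TIMESTAMP values), and the function names and sample data are hypothetical.

```python
from datetime import datetime


def cart_line_item_rows(cart: dict) -> list[dict]:
    """Mirror cart_line_items: one row per line item, none for an empty cart."""
    update_time = datetime.fromisoformat(cart["updated_at"])
    return [
        {
            "token": cart["token"],
            "update_time": update_time,
            "title": item["title"],
            "quantity": item["quantity"],
        }
        for item in cart["line_items"]
    ]


def latest_line_items(carts: list[dict]) -> list[dict]:
    """Keep rows whose update_time equals the latest update_time for their token."""
    rows = [row for cart in carts for row in cart_line_item_rows(cart)]
    last_update = {}
    for row in rows:
        token = row["token"]
        if token not in last_update or row["update_time"] > last_update[token]:
            last_update[token] = row["update_time"]
    return [row for row in rows if row["update_time"] == last_update[row["token"]]]
```

If a cart is updated at 10:00, 10:05, and finally emptied at 10:10, the result contains the 10:05 line items, exactly as the SQL query would. If emptied carts matter for your analysis, use a LEFT JOIN UNNEST in the view instead.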
SELECT token, title, max(quantity) as max_quantity
FROM shopify_events.cart_line_items
GROUP BY token, title;
CREATE EXTERNAL TABLE shopify_events.checkouts (
token string,
cart_token string,
updated_at string,
created_at string,
completed_at string,
line_items array<struct<
quantity:int,
title:string,
price:string
>>,
customer struct<
id:string
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<data bucket>/checkout/';
CREATE OR REPLACE VIEW shopify_events.checkout_line_items AS
SELECT token AS checkout_token, cart_token, cast(from_iso8601_timestamp(updated_at) AS TIMESTAMP) AS update_time, cast(from_iso8601_timestamp(created_at) AS TIMESTAMP) AS creation_time, cast(from_iso8601_timestamp(completed_at) AS TIMESTAMP) AS completion_time, line_item.quantity AS quantity, line_item.title AS title, cast(line_item.price AS DECIMAL(12, 2)) AS price, customer.id AS customer_id
FROM shopify_events.checkouts
CROSS JOIN UNNEST(line_items) AS t(line_item);
SELECT DISTINCT a.*
FROM shopify_events.checkout_line_items a
INNER JOIN (
SELECT checkout_token, max(update_time) as last_update_time
FROM shopify_events.checkout_line_items
GROUP BY checkout_token
) b ON a.checkout_token = b.checkout_token AND a.update_time = b.last_update_time;