Amazon QLDB Streams integration with Amazon Elasticsearch
The sample in this project demonstrates how to integrate Amazon Elasticsearch Service with Amazon QLDB using Streams.
It consists of an AWS Lambda function, written in Python, which reads the QLDB stream and indexes documents into Amazon Elasticsearch.
This sample is modelled around a department of motor vehicles (DMV) database that tracks the complete historical information about vehicle registrations.
The sample demonstrates how you can replicate your documents in Amazon QLDB to Amazon Elasticsearch Service in near real time using Amazon Kinesis. It indexes Person documents for insert cases only, and indexes VehicleRegistration documents for both insert and update cases. The sample_scenarios directory contains scripts to insert, update and delete data in the QLDB tables.
AWS SAM provides you with a command line tool, the AWS SAM CLI, that makes it easy for you to create and manage serverless applications. You need to install and configure a few things in order to use the AWS SAM CLI. See AWS SAM CLI Installation for details.
SAM requires an S3 bucket to host the source code for the Lambda function. We will be using the AWS CLI to create the bucket. Please read AWS CLI Configuration for help on how to configure the CLI.
The examples require Python 3.4 or above. Please see the link below for details on how to install Python:
It is required that you clone this repository. The project consists of two main directories:
src: This directory has the source for the Lambda function (a simplified sketch of its indexing logic appears after this list).
sample_scenarios: This consists of Python scripts which can be used to insert, update and delete documents in the vehicle-registration ledger. These will be used after setup is complete to verify that the sample works as expected.
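For orientation, the insert/update filtering described in the overview (Person indexed only when a document is first inserted, VehicleRegistration indexed on inserts and updates) can be sketched as follows. This is a simplified, hypothetical illustration, not the actual code in src: Kinesis deaggregation, Ion parsing and the signed Elasticsearch calls are omitted, and index_document is a placeholder helper.

# Simplified sketch of the indexing decision (illustrative only; see src for the real handler).
INSERT_ONLY_TABLES = {"Person"}                     # indexed only on the first revision
INSERT_AND_UPDATE_TABLES = {"VehicleRegistration"}  # indexed on inserts and updates

def index_document(index_name, doc_id, body):
    # Placeholder: the real code signs and sends an index request
    # to the Amazon Elasticsearch Service domain.
    print("index", doc_id, "into", index_name, ":", body)

def process_revision(record):
    # A QLDB stream REVISION_DETAILS record carries the table info and the revision.
    if record.get("recordType") != "REVISION_DETAILS":
        return
    payload = record["payload"]
    table = payload["tableInfo"]["tableName"]
    metadata = payload["revision"]["metadata"]
    data = payload["revision"].get("data")
    version = metadata["version"]  # version 0 means the document was just inserted

    if table in INSERT_ONLY_TABLES and version == 0:
        index_document("person_index", metadata["id"], data)
    elif table in INSERT_AND_UPDATE_TABLES:
        index_document("vehicle_index", metadata["id"], data)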
Create a QLDB ledger named vehicle-registration. Please follow the steps listed here to create a new ledger.
We need to create an S3 bucket, which will be used by SAM to host the source code of the Lambda function.
export BUCKET_NAME=some_unique_valid_bucket_name
aws s3 mb s3://$BUCKET_NAME
sam build
sam package \
--output-template-file packaged.yaml \
--s3-bucket $BUCKET_NAME
sam deploy \
--template-file packaged.yaml \
--stack-name STACK_NAME \
--capabilities CAPABILITY_NAMED_IAM \
--parameter-overrides ParameterKey=ElasticsearchDomainName,ParameterValue=DOMAIN_NAME
Replace DOMAIN_NAME with a domain name of your choice.
Replace STACK_NAME with a stack name of your choice.
Note: After the deployment completes, you should see KibanaEndpoint in the outputs. Please copy it; we will need it later.
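If you would rather not copy it from the console, the same output can be read with a short boto3 call; this is just a convenience sketch, and STACK_NAME is whatever name you chose above.

import boto3

# Print the KibanaEndpoint output of the deployed stack (replace STACK_NAME accordingly).
cloudformation = boto3.client("cloudformation")
stack = cloudformation.describe_stacks(StackName="STACK_NAME")["Stacks"][0]
for output in stack["Outputs"]:
    if output["OutputKey"] == "KibanaEndpoint":
        print(output["OutputValue"])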
The deployment will create a CloudFormation stack with the name you specify in the deploy command. As part of the stack, CloudFormation will create the resources required by the sample, including the person_index and vehicle_index indexes on Elasticsearch, which are created using a CloudFormation Custom Resource.
Sign in to the AWS Management Console, and open the Amazon QLDB console at https://console.aws.amazon.com/qldb.
In the navigation pane, choose Streams.
Choose Create QLDB stream.
On the Create QLDB stream page, enter the following settings:
Ledger – Select the ledger vehicle-registration from the drop-down.
Start date and time – Leave this as the default. The default is the current time.
End date and time – This can be left blank.
Destination stream for journal data – Click Browse and select RegistrationStreamKinesis.
Enable record aggregation in Kinesis Data Streams – Enables QLDB to publish multiple stream records in a single Kinesis Data Streams record. To learn more, see KPL Key Concepts.
IAM role – Select RegistrationStreamsKinesisRole from the dropdown.
When the settings are as you want them, choose Create QLDB stream.
If your request submission is successful, the console returns to the main Streams page and lists your QLDB streams with their current status.
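If you prefer scripting to the console, the same stream can be started with the QLDB API; the sketch below is a rough boto3 equivalent, and the two ARNs (for the Kinesis data stream and the IAM role created by the stack) as well as the stream name are placeholders you would replace with values from your own deployment.

from datetime import datetime, timezone
import boto3

qldb = boto3.client("qldb")

# Start streaming the vehicle-registration journal to the Kinesis data stream
# created by the stack. Replace both ARNs with the values from your deployment.
qldb.stream_journal_to_kinesis(
    LedgerName="vehicle-registration",
    StreamName="registration-stream",
    RoleArn="arn:aws:iam::123456789012:role/RegistrationStreamsKinesisRole",
    InclusiveStartTime=datetime.now(timezone.utc),
    KinesisConfiguration={
        "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/RegistrationStreamKinesis",
        "AggregationEnabled": True,
    },
)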
To sign in to Kibana, you first need a user in the Amazon Cognito user pool created by the stack:
Open the Amazon Cognito console and choose Manage User Pools.
Choose registrations_kibana_demo_userpool.
Choose Users and groups and click Create user.
In the Create user popup:
Enter a value in the Username field. (This will be needed later)
Enter a value in the Password field. (This will be needed later)
Click the Create user button.
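If you prefer to script this step, a rough boto3 equivalent is shown below; it assumes the pool can be found by the name registrations_kibana_demo_userpool, and the username and temporary password are placeholders of your choosing.

import boto3

cognito = boto3.client("cognito-idp")

# Look up the user pool created by the stack by its name.
pools = cognito.list_user_pools(MaxResults=60)["UserPools"]
pool_id = next(p["Id"] for p in pools if p["Name"] == "registrations_kibana_demo_userpool")

# Create a user with a temporary password; Kibana will ask for a new
# password on first sign-in, just as it does for users created in the console.
cognito.admin_create_user(
    UserPoolId=pool_id,
    Username="kibana-demo-user",
    TemporaryPassword="ChangeMe123!",
)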
Open the KibanaEndpoint that you copied earlier and log in with the username and password created above. Create index patterns for person_index and vehicle_index. Check here on how to create index patterns on Kibana. You should see some documents for person_index and vehicle_index.
Here we will insert and update some documents in QLDB and verify that those updates are reflected in Elasticsearch.
Run the following command in the root of the repository.
pip install -r sample_scenarios/requirements.txt
Note: If your ledger name is not vehicle-registration, you will have to update the ledger name in sample_scenarios/constants.py.
We will insert some more documents into the tables in the ledger and verify the insertions in Elasticsearch.
python -m sample_scenarios.insert_documents
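For reference, the scenario scripts talk to the ledger through the pyqldb driver; a stripped-down insert looks roughly like the snippet below (the document fields shown are illustrative, not the exact shape used by sample_scenarios).

from pyqldb.driver.qldb_driver import QldbDriver

driver = QldbDriver(ledger_name="vehicle-registration")

def insert_person(txn):
    # Insert a single document into the Person table; the fields are illustrative.
    person = {"FirstName": "Jane", "LastName": "Doe", "GovId": "P00012345"}
    txn.execute_statement("INSERT INTO Person ?", person)

driver.execute_lambda(insert_person)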
We will update PendingPenaltyTicketAmount for some documents in the VehicleRegistration table. The updates should be reflected in Elasticsearch.
python -m sample_scenarios.single_update_to_document
We will update PendingPenaltyTicketAmount multiple times for a document in the VehicleRegistration table. The final update should be reflected in Elasticsearch.
python -m sample_scenarios.multiple_updates_to_a_document
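The update scenarios follow the same pattern with a PartiQL UPDATE statement; a minimal sketch, assuming a registration identified by its VIN (the value below is illustrative), might look like this.

from pyqldb.driver.qldb_driver import QldbDriver

driver = QldbDriver(ledger_name="vehicle-registration")

def update_penalty(txn):
    # Update PendingPenaltyTicketAmount for one registration; the VIN and amount are illustrative.
    txn.execute_statement(
        "UPDATE VehicleRegistration AS r SET r.PendingPenaltyTicketAmount = ? WHERE r.VIN = ?",
        250.0, "1N4AL11D75C109151",
    )

driver.execute_lambda(update_penalty)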
Tests are defined in the tests folder in this project. Use pip to install pytest and pytest-mock, then run the unit tests.
pip install pytest pytest-mock --user
python -m pytest tests/ -v
To delete the sample application that you created, use the AWS CLI. Assuming you used your project name for the stack name, you can run the following:
aws cloudformation delete-stack --stack-name STACK_NAME
This library is licensed under the MIT-0 License.