Streams
Introduction
Amazon DynamoDB Streams provide a time-ordered sequence of item-level modifications within a DynamoDB table, capturing changes such as inserts, updates, and deletes. This tool enables developers to build triggers to execute custom code in response to database changes in near real-time.
These triggers serve a variety of use cases, such as replicating to secondary datastores, computing aggregations, sending notifications, and more.
Creating a trigger
To get started with DynamoDB Streams integration, the first step is to create a module within integrates/infra/src/streams_lambda/streams. Inside this module, you'll create a file, commonly named handler.py, to define the function that will be invoked by DynamoDB Stream events.
Here's a simple example of a handler.py file with a function named process:
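The exact signature depends on how the consumer invokes your function; the sketch below assumes the standard Lambda event shape for DynamoDB Streams and is illustrative rather than a definitive implementation:

```python
# handler.py: a minimal sketch of a trigger handler. It assumes the standard
# Lambda event shape for DynamoDB Streams; adapt it to your trigger's needs.
from typing import Any


def process(event: dict[str, Any], context: Any) -> None:
    for record in event.get("Records", []):
        event_name = record["eventName"]  # INSERT, MODIFY or REMOVE
        keys = record["dynamodb"]["Keys"]
        new_image = record["dynamodb"].get("NewImage")
        # Place your custom logic here, e.g. recalculating an indicator
        # or propagating the change to a secondary datastore.
        print(f"{event_name} on {keys} -> {new_image}")
```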
The next step is to declare the trigger in the integrates/infra/src/streams_lambda/triggers.json file. This file serves as the configuration for all triggers.
To simplify the process and ensure correctness, you can leverage the provided JSON Schema with editor IntelliSense support. Here's an example of how you might structure the trigger object:
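The exact fields are defined by the JSON Schema mentioned above; the names and values below are illustrative assumptions, not the authoritative schema:

```json
{
  "my_trigger": {
    "handler": "streams.my_trigger.handler.process",
    "batch_size": 10,
    "filter_criteria": {
      "pattern": {
        "eventName": ["INSERT", "MODIFY"],
        "dynamodb": {
          "Keys": {
            "pk": { "S": [{ "prefix": "FIN#" }] }
          }
        }
      }
    }
  }
}
```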
Local Reproducibility
Locally, your triggers will be invoked by a KCL-based consumer. This consumer reads the stream from your local instance of DynamoDB, mimicking the behavior of the stream processing mechanism in AWS Lambda, allowing you to test the functionality of your triggers in a controlled setting.
To use it, you can start the database job as usual, running m . /integrates/db, and then perform changes that match the filter criteria of your trigger. Logs will be written to the console as the trigger functions are invoked in response to the stream events.
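For instance, one hypothetical way to produce a matching change from a Python shell is to write an item to the local DynamoDB instance; the endpoint, table name, and item below are illustrative assumptions:

```python
# Hypothetical example: write an item to the local DynamoDB instance so that
# the local consumer picks up the change and invokes matching triggers.
# Endpoint URL, table name and item shape are assumptions for illustration.
import boto3

client = boto3.client("dynamodb", endpoint_url="http://localhost:8022")
client.put_item(
    TableName="integrates_vms",
    Item={
        "pk": {"S": "FIN#123"},
        "sk": {"S": "FIN#123"},
        "state": {"S": "open"},
    },
)
```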
You can optionally run the database job and the local consumer separately, by running the following command:
and then:
Testing
Ensuring the reliability of your DynamoDB Streams triggers is a critical aspect of development. You can adopt a two-tiered testing approach encompassing both unit tests and functional tests.
Unit Tests
For unit testing, create tests within the integrates/infra/src/streams_lambda/test directory. In these tests, call the handler functions directly, simulating DynamoDB Stream events and verifying the state before and after trigger execution. These unit tests enable you to validate the behavior of your handler functions in isolation, ensuring they correctly process DynamoDB Stream events.
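A sketch of what such a test could look like, assuming a pytest-style setup; the import path, event shape, and handler signature are illustrative assumptions:

```python
# A hypothetical unit test: build a minimal DynamoDB Stream record by hand,
# call the handler directly, and assert on the resulting state.
from streams_lambda.streams.my_trigger.handler import process  # hypothetical path


def test_process_handles_insert() -> None:
    event = {
        "Records": [
            {
                "eventName": "INSERT",
                "dynamodb": {
                    "Keys": {"pk": {"S": "FIN#123"}, "sk": {"S": "FIN#123"}},
                    "NewImage": {"state": {"S": "open"}},
                },
            }
        ]
    }
    process(event, context=None)
    # Assert on the expected side effects here, e.g. by reading the
    # local database or checking a mocked client.
```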
Functional Tests
Functional tests, located in integrates/back/test/functional, provide a higher-level validation of your DynamoDB Streams triggers. In these tests, rely on the local consumer to invoke your triggers whenever relevant changes occur in the database. Functional tests provide end-to-end validation of your trigger, ensuring that changes have the expected behavior.
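Because the local consumer invokes triggers asynchronously, assertions usually need to wait for the side effect instead of checking it immediately after the change. A minimal helper sketch, with illustrative names:

```python
# A hypothetical polling helper for functional tests: wait until the
# trigger's side effect becomes visible, or fail after a timeout.
import asyncio
from typing import Awaitable, Callable


async def wait_for_side_effect(
    check: Callable[[], Awaitable[bool]],
    timeout: float = 30.0,
    interval: float = 1.0,
) -> None:
    elapsed = 0.0
    while not await check():
        if elapsed >= timeout:
            raise TimeoutError("Trigger side effect was not observed in time")
        await asyncio.sleep(interval)
        elapsed += interval
```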
Important Considerations
When creating a trigger, it's crucial to address key considerations for a robust and reliable implementation.
Ensure Idempotency
Idempotency is paramount when designing functions triggered by DynamoDB Streams. This ensures that even if an event triggers the function multiple times, the outcome remains consistent. To achieve idempotency, developers should carefully design their functions, accounting for potential retries and preventing unintended side effects.
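One common way to achieve this is to make the write itself idempotent, for example by recomputing the derived value from the source data and writing an absolute result rather than applying a delta. A minimal sketch, with illustrative table and attribute names:

```python
# A minimal idempotency sketch: derive the indicator from the authoritative
# data and write the absolute result, so replaying the same stream record
# any number of times yields the same outcome. Names are illustrative.
def recalculate_open_vulnerabilities(table, finding_id: str) -> None:
    response = table.query(
        KeyConditionExpression="pk = :pk",
        ExpressionAttributeValues={":pk": f"VULN#{finding_id}"},
    )
    open_count = sum(
        1 for item in response["Items"] if item.get("state") == "open"
    )
    # Writing an absolute value (instead of ADD-ing a delta) is safe to retry.
    table.update_item(
        Key={"pk": f"FIN#{finding_id}", "sk": f"FIN#{finding_id}"},
        UpdateExpression="SET open_vulnerabilities = :count",
        ExpressionAttributeValues={":count": open_count},
    )
```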
Avoid Infinite Loops
Preventing infinite loops is crucial when dealing with Lambda functions triggered by DynamoDB Streams. These loops can occur when a function inadvertently modifies data in the same database table, triggering additional events and leading to endless execution cycles. To mitigate this risk, developers should carefully structure their code to avoid unintentional self-triggering and implement safeguards to break potential loops.
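For example, a trigger that writes to the same table can compare the current value before writing, so an up-to-date value produces no new stream event. A sketch with illustrative names:

```python
# A hypothetical safeguard against self-triggering: skip the write when the
# derived value is already up to date, so the trigger does not emit another
# MODIFY event on the same table that would invoke it again.
def update_indicator(table, key: dict, new_value: int) -> None:
    current = table.get_item(Key=key).get("Item", {})
    if current.get("open_vulnerabilities") == new_value:
        return  # Nothing changed; avoid generating a new stream event.
    table.update_item(
        Key=key,
        UpdateExpression="SET open_vulnerabilities = :value",
        ExpressionAttributeValues={":value": new_value},
    )
```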
Keep a Short Execution Time
Lambda functions triggered by DynamoDB Streams operate within a time-constrained environment, with a maximum execution timeout of 15 minutes. Designing functions to complete their tasks within this timeframe is essential to prevent premature termination.
For use cases requiring extended processing times that exceed Lambda's execution limit, consider alternative approaches such as initiating jobs in AWS Batch, which supports longer execution durations.
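For instance, the trigger can hand off the heavy work by submitting a Batch job and returning immediately; the job name, queue, and definition below are illustrative assumptions:

```python
# Hypothetical hand-off to AWS Batch for work that exceeds Lambda's limits.
# Job name, queue and definition are assumptions for illustration.
import boto3

batch = boto3.client("batch")
batch.submit_job(
    jobName="recalculate-heavy-indicator",
    jobQueue="integrates_batch_queue",
    jobDefinition="integrates_batch_definition",
    containerOverrides={
        "command": ["python", "-m", "jobs.recalculate", "--finding-id", "123"],
    },
)
```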
Retry Mechanism
In the event of a failure during the invocation of Lambda functions triggered by DynamoDB Streams, AWS Lambda automatically retries invoking the function until the associated record expires in the stream, typically after 24 hours. If the issue persists beyond this timeframe, the event is considered unrecoverable and is sent to a dead-letter queue (DLQ) named integrates_streams_dlq in AWS Simple Queue Service (SQS). This DLQ serves as a repository for events requiring manual review due to persistent failures.
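To inspect such events for manual review, you can read them from the queue; only the queue name comes from this page, the rest of the sketch is illustrative:

```python
# A sketch for manually reviewing events that ended up in the dead-letter
# queue after exhausting retries.
import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="integrates_streams_dlq")["QueueUrl"]
messages = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
for message in messages.get("Messages", []):
    print(message["Body"])
```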
Indicators
Motivation
For faster access to aggregated information, we implement a set of finding indicators that are calculated and stored in the main table. These indicators are calculated using DynamoDB Streams (current indicators are listed in streams/indicators/findings/).
These indicators are called unreliable indicators because their backend calculation has not been guaranteed to be 100% accurate, and that name was selected for the first approach used to implement them. The main reason is that the indicators are calculated in real time and the data could be inconsistent due to the asynchronous nature of the process.
With Streams, the calculation becomes reliable thanks to the shard processing of every database modification.
Considerations
No calculations on-the-fly. The indicators cannot be calculated on the fly because they are based on a large amount of data. To avoid performance issues, Streams calculates and updates the finding indicators in batches.
Only findings indicators. There are other unreliable indicators in the platform (organization, groups, portfolios, etc.). Not all of them can be calculated using Streams: some of those events could generate cyclic calls, because the indicators belong to existing elements in the same table, and some indicators could be computationally expensive due to the amount of data to check.
Adding new indicators to Streams must be deliberate. In the future, we'll implement new ones.
No calculations by adding or removing one. With the shards, we could consider calculating some indicators incrementally, by adding or removing a partial sum of data. We tried it, but maintaining or extending those methods, along with the off-by-one errors, started to become a nightmare. We consider that querying all the required information once and doing simple calculations is more maintainable.
Unit tests for methods, functional tests for mutations. We have unit tests for the methods that calculate the indicators, and functional tests that check, using snapshots, the full result of the indicators.
Search engine
Our search engine, OpenSearch, provides faster access to data and more filtering options. Streams continuously update the search engine with new data, improving data consistency and integrity.
Indexes are strongly typed. OpenSearch infers the type of the data once it is added to the index. Any modification to an existing column will require repopulating the index, and Streams will reject adding incorrectly typed data to the index.
Monitoring and Troubleshooting
Ensuring the health and performance of DynamoDB Streams triggers is essential for maintaining a responsive and reliable system. Here’s how you can effectively monitor and troubleshoot your triggers:
Bugsnag and Coralogix
In the event of unhandled errors, your trigger will report them through Bugsnag and/or Coralogix under the integrates-streams project. This setup provides centralized error tracking, enabling prompt identification and resolution of issues. You can view traces and other performance-related insights in Coralogix.
CloudWatch Dashboard
Utilize the CloudWatch dashboard to monitor key metrics related to your Lambda functions triggered by DynamoDB Streams. Monitor metrics such as concurrent executions, indicating the number of functions running simultaneously. Additionally, track processing delay metrics to gain insight into the time elapsed between a change occurring in the database and its processing by a trigger. These metrics can help fine-tune the batch_size parameter of the trigger. View dashboard (internal link)
CloudWatch Logs
For detailed insight into trigger execution, review CloudWatch logs located under the log groups with the prefix “/aws/lambda/integratesstreams”. These logs capture execution details, including any custom logging statements incorporated into your code. Analyzing these logs is crucial for diagnosing specific issues and understanding the flow of DynamoDB Streams integration.