DynamoDB Ingestion to an Enterprise Data Lake
A journey in discovering architectural patterns
Overview
Companies manage a lot of data, often spread across many different data stores and a variety of formats (e.g., file storage, DB2, PostgreSQL, Oracle, MSSQL, MySQL, MongoDB, DynamoDB, and more). That data can also live in many different physical locations, ranging from on-premises data centers to vendor data centers to the public cloud. Each data store is created for a specific purpose, such as customer contact information, conversation logs, policy data, or purchase history.
There are generally two types of data usage: operational and analytical. Operational usage is straightforward: each application connects to its own data store and manages it directly, because services that support the live business need the most up-to-date information at all times. Analytical usage is altogether different, requiring many data sources to be aggregated and translated into usable information. For that purpose, it often makes the most sense to centralize all the data into a data lake.
Why Data Lake?
Without a data lake, aggregating data gets complicated fast. It requires gaining access to many different data stores, managing multiple credentials, maintaining connections, dealing with different formats, ensuring that you don’t impact business operations, and monitoring connections constantly. The data lake removes many of these obstacles by centralizing the data, conforming it into a common format like Apache Parquet, and implementing a common security model. In AWS, a data lake foundationally consists of one or more Amazon S3 buckets containing aggregated data. After the initial landing, the data can be used in conjunction with other services to massage and analyze it in a variety of ways in order to provide value to the business.
The Issue
I have a sizeable amount of data in Amazon DynamoDB residing in an individual, siloed account, and I need to move that data to a data lake account. Once the data is in the data lake, analysis becomes a lot easier: a variety of tools can be used to conform the data and measure key business outcomes.
So, how can you get that data into the data lake? Well, that’s the question of the day.
The Options
DynamoDB S3 Export
As of November 2020, DynamoDB supports native export to S3. This is perfect, right?
In fact, it does work, and it works well. Exports can be triggered daily by invoking a Lambda that initiates the S3 export using the DynamoDB.ExportTableToPointInTime API method.
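For reference, a minimal sketch of such a Lambda handler using boto3 might look like the following; the table ARN, bucket, and prefix are hypothetical environment variables rather than values from my actual setup.

import os
from datetime import datetime

import boto3

dynamodb = boto3.client("dynamodb")

def handler(event, context):
    # Start a point-in-time export of the table to S3 in DynamoDB JSON format.
    # Requires point-in-time recovery to be enabled on the table.
    # TABLE_ARN, EXPORT_BUCKET, and EXPORT_PREFIX are hypothetical environment variables.
    response = dynamodb.export_table_to_point_in_time(
        TableArn=os.environ["TABLE_ARN"],
        S3Bucket=os.environ["EXPORT_BUCKET"],
        S3Prefix=f'{os.environ["EXPORT_PREFIX"]}/{datetime.utcnow():%Y/%m/%d}',
        ExportFormat="DYNAMODB_JSON",
    )
    # The export runs asynchronously; the returned ARN can be used to check its status.
    return response["ExportDescription"]["ExportArn"]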
Each file in the export simply contains one line per row in the database. Combining all the files together produces all the data in the database, exactly as it was at the time of export.
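For illustration, the data files are gzip-compressed, and a single line (here with a made-up item) looks roughly like this:

{"Item":{"primaryKey":{"S":"TheKey"},"otherField":{"N":"7"},"yetAnotherField":{"L":[{"S":"a"},{"S":"b"},{"S":"c"}]}}}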
What’s the catch? Well, this option is quite affordable if you have a small to moderately sized database, since you only pay for the storage you use. If your table is large, however, keeping daily copies quickly becomes expensive. For example, daily exports of a table with 100GB of data would increase storage requirements by roughly 3TB per month. Furthermore, the data won’t be in a format conducive to analyzing how it changes over time.
DynamoDB Global Table Replica
DynamoDB Global Tables are an easy way to replicate a live database for analytics purposes. It’s relatively cost efficient, in that you pay only for replicated writes, storage, and cross-region data transfer.
However, global tables aren’t designed for analytical purposes. When all the other data is in S3, reading a DynamoDB table requires extra code to connect and query. Additionally, you’re directly coupling your live, production database to the analytical database: if you deploy changes to your main table, they can negatively impact the replica, and vice versa. Just like S3 exports, you can’t view historical data. Finally, writes can be made to the replica, and those writes get sent back to the main table. This pattern does not effectively create a division between analytical and operational needs, so it is not ideal for a data lake.
One-time S3 Export & Delta DynamoDB Streams
To separate operational from analytical data and get a historical view of a database, you can take a two-phased approach: do a one-time DynamoDB S3 export (the first option above), and follow up with deltas streamed live using DynamoDB Streams.
First, export the database just like in the first option. Then, consume the export with a Lambda that breaks the exported files (each containing many rows) into individual rows and sends them to a queue. That queue is consumed by a record writer, which writes each record into the data lake S3 bucket.
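A rough sketch of the splitting Lambda, assuming it is triggered by S3 object-created events, the export files are gzip-compressed, and the queue URL is a hypothetical environment variable:

import gzip
import os

import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")
QUEUE_URL = os.environ["ROW_QUEUE_URL"]  # hypothetical queue for individual rows

def handler(event, context):
    # Triggered by S3 when an export data file lands; split it into individual rows.
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        lines = gzip.decompress(body).decode("utf-8").splitlines()
        # SQS batches are limited to 10 messages each.
        for start in range(0, len(lines), 10):
            entries = [
                {"Id": str(i), "MessageBody": line}
                for i, line in enumerate(lines[start : start + 10])
            ]
            sqs.send_message_batch(QueueUrl=QUEUE_URL, Entries=entries)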
Additionally, a Lambda can consume a DynamoDB stream on the table and write delta objects into S3. Example object:
{
  "action": "MODIFY",
  "sequenceNumber": 64659840197412712, // Unique sequence number sent by the DynamoDB Stream; used for ordering purposes
  "object": {
    "primaryKey": "TheKey",
    "otherField": 7,
    "yetAnotherField": ["a", "b", "c"]
    // ...
  }
}
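A sketch of a stream-consuming Lambda that produces objects in that shape might look like the following; the destination bucket and key prefix are hypothetical, and numeric attributes come back from the deserializer as Decimal values (serialized as strings here for simplicity).

import json
import os

import boto3
from boto3.dynamodb.types import TypeDeserializer

s3 = boto3.client("s3")
deserializer = TypeDeserializer()
BUCKET = os.environ["DATA_LAKE_BUCKET"]  # hypothetical destination bucket

def handler(event, context):
    # Triggered by the DynamoDB stream; write one delta object to S3 per change record.
    for record in event["Records"]:
        change = record["dynamodb"]
        # REMOVE events have no NewImage, so fall back to the key attributes.
        image = change.get("NewImage", change["Keys"])
        delta = {
            "action": record["eventName"],  # INSERT, MODIFY, or REMOVE
            "sequenceNumber": int(change["SequenceNumber"]),
            "object": deserializer.deserialize({"M": image}),
        }
        key = f'delta/{change["SequenceNumber"]}.json'
        # default=str turns Decimal values into strings so json.dumps doesn't fail.
        s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(delta, default=str).encode("utf-8"))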
This process, however, is relatively complicated. There’s a lot going on that can go wrong: many moving pieces to monitor and a sizeable amount of code to maintain.
One-time Glue Export & Delta DynamoDB Streams
To simplify the workflow, AWS Glue, a fully managed Extract, Transform, Load (ETL) service, can be used.
The steps are as follows:
- Invoke a Glue Crawler.
- The crawler connects to the source DynamoDB table, reads all the rows in the table, and determines the schema (columns and data types) based on the rows it read.
- The crawler stores (creates or updates) the discovered schema in the Glue Data Catalog, which makes the schema available for other services to consume.
- Next, invoke a Glue Job that reads the data from the DynamoDB table, aided by the Glue Data Catalog, and exports it into the Enterprise Data Lake S3 bucket.
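Both steps can be kicked off programmatically with a couple of boto3 calls, for example from a small scheduled Lambda. A minimal sketch, where the crawler name, job name, and arguments are all placeholders:

import boto3

glue = boto3.client("glue")

def handler(event, context):
    # Refresh the schema first, then run the export job. The crawler runs
    # asynchronously, so in practice you would wait for it to finish (e.g., with
    # Step Functions or by polling get_crawler) before starting the job.
    glue.start_crawler(Name="dynamodb-table-crawler")
    glue.start_job_run(
        JobName="dynamodb-export-job",
        Arguments={
            "--glue_database_name": "my_glue_database",
            "--s3_destination_prefix": "s3://my-data-lake-bucket/onetime",
            "--table_to_export": "my_dynamodb_table",
        },
    )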
The Glue job itself is written in Python using PySpark, the Python API for Apache Spark, a framework designed to make large-scale data transformation easy. In fact, the actual job code is so simple that it’s pasted below.
import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
args = getResolvedOptions(sys.argv, ["glue_database_name", "s3_destination_prefix", "table_to_export"])
glue_database_name = args["glue_database_name"]
s3_destination_prefix = args["s3_destination_prefix"]
table_to_export = args["table_to_export"]
# -------------------- Load database --------------------
database = glueContext.create_dynamic_frame.from_catalog(database=glue_database_name, table_name=table_to_export)
database_df = database.toDF()
# -------------------- Clean out previous export's S3 files --------------------
print(f'Purging {s3_destination_prefix}')
glueContext.purge_s3_path(s3_destination_prefix, options={"retentionPeriod": -1})
print(f'Purged {s3_destination_prefix}')
# -------------------- Write to Enterprise Data Lake --------------------
destination = f'{s3_destination_prefix}/{table_to_export}'
print(f'Writing {table_to_export} with {database_df.count()} rows to {destination}')
database_df.write.parquet(destination, mode="overwrite")
print('Done')
Seriously, that’s how easy it is to export a table to S3 using PySpark + Glue!
After the initial export, the same delta streaming described in the previous option can be used. Here, a Kinesis Data Firehose can reduce complexity, decrease cost, and improve computational efficiency: Firehose saves money (via fewer S3 PUT operations) by buffering records over a period of time (60-900 seconds, configurable) and lumping them into a single file in S3.
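With Firehose in the picture, the stream-consuming Lambda gets even simpler: it only needs to forward delta records to the delivery stream. A rough sketch, with the delivery stream name as a hypothetical environment variable:

import json
import os

import boto3
from boto3.dynamodb.types import TypeDeserializer

firehose = boto3.client("firehose")
deserializer = TypeDeserializer()
STREAM_NAME = os.environ["DELIVERY_STREAM_NAME"]  # hypothetical Firehose delivery stream

def handler(event, context):
    # Convert each DynamoDB stream record into a newline-delimited delta JSON record
    # and hand the batch to Firehose, which buffers them into larger S3 objects.
    records = []
    for record in event["Records"]:
        change = record["dynamodb"]
        image = change.get("NewImage", change["Keys"])
        delta = {
            "action": record["eventName"],
            "sequenceNumber": int(change["SequenceNumber"]),
            "object": deserializer.deserialize({"M": image}),
        }
        records.append({"Data": (json.dumps(delta, default=str) + "\n").encode("utf-8")})
    # put_record_batch accepts at most 500 records per call.
    for start in range(0, len(records), 500):
        firehose.put_record_batch(DeliveryStreamName=STREAM_NAME, Records=records[start:start + 500])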
The final file structure looks like this:
~> aws s3 ls --recursive s3://<bucket>/mock-datalake/
/delta/2021/01/29/12/delta-1-2021-01-29-12-58-25-6608d3fb-719b-47b5-a832-478fbd17e63e
/delta/2021/01/29/13/delta-1-2021-01-29-13-00-14-09aff94c-52ad-45ab-b35a-ba36774d46eb
...
/onetime/<database-name>/part-00000-15868e4d-614e-4bdc-b94b-e9d5e064144f-c000.snappy.parquet
/onetime/<database-name>/part-00001-15868e4d-614e-4bdc-b94b-e9d5e064144f-c000.snappy.parquet
...
With a second Glue job, you can easily combine the one-time export and the deltas into a single view of the data. In my opinion, this approach is the best of all worlds. It’s cheap and efficient, there is little code to maintain, and what code remains is quite simple. It also allows historical views of the table to be created by reading only the deltas needed.
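Like the export job, that combining job can be a short PySpark script. The sketch below is only illustrative: it assumes the delta objects have the shape shown earlier (action, sequenceNumber, object), that the table’s key attribute is named primaryKey, that the S3 paths are placeholders, and that a Spark version supporting unionByName with allowMissingColumns is available. It builds a latest-state view by keeping the newest record per key and dropping deletes.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Placeholder paths for the one-time export and the streamed deltas.
onetime = spark.read.parquet("s3://my-data-lake-bucket/onetime/my_dynamodb_table/")
deltas = spark.read.json("s3://my-data-lake-bucket/delta/")

# Flatten the deltas so their columns line up with the one-time export, and give
# the one-time rows the lowest possible sequence number.
deltas_flat = deltas.select("object.*", "sequenceNumber", "action")
onetime_tagged = (
    onetime.withColumn("sequenceNumber", F.lit(0).cast("long"))
    .withColumn("action", F.lit("INSERT"))
)

combined = onetime_tagged.unionByName(deltas_flat, allowMissingColumns=True)

# Keep only the most recent version of each item, then drop deleted items.
latest = (
    combined.withColumn(
        "rank",
        F.row_number().over(Window.partitionBy("primaryKey").orderBy(F.col("sequenceNumber").desc())),
    )
    .filter(F.col("rank") == 1)
    .filter(F.col("action") != "REMOVE")
    .drop("rank", "sequenceNumber", "action")
)

latest.write.parquet("s3://my-data-lake-bucket/current/my_dynamodb_table/", mode="overwrite")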
Summary
There are a lot of different ways to export data from DynamoDB into a data lake S3 bucket. All of them “work,” but in my experimentation, I’ve found that a simple Glue export plus a DynamoDB stream into Firehose is often the best way to go. However, you should consider cost, availability, maintainability, and other factors to determine what best fits your needs. Finally, there are other considerations I have glossed over to keep this article short, such as file formats, compression, and IAM roles, that should also be taken into account.
To learn more about technology careers at State Farm, or to join our team, visit https://www.statefarm.com/careers.