AWS DEEP DIVE SERIES

Amazon DynamoDB Deep Dive. Chapter 3: Consistency, DynamoDB streams, TTL, Global tables, DAX

The story of one of the world’s fastest databases in a human-friendly format

  • Chapter 1: Essentials
  • Chapter 2: DynamoDB components
  • Chapter 3: Consistency, DynamoDB streams, TTL, Global tables, DAX (you are here!)
  • Chapter 4: Data modeling, best practices, What’s next (TBD)

Hello again!

In this chapter, we will look at some advanced features of DynamoDB. We will see how to integrate the data flow with other services using DDB Streams, build a multi-region database infrastructure using Global Tables, use DAX for caching, and apply TTL (Time-To-Live) to items.

To better understand the trade-offs of distributed computing, we will start our path with Consistency!

DynamoDB Consistency

TL;DR: If you are familiar with ACID and the CAP theorem — DynamoDB is AP.

A long time ago, one of the main challenges with databases was avoiding data corruption during concurrent access. To solve it, the Ancient Engineers implemented locking mechanisms and transactions. The pursuit of effective and safe transactional technology led to ACID, a set of required properties for database systems.

The ACID acronym stands for the following:

  • Atomicity — the transaction is either committed upon completion or rolled back.
  • Consistency — each transaction moves the data from one valid state to another.
  • Isolation — concurrent transaction executions do not affect each other.
  • Durability — a committed transaction is persisted and not lost in case of a system crash (not to be confused with surviving the loss of the server itself!)

SQL commands such as COMMIT and ROLLBACK expose these transactional guarantees. On top of that, the Lock Manager — one of the DBMS components — ensures that the same data is not modified by multiple transactions simultaneously (in programming terms, it prevents a Race Condition).

While such safety features protect our data, the performance of database operations — writes most of all — degrades dramatically as data and activity grow.

However, improving performance means other guarantees must suffer. The CAP theorem was introduced around the turn of the millennium to describe what distributed systems can and cannot do.

The CAP theorem stands for the following:

  • Consistency
  • Availability
  • Partition Tolerance

Consistency means every read sees the most recent write, Availability means every request receives a (non-error) response, and Partition Tolerance means the system keeps operating even when the network between its nodes breaks apart. The theorem states that a distributed system can fully provide only 2 out of the 3. CAP is an exciting topic to investigate, so make sure you study it thoroughly!

That does not mean we cannot have distributed ACID-compliant databases. But it does introduce us to another kind of database system — BASE.

  • Basic Availability — “most likely, I will process your request.”
  • Soft state — “let me not guarantee you that the data is as it is right now.”
  • Eventual Consistency — “if you are lucky, you will read the latest update.”

Where was I? Ah, DynamoDB!

The straightforward idea behind DynamoDB is to give you reads and writes that are as quick and efficient as possible. The trade-off is the consistency of the data.

When you invoke PutItem and then immediately do GetItem for the same data, there is a chance that you will retrieve older data. There are several things we can do about that.

Performing a strongly consistent read is plain and simple:

>>> characters.get_item(
... Key={
... 'playerId': '7877e1b90fe2',
... 'characterId': '74477fae0c9f'
... },
... ConsistentRead=True
... )['Item']
{'race': 'human', 'level': Decimal('7'), 'health': Decimal('9000'), 'characterId': '74477fae0c9f', 'mana': Decimal('10'), 'playerId': '7877e1b90fe2', 'strength': Decimal('42'), 'speed': Decimal('23'), 'playerRegion': 'us-east', 'currentServer': 'srv01.us-east.bestrpg.com', 'class': 'knight', 'playerEmail': 'foo@bar.com'}

Where’s the catch? Well, in the pricing. ;)

While eventually consistent reads cost us 0.5 RCU (Read Capacity Unit), strongly consistent reads consume a whole RCU.
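
As a rough back-of-the-envelope check: one RCU buys one strongly consistent read per second of an item up to 4 KB, while the same RCU buys two eventually consistent reads of that size. Here is a tiny sketch of the arithmetic (the item sizes are made up):

import math

def rcu_per_read(item_size_bytes, consistent=False):
    # Reads are billed in 4 KB chunks; eventually consistent reads cost half
    chunks = math.ceil(item_size_bytes / 4096)
    return chunks if consistent else chunks / 2

print(rcu_per_read(1500))                   # 0.5 RCU, eventually consistent
print(rcu_per_read(1500, consistent=True))  # 1 RCU, strongly consistent
print(rcu_per_read(9000, consistent=True))  # 3 RCUs, a ~9 KB item spans three 4 KB chunks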

There is another thing we can do in favor of consistency — conditional write! Let’s suppose our datacenter had a severe outage, and we need to migrate our characters to another server. We need to run the following:

>>> characters.update_item(
... Key={
... 'playerId': '7877e1b90fe2',
... 'characterId': '74477fae0c9f'
... },
... UpdateExpression = 'SET currentServer = :newserver',
... ExpressionAttributeValues={
... ':newserver': 'srv02.us-east.bestrpg.com'
... }
... )

What if we accidentally receive a duplicate request to move a player’s character to another server (duplication happens often in distributed systems)? In that scenario, the character would be moved twice, hurting the user experience.

To mitigate that, we have conditional writes: the idea is to read the current value with GetItem just before the UpdateItem or PutItem and make the write conditional on the value we read.

>>> from boto3.dynamodb.conditions import Attr
>>> current = characters.get_item(
... Key={
... 'playerId': '7877e1b90fe2',
... 'characterId': '74477fae0c9f'
... },
... ProjectionExpression = 'currentServer'
... )['Item']['currentServer']
>>> characters.update_item(
... Key={
... 'playerId': '7877e1b90fe2',
... 'characterId': '74477fae0c9f'
... },
... UpdateExpression = 'SET currentServer = :newserver',
... ExpressionAttributeValues={
... ':newserver': 'srv02.us-east.bestrpg.com'
... },
... ConditionExpression=Attr('currentServer').eq(current)
... )

Conditional writes are not strictly related to consistent reads, but they are useful for avoiding race conditions in a concurrent environment.
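
If the condition does not hold, for example when the duplicate request arrives after the first one has already moved the character, DynamoDB rejects the write with a ConditionalCheckFailedException. Here is a minimal sketch of handling that rejection (the error handling around the same update_item call is my own illustration):

from boto3.dynamodb.conditions import Attr
from botocore.exceptions import ClientError

try:
    characters.update_item(
        Key={'playerId': '7877e1b90fe2', 'characterId': '74477fae0c9f'},
        UpdateExpression='SET currentServer = :newserver',
        ExpressionAttributeValues={':newserver': 'srv02.us-east.bestrpg.com'},
        ConditionExpression=Attr('currentServer').eq(current)
    )
except ClientError as err:
    if err.response['Error']['Code'] == 'ConditionalCheckFailedException':
        # Someone (or a duplicate request) already moved this character
        print('Condition failed, skipping the duplicate migration')
    else:
        raise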

We have just changed the player’s server, but the mighty knight or skillful mage won’t move to another server just because of that. We need to run some external logic when the change happens in the database.

Introducing — DynamoDB Streams!

DynamoDB Streams

Let’s reiterate our disaster scenario. One or more of our servers go down, and the alerting system initiates player migration. The recovery system finds the least loaded cluster and sends an update to the table to move the affected players. As we need to perform some additional action, we need to capture that change and process it — for example, send a notification to the client with the new server address.

Technically, we could send an SNS notification, put a message in an SQS queue, or even make an API call to some service. However, there is an easier way to capture the change, supported by DynamoDB out of the box.

DynamoDB Streams is an implementation of CDC — Change Data Capture — a design pattern for tracking data changes as they happen.

To enable a Stream on a table, we need to update the table settings.

$ aws dynamodb update-table \
--table-name characters \
--stream-specification \
StreamEnabled=True,StreamViewType=NEW_AND_OLD_IMAGES
{
    # very long output...
    "StreamSpecification": {
        "StreamEnabled": true,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "LatestStreamLabel": "2021-01-06T17:41:06.230",
    "LatestStreamArn": "arn:aws:dynamodb:REGION:ACCT_ID:table/characters/stream/2021-01-06T17:41:06.230"
}

That was easy! Note the StreamViewType parameter — it indicates what exactly we put into the stream. At the time of writing, the supported projections are:

  • KEYS_ONLY — put only the primary key attributes of the modified item
  • NEW_IMAGE — only the new state of the item
  • OLD_IMAGE — only the previous state of the item
  • NEW_AND_OLD_IMAGES — both the new and the previous state

What you want to expose depends on your business needs (and purchasing power), as reads from Streams are billed as Read Request Units. So don’t forget to check the pricing section before enabling it!

In my case, I need both old and new items, as my logic needs to send a notification to the client and decommission an old server.

Enabling streams is not enough to perform the logic we need. For that, we need a Lambda function and a trigger. I expect you are already familiar with AWS Lambda, so here’s our stream processing function:

import json

def handler(event, context):
    return {
        'statusCode': 200,
        'body': json.dumps(event)
    }

The function will need the following IAM permissions to consume events from the stream:

{
    "Effect": "Allow",
    "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams"
    ],
    "Resource": "arn:aws:dynamodb:region:accountID:table/characters/stream/*"
}

We’re good to go! Let’s create a trigger.

Side note: if you prefer using AWS Console (I am not judging!), you can create a trigger from the DynamoDB console. However, the actual job is done on the Lambda side, not DynamoDB.

Creating a trigger for Streams is no different from any other event source.

$ aws lambda create-event-source-mapping \
--function-name streams \
--event-source-arn characters_stream_arn \
--batch-size 1 \
--starting-position TRIM_HORIZON

--batch-size controls how many records are passed to the function per invocation. --starting-position defines where to start reading from and applies only to DynamoDB Streams, Kinesis, and MSK. TRIM_HORIZON means “everything that’s still in the stream!” (when adding features to existing workloads, I would suggest using LATEST).

Once my function is subscribed to the changes, I can check it out. Let’s make a change to an existing item and see what Lambda will receive!

>>> characters.update_item(
... Key={
... 'playerId': '7877e1b90fe2',
... 'characterId': '74477fae0c9f'
... },
... UpdateExpression = 'SET currentServer = :newserver',
... ExpressionAttributeValues={
... ':newserver': 'srv03.us-west.bestrpg.com'
... }
... )

Now I am going to view the logs of my stream processing function.

{
    'Records': [
        {
            'eventID': '4699f01181c775a95d63bc3fc518427f',
            'eventName': 'MODIFY',
            'eventVersion': '1.1',
            'eventSource': 'aws:dynamodb',
            'awsRegion': 'us-east-1',
            'dynamodb': {
                'ApproximateCreationDateTime': 1610028408.0,
                'Keys': {
                    'characterId': {'S': '74477fae0c9f'},
                    'playerId': {'S': '7877e1b90fe2'}
                },
                'NewImage': {
                    'characterId': {'S': '74477fae0c9f'},
                    'currentServer': {'S': 'srv03.us-west.bestrpg.com'},
                    'playerId': {'S': '7877e1b90fe2'},
                    # the rest...
                },
                'OldImage': {
                    'characterId': {'S': '74477fae0c9f'},
                    'currentServer': {'S': 'srv02.us-east.bestrpg.com'},
                    'playerId': {'S': '7877e1b90fe2'},
                    # the rest...
                },
                # and many many useful metadata...
            }
        }
    ]
}

Aha! So the event object contains an array of updated records (the dynamodb block), each with an old and a new image. We also get MODIFY as the eventName.

I am going to add some logic to my function. When it receives a modification event with a change of servers, it will send a call to decommission the old server and notify the player. This is what the code looks like:

import json

def notify_player(email, new_server):
    print(f'Notified player {email} to move to the {new_server}')

def decommission_server(old_server):
    print(f'Decommissioned {old_server}')

def lambda_handler(event, context):
    message = ''
    event_name = event['Records'][0]['eventName']
    change = event['Records'][0]['dynamodb']
    old_server = change['OldImage']['currentServer']['S']
    new_server = change['NewImage']['currentServer']['S']
    email = change['NewImage']['playerEmail']['S']
    if event_name == 'MODIFY':
        if new_server != old_server:
            notify_player(email, new_server)
            decommission_server(old_server)
            message = 'Did useful stuff!'
    return {
        'statusCode': 200,
        'body': {
            'message': message
        }
    }

Side note: the code above is far from robust. It assumes an OldImage is always present, which is not the case when we insert a new item instead of changing an existing one. What about DeleteItem? There are many caveats, so please don’t copy this code into your production system!
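
For reference, here is a slightly more defensive sketch of the same handler. It is still simplified, and the loop and the guards are my own additions rather than a production recipe:

def lambda_handler(event, context):
    message = ''
    for record in event['Records']:
        if record['eventName'] != 'MODIFY':
            continue  # INSERT and REMOVE events are not server moves
        change = record['dynamodb']
        old_image = change.get('OldImage', {})
        new_image = change.get('NewImage', {})
        old_server = old_image.get('currentServer', {}).get('S')
        new_server = new_image.get('currentServer', {}).get('S')
        email = new_image.get('playerEmail', {}).get('S')
        if old_server and new_server and email and old_server != new_server:
            notify_player(email, new_server)
            decommission_server(old_server)
            message = 'Did useful stuff!'
    return {'statusCode': 200, 'body': {'message': message}}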

Let’s update the table again and check the logs!

Yay! A few more things we need to know before implementing Streams.

First, Streams triggers integrate only with AWS Lambda; if you want different consumers, you should integrate DynamoDB with Kinesis Data Streams instead. Second, each change lives in the Stream for 24 hours. Speaking of TTL…

DynamoDB Item TTL

DynamoDB is a great database for storing and quickly retrieving individual items of data. There are cases when we don’t need the items to live forever. For this purpose, there is another feature called TTL (Time-To-Live).

Suppose we run a batch job, which drops plenty of in-game loot on random spots on the map. Each “loot” item looks as follows:

{
    'locationId': 'ewref2214', # partition key
    'lootId': 'fdjijr23q21', # sort key
    'rarity': 'rare',
    'latitude': '48.856144',
    'longitude': '2.297820'
}

Periodically, the server scans the table to get all the items and places the loot on the map.

If I want to expire the loot, e.g., remove it from the map, I could store the TTL on the server side and let it remove the loot from the map automatically, but I would still have to delete the item from the table manually, which costs me at least one WCU per item. Or I can leverage DynamoDB TTL!

The good thing about TTL is that the deletion happens in the background, and I am not charged for it. To use it, I will add a TTL attribute to the loot item.

{
    'locationId': 'ewref2214', # partition key
    'lootId': 'fdjijr23q21', # sort key
    'rarity': 'rare',
    'latitude': '48.856144',
    'longitude': '2.297820',
    'ttl': 1610471877 # must be a UNIX epoch time
}

And enable TTL for items in the table.

$ aws dynamodb update-time-to-live \
--table-name loot \
--time-to-live-specification \
"Enabled=true,AttributeName=ttl"

The AttributeName=ttl corresponds to the attribute of the item. When TTL is enabled, the DynamoDB backend compares the TTL attribute with the current server time and, once the item has expired, deletes it from the table and from every index. If Streams are enabled, the deletion is also pushed into the stream, so we can process it if needed.
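
If you do consume those events, a TTL-driven deletion can be told apart from a regular DeleteItem: it arrives as a REMOVE record whose userIdentity points at the DynamoDB service itself. A minimal sketch of filtering for them (the handler name and the log line are my own):

def ttl_handler(event, context):
    for record in event['Records']:
        user = record.get('userIdentity', {})
        is_ttl_delete = (
            record['eventName'] == 'REMOVE'
            and user.get('principalId') == 'dynamodb.amazonaws.com'
        )
        if is_ttl_delete:
            # The background worker expired this item, not one of our clients
            print(f"Loot expired: {record['dynamodb']['Keys']}")
    return {'statusCode': 200}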

If the item does not have a TTL attribute, it will not be deleted by a background worker, which allows us to combine disposable and persistent data in the same table.
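
For completeness, this is roughly how the batch job could write such a disposable item, computing the epoch timestamp on the fly (the one-hour lifetime and the table object are assumptions):

import time
import boto3

loot = boto3.resource('dynamodb').Table('loot')
LOOT_LIFETIME_SECONDS = 3600  # assumption: loot despawns after one hour

loot.put_item(
    Item={
        'locationId': 'ewref2214',
        'lootId': 'fdjijr23q21',
        'rarity': 'rare',
        'latitude': '48.856144',
        'longitude': '2.297820',
        # DynamoDB expects the expiry as UNIX epoch time in seconds (Number type)
        'ttl': int(time.time()) + LOOT_LIFETIME_SECONDS
    }
)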

TTLs are easy to implement and handy to use. Combined with Streams, we can even turn DynamoDB into a real-time streaming platform. And using Global Tables — into a planet-scale near-real-time streaming platform!

Wait, did I just say Global Tables?

DynamoDB Global Tables

Running a distributed system in a single data center is challenging yet doable. Running a distributed system in multiple data centers across continents is even harder and very expensive. The good thing about AWS is that extremely competent engineers manage many of these things for the sake and prosperity of our products. And DynamoDB is no different.

DynamoDB Global Tables are multiple replica tables that share changes with each other. The replicas are created in different AWS regions within a single account.

Side note: when it comes to consistency and global distribution, one may start asking how concurrency is managed. For Global Tables, the last writer wins, and DynamoDB will do its best to determine who that last writer is. At least that’s what AWS says.

DynamoDB Streams with NEW_AND_OLD_IMAGES are essential for the Global Tables to work, but we already have them enabled for our game characters table.

My table resides in us-east-1 and I want to have replicas in us-west-1, eu-west-1 (Ireland), eu-central-1 (Germany), and ap-southeast-1 (Singapore). I will issue the following command:

$ aws dynamodb update-table \
--table-name characters \
--replica-updates \
'{"Create": {"RegionName": "us-west-1"}}'

It will take some time for the replica to be created in the other region.

$ aws dynamodb describe-table \
--table-name characters \
--query 'Table.TableStatus' \
--output text
UPDATING
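
Rather than re-running describe-table by hand, boto3’s table_exists waiter can do the polling for us; it keeps calling DescribeTable until the table reports ACTIVE. A small sketch:

import boto3

west = boto3.client('dynamodb', region_name='us-west-1')
# Polls DescribeTable every few seconds until TableStatus is ACTIVE
west.get_waiter('table_exists').wait(TableName='characters')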

Once the status is back to ACTIVE, I can work on making my application truly distributed! Let’s try it out — I will initiate two separate Boto3 sessions to work in different regions.

# I cannot create a resource object with a region,
# as region endpoint is passed to the Client or Session object
# So first I need two sessions.
>>> session_east = boto3.session.Session(region_name='us-east-1')
>>> session_west = boto3.session.Session(region_name='us-west-1')
# Now I can create 2 Resource and Table objects
# corresponding to the respective region
>>> ddb_east = session_east.resource('dynamodb')
>>> ddb_west = session_west.resource('dynamodb')
>>> characters_east = ddb_east.Table('characters')
>>> characters_west = ddb_west.Table('characters')

I will create an item in the characters table in the us-east-1 region and then read this item from us-west-1.

>>> characters_east.put_item(
... Item={
... 'playerId': 'foo1234',
... 'characterId': 'bar5678'
... }
... )
>>> characters_west.get_item(
... Key={
... 'playerId': 'foo1234',
... 'characterId': 'bar5678'
... }
... )['Item']
{'characterId': 'bar5678', 'playerId': 'foo1234'}

Great, the item has just appeared in a millisecond!

But the table in the N. Virginia region is our original table. Let’s check whether the same happens when we write to the replica table.

>>> characters_west.put_item(
... Item={
... 'playerId': 'foobar1234',
... 'characterId': 'barfoo5678'
... }
... )
>>> characters_east.get_item(
... Key={
... 'playerId': 'foobar1234',
... 'characterId': 'barfoo5678'
... }
... )['Item']
{'characterId': 'barfoo5678', 'playerId': 'foobar1234'}

We will cover Global Tables a bit more in the next chapter when we discuss best practices. On to our last topic of today — DynamoDB Accelerator or DAX.

DAX

Database system developers do their best to build the world’s fastest databases. However, getting data straight from the computer’s memory is always faster than reading it from persistent storage. The same applies to DynamoDB, so its development team introduced a purpose-built caching service called DAX (DynamoDB Accelerator). Unlike ElastiCache for Redis, DAX acts as a proxy in front of the DynamoDB table, combining two caching strategies: lazy loading and write-through.

Creating a DAX cluster is slightly more complicated than creating a table, as we need additional resources such as network subnets and an IAM role with permissions to access the table. Below is a CloudFormation template to create the cluster.
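
Here is a minimal sketch of what such a dax.yaml could contain, wired to the parameters used in the deploy command below. The node type and the breadth of the IAM policy are my assumptions, so treat it as a starting point rather than a hardened template.

AWSTemplateFormatVersion: '2010-09-09'
Parameters:
  TableName:
    Type: String
  ClusterSize:
    Type: Number
    Default: 1
  Subnets:
    Type: List<AWS::EC2::Subnet::Id>
Resources:
  DaxRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: dax.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: table-access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: dynamodb:*
                Resource: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${TableName}
  DaxSubnetGroup:
    Type: AWS::DAX::SubnetGroup
    Properties:
      Description: Subnets for the DAX cluster
      SubnetIds: !Ref Subnets
  DaxCluster:
    Type: AWS::DAX::Cluster
    Properties:
      NodeType: dax.t3.small
      ReplicationFactor: !Ref ClusterSize
      IAMRoleARN: !GetAtt DaxRole.Arn
      SubnetGroupName: !Ref DaxSubnetGroup
Outputs:
  ClusterEndpoint:
    Description: DAX cluster discovery endpoint (host:port)
    Value: !GetAtt DaxCluster.ClusterDiscoveryEndpoint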

I will deploy a stack:

$ aws cloudformation deploy \
--stack-name dax \
--template-file dax.yaml \
--parameter-overrides \
TableName=characters \
ClusterSize=1 \
Subnets="subnet-123,subnet-456" \
--capabilities CAPABILITY_IAM

It will take a while to provision the cluster. When it’s done, I can configure my client to use the cluster endpoint.

$ aws cloudformation describe-stacks \
--stack-name dax \
--query "Stacks[].Outputs[].OutputValue" \
--output text
uglystring.clustercfg.dax.use1.cache.amazonaws.com:8111

I will also need to create a role for the EC2 instance to access DAX and DynamoDB.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "dax:*"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:dax:region:acct_id:cache/uglystring"
            ]
        }
    ]
}

From the EC2 instance, provisioned in the same network as the DAX cluster, I can install the Python dependencies to connect to the caching layer.

$ sudo yum -y install python3-pip
$ pip3 install amazon-dax-client boto3 --user

I need a different client: DAX’s API is similar to DynamoDB’s, but it is served from a different endpoint. That endpoint is passed to the DAX client, after which I can create the same Table object to work with.

>>> import boto3
>>> from amazondax import AmazonDaxClient
>>> dax = AmazonDaxClient.resource(
... endpoint_url="verylongurl:8111",
# If you don't add region name
# you most likely will get a routing issue
... region_name='us-east-1')
>>> characters = dax.Table('characters')
>>> characters.scan()['Items']
[{'playerId': 'foobar1234', 'characterId': 'barfoo5678'}, {'playerId': 'foo1234', 'characterId': 'bar5678'}, {'playerId': '7877e1b90fe2', 'characterId': '74477fae0c9f', 'class': 'knight', 'currentServer': 'srv10.us-east.bestrpg.com', 'health': Decimal('9000'), 'level': Decimal('7'), 'mana': Decimal('10'), 'playerEmail': 'foo@bar.com', 'playerRegion': 'us-east', 'race': 'human', 'speed': Decimal('23'), 'strength': Decimal('42')}]

I can also add an item to the table via DAX.

>>> characters.put_item(
... Item={
... 'playerId': 'createdByDax',
... 'characterId': 'createdByDax'
... }
... )
# I will create a new connection directly to DynamoDB
>>> ddb = boto3.resource('dynamodb')
>>> characters = ddb.Table('characters')
>>> characters.get_item(
... Key={
... 'playerId': 'createdByDax',
... 'characterId': 'createdByDax'
... }
... )['Item']
{'characterId': 'createdByDax', 'playerId': 'createdByDax'}

Magnifique!

A few more things before I let you go! DAX is an instance-based service, so you pay for the nodes, the usual traffic costs, and the W/RCUs consumed between DAX and DynamoDB. DAX also supports LSIs and GSIs, so you are not limited in features!

This is the 3rd and almost the last chapter of the DynamoDB Deep Dive series! You have walked such a long path with me, and we are very close to the finale!

In the next chapter, I will explain the best practices and patterns of using DynamoDB and end this journey with many useful links for you to fulfill your Serverless journey!

xoxo

Written by

Solution Architect, Creative Problem Solver, Pure Engineering Advocate, World-Class FizzBuzz Developer, AWS APN Ambassador EMEA, Data Community Builder.
