Write throughput scaling
A common challenge for database operations is responding to high-throughput requests. High-volume, concurrent updates to individual documents frequently lead to high contention within the database, with many connected clients trying to read from and write to a document simultaneously. Fortunately, there are schema design techniques to mitigate this problem. When your data access patterns involve a high volume of writes to a single document, one mitigation strategy is to aggregate write operations into "chunks", or batches, and execute them at planned intervals, thus reducing write contention.
Example use case: real-time tracking of bytes uploaded
The following section outlines a strategy for throughput scaling with a schema design pattern that reduces write contention. This schema design is applicable to many use cases and is based on two design patterns, called Event Sourcing and Command Query Responsibility Segregation, which were developed for use in high-performance banking applications.
This schema design:
- Avoids contention caused by rapid writes to a single document.
- Is a "write-once, read-many" pattern that avoids updates to indexes (and in Fauna's case, unbounded history growth).
This example considers a collection which keeps track of data processing by individual users who perform compute operations with a Lambda/Worker (Edge Function). A field named bytesUploaded keeps a running count of total bytes processed by all Workers. A sufficiently high volume of Worker activity could quickly lead to database contention as multiple clients perform write operations simultaneously on the bytesUploaded field.
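Both collections used in this tutorial can be created ahead of time. A minimal sketch, run as two separate commands and assuming neither collection exists yet (eventsLog holds the raw events; snapshots, introduced below, holds the aggregates):

CreateCollection({ name: "eventsLog" })

CreateCollection({ name: "snapshots" })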
Data collection
Rather than updating the bytesUploaded field every time a new update is available, a function might insert each new update as a single document in a separate collection named eventsLog. This avoids contention, because the function is creating new documents rather than updating an existing one. Here's an example of what such a document could look like:
{
  "ref": Ref(Collection("eventsLog"), "1"),
  "ts": 1630107062410000,
  "data": {
    "user": Ref(Collection("User"), "123"),
    "bytesUploaded": 278
  }
}
The user field contains a Reference to the user who performed the compute operation, and the bytesUploaded field records the result of the operation.
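A Worker can append such an event with a single Create call. A minimal sketch, assuming the Worker already holds a Reference to the user (the bytesUploaded value here is illustrative):

Create(
  Collection("eventsLog"),
  {
    data: {
      user: Ref(Collection("User"), "123"),
      bytesUploaded: 278
    }
  }
)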
Aggregation
To aggregate the accumulated data, you could read all the prior eventsLog documents and perform the aggregation, but that would be prohibitively expensive in both time and operations. There's a better way.
Command query responsibility segregation can help. Implement another Worker that reads through the eventsLog documents and stores the aggregations as a set of incremental snapshots. Note that the first time the job runs, the Worker must read through all the existing eventsLog documents at once, but subsequent jobs won't have to.
Each snapshot stores the aggregate value at specific, incremental points in time. An example of the data in a snapshot would look like this:
{
  "ref": Ref(Collection("snapshots"), "2"),
  "ts": 1630107063800010,
  "data": {
    "user": Ref(Collection("User"), "123"),
    "runningBytesUploaded": 1163,
    "lastReadTS": 1630107062480000
  }
}
The snapshot contains the user Reference, the aggregated number of bytes uploaded, and the timestamp of the last eventsLog document.
Any time a Worker needs a consistent, current view of total bytes uploaded, it first queries for the latest snapshot, then reads all eventsLog documents created since the lastReadTS timestamp. Optionally, the current snapshot's value could be written back to the User document for easier data retrieval.
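Writing the value back could be a single Update call against the User document. A minimal sketch, assuming the User document stores the denormalized total in a bytesUploaded field (the document ID and value come from the earlier examples):

Update(
  Ref(Collection("User"), "123"),
  {
    data: {
      bytesUploaded: 1163
    }
  }
)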
Fetch the latest state
Getting the latest state is the same process: read the latest snapshot, then aggregate the cumulative eventsLog entries recorded since that snapshot was made. The Workers don't have to replay the entire log, so getting the latest aggregated value is much faster.
Possible implementation strategies:
- Use a cron job (or equivalent) to create new snapshots at a given interval.
- Invoke the aggregating Worker when a client makes a request for "bytes uploaded" (with polling intervals implemented client-side).
Fauna Query Language implementation notes
Let’s take a look at how you can use FQL to retrieve documents created after a certain date-time.
First, we need an index that includes the ts (timestamp) value which is part of every Fauna document. The following command creates an index which sorts the eventsLog collection by timestamp from oldest to newest:
CreateIndex({
  name: "events_log_sort_by_ts",
  source: Collection("eventsLog"),
  values: [
    { field: ["ts"] },
    { field: ["data", "user"] },
    { field: ["data", "bytesUploaded"] }
  ]
})
Then you can use the after parameter with the Paginate function to return all eventsLog documents inserted after a certain date-time:
Paginate(
  Match(Index("events_log_sort_by_ts")),
  {
    after: 1630107062480001
  }
)
The above FQL query is equivalent to the SQL statement SELECT * FROM eventsLog WHERE ts >= @datetimevalue.
Notes:
- The preceding query uses the after parameter because it is a pagination cursor rather than a value comparison.
- after is inclusive: it finds documents starting with the given value and higher. That's why one microsecond is added to the lastReadTS value: otherwise, the last eventsLog message already covered by the previous snapshot would be included in the next snapshot and double-counted.
For ease of use and repeatability, the above command should be wrapped in a user-defined function (UDF) so you can pass a value to the after parameter. The command shown below creates a new UDF which checks the events_log_sort_by_ts index for all documents added after a given timestamp.
CreateFunction({
  name: "findLatestEvents",
  body: Query(
    Lambda(
      "lastSnapshotDateTime",
      Paginate(
        Match(Index("events_log_sort_by_ts")),
        {
          after: TimeAdd(Var("lastSnapshotDateTime"), 1, "microsecond")
        }
      )
    )
  )
})
You can use the new UDF with the Call function:
Call("findLatestEvents", 1639768939970000)
The output should return the timestamp, user reference, and number of bytes uploaded for each document it finds:
{
  before: [1639768939970000],
  data: [
    [1639768939970000, Ref(Collection("User"), "318252577924842048"), 512],
    [1639768949105000, Ref(Collection("User"), "318252577924842048"), 2782],
    [1639768961000000, Ref(Collection("User"), "318252577924842048"), 722],
    [1639768969695000, Ref(Collection("User"), "318252577924842048"), 8830]
  ]
}
The results show a before field even though we passed an after parameter to the Paginate function. This is because we're looking at the final page of the result set from our index query, and there's nothing after the last displayed item. If you wanted to navigate to an earlier page of the result set, you could use the returned before value to get there. For more information about navigating through the pages of a result set, see Paginate.
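Putting these pieces together, the aggregating Worker can sum the bytesUploaded values returned by the index and write a new snapshot. The following is only a sketch: it assumes the previous snapshot's runningBytesUploaded and lastReadTS values (1163 and 1630107062480000 from the earlier example) have already been read, treats every event in the log as belonging to the example user, uses Add on the raw microsecond timestamp to move the cursor past the last event already counted, and reads at most one page of 100,000 events (a production Worker would follow the pagination cursors):

Let(
  {
    prevTotal: 1163,
    lastReadTS: 1630107062480000,
    page: Paginate(
      Match(Index("events_log_sort_by_ts")),
      { after: Add(Var("lastReadTS"), 1), size: 100000 }
    ),
    newBytes: Reduce(
      Lambda(["total", "event"], Add(Var("total"), Select(2, Var("event")))),
      0,
      Select("data", Var("page"))
    ),
    newLastReadTS: Reduce(
      Lambda(["latest", "event"], Select(0, Var("event"))),
      Var("lastReadTS"),
      Select("data", Var("page"))
    )
  },
  Create(Collection("snapshots"), {
    data: {
      user: Ref(Collection("User"), "123"),
      runningBytesUploaded: Add(Var("prevTotal"), Var("newBytes")),
      lastReadTS: Var("newLastReadTS")
    }
  })
)

The newBytes binding folds over the returned [ts, user, bytesUploaded] tuples to total the new uploads, and newLastReadTS keeps the timestamp of the last event processed (falling back to the previous lastReadTS when no new events exist), so the next run can continue from the right cursor.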
Summary
The solution outlined in this tutorial consists of three separate functions that are each responsible for executing their own, very specific database task:
- One Function/Worker that inserts new records into the eventsLog collection and does nothing else.
- One Function/Worker that runs as part of a cron job (or other task-scheduling system) and generates snapshots, and nothing else.
- One Function/Worker that gets the latest state by loading the latest snapshot and reading the newest eventsLog entries.
The solution described here has the added benefit of scalability. If there’s a need in the future to aggregate on a new field, you can add additional Functions/Workers to perform the specific aggregation. You can safely add new types of aggregation Functions/Workers, as long as they don’t interfere with the existing Functions/Workers.