At AppMonet we help mobile app developers maximize their advertising revenue by running auctions each time an ad slot is available on their users’ devices. This activity generates tons of data, almost all of which enters our system through AWS Kinesis.
We write billions of events to Kinesis each day, and if one isn’t careful, this kind of activity can end up costing a lot of money. The most efficient way to use the service is demonstrated by Amazon’s official Kinesis Producer Library (KPL). However, this library can only be used with Java. The features of this library that we are most interested in include:
- Aggregating records to the optimal payload size
- Automating retries using an exponential backoff with jitter strategy
Amazon also offers aggregation libraries in several other popular languages, which let you aggregate your records using the same technique as the KPL but leave the actual transmission of the aggregated records to Kinesis up to you. Sadly, there is no official Elixir or Erlang aggregation library either.
However, the fine people over at AdRoll also make heavy use of Kinesis streams, and have published several open source libraries to help the BEAM community efficiently interact with Kinesis.
Erlmld and Exmld are two libraries built to help efficiently and safely consume Kinesis streams. Erlmld in particular comes with a port of the Python implementation of the official Kinesis record aggregator. We extracted the producing logic and ported it to Elixir.
We have begun to use it internally because we didn’t want to pull in the entire Exmld package, which is for consuming streams, just to use this one aggregation module.
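If you only need the aggregation piece, that ported module (ExKpl, which shows up again below) can be used on its own. Here is a minimal sketch, assuming it exposes new/0, add/2, and finish/1 over {partition_key, data} tuples; treat the exact names and return shapes as assumptions and check the library's documentation:

```elixir
# Start with an empty aggregator.
aggregator = ExKpl.new()

# add/2 buffers the record; once the aggregated payload reaches the optimal
# size, the packed record is returned in place of nil.
{_maybe_full_record, aggregator} = ExKpl.add(aggregator, {"user-123", "first event"})
{_maybe_full_record, aggregator} = ExKpl.add(aggregator, {"user-456", "second event"})

# Force out whatever is still buffered, e.g. before shutting down.
{_full_record, _aggregator} = ExKpl.finish(aggregator)
```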
ProvisionedThroughputExceededException
Once you start writing a non-trivial amount of data to your first Kinesis stream, you will inevitably come across the “ProvisionedThroughputExceededException”. Don’t let this exception scare you into scaling up your streams (and increasing your AWS bill) just yet!
Instead, your application should retry requests that produce this exception. The ExAws Elixir library comes with a great implementation of exponential backoff with jitter, and for many AWS services, this retry strategy will kick in automatically.
Unfortunately, we found that Kinesis PutRecord API calls made with ExAws.Kinesis do not trigger these automatic retries. We have patched the library, submitted our fix upstream, and are waiting to hear back from the package maintainers.
For now, we are using our patched version with great success and enjoying excellent throughput on a minimal number of shards!
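If you need to tune that backoff behaviour yourself, ExAws reads its retry settings from application config. A minimal config/config.exs sketch with illustrative values (double-check the key names against the ExAws version you are running):

```elixir
import Config

# Retry/backoff settings read by ExAws; the values here are illustrative.
config :ex_aws, :retries,
  max_attempts: 10,
  base_backoff_in_ms: 10,
  max_backoff_in_ms: 10_000
```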
Practical Architecture
So what does this all look like in practice? There are three main pieces in our application.
- Poolboy pool
- GenServers
- Module to manage the GenServer state
We use a pool mainly so that no single GenServer becomes a bottleneck in our application.
You can adjust the size and max overflow of the pool according to your needs. We use the :fifo strategy so that we round-robin through the pool and use all of the GenServers.
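Here is a rough sketch of how such a pool can be wired into a supervision tree. The pool name, worker module, and sizes are illustrative assumptions rather than our exact production values:

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    pool_opts = [
      name: {:local, :kinesis_writer_pool},
      worker_module: MyApp.KinesisWriter,
      size: 10,
      max_overflow: 5,
      # :fifo hands workers out round-robin instead of reusing the most
      # recently checked-in worker (poolboy's default :lifo behaviour)
      strategy: :fifo
    ]

    children = [
      :poolboy.child_spec(:kinesis_writer_pool, pool_opts)
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```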
The GenServers look something like:
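Below is a minimal sketch of such a worker. The module and pool names are assumptions carried over from the pool sketch above, and MyApp.KinesisWriter.State is the hypothetical state module covered in the next section:

```elixir
defmodule MyApp.KinesisWriter do
  use GenServer

  # Hypothetical state module that wraps the KPL aggregator (see below).
  alias MyApp.KinesisWriter.State

  @pool :kinesis_writer_pool
  @stream "my-stream"

  # Check a worker out of the pool and hand it the record to buffer.
  def write(record) do
    :poolboy.transaction(@pool, fn pid ->
      GenServer.cast(pid, {:write, record})
    end)
  end

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl true
  def init(_opts) do
    # Trap exits so terminate/2 is called and buffered records can be
    # flushed before the process goes away.
    Process.flag(:trap_exit, true)
    {:ok, State.new(@stream)}
  end

  @impl true
  def handle_cast({:write, record}, state) do
    {:noreply, State.write(state, record)}
  end

  @impl true
  def terminate(_reason, state) do
    # Send anything still sitting in the aggregator to Kinesis.
    State.flush(state)
  end
end
```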
This GenServer provides an API function, write/1, which will check out a process from our pool and push a new record into the GenServer’s state, which uses our KPL aggregator to pack records up and hold them until we are ready to send a batch to Kinesis.
The other important part to note here is the call to `Process.flag(:trap_exit, true)`, along with the implementation of the terminate/2 callback. These ensure that any records being buffered in the GenServer state are not lost when one of these GenServers is terminated.
Lastly, let’s take a look at the module that will manage the GenServer state:
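The sketch below is reconstructed from the description that follows it. It assumes ExKpl exposes new/0, add/2, and finish/1 and that records are {partition_key, data} tuples, so treat the exact names and return shapes as assumptions and check the ExKpl docs:

```elixir
defmodule MyApp.KinesisWriter.State do
  alias ExAws.Kinesis

  defstruct [:stream_name, :aggregator]

  # Return a fresh state holding an empty aggregator for the given stream.
  def new(stream_name) do
    %__MODULE__{stream_name: stream_name, aggregator: ExKpl.new()}
  end

  # Add a {partition_key, data} record to the aggregator. When ExKpl reports
  # that the aggregated record has reached its maximum size, ship it off.
  def write(%__MODULE__{aggregator: agg} = state, record) do
    case ExKpl.add(agg, record) do
      {nil, agg} ->
        %{state | aggregator: agg}

      {full_record, agg} ->
        put_record(state.stream_name, full_record)
        %{state | aggregator: agg}
    end
  end

  # Force out whatever is currently buffered (used on terminate or a timer).
  def flush(%__MODULE__{aggregator: agg} = state, _opts \\ []) do
    case ExKpl.finish(agg) do
      {nil, agg} ->
        %{state | aggregator: agg}

      {full_record, agg} ->
        put_record(state.stream_name, full_record)
        %{state | aggregator: agg}
    end
  end

  # Assumed shape of an aggregated record: {partition_key, data}.
  defp put_record(stream_name, {partition_key, data}) do
    stream_name
    |> Kinesis.put_record(partition_key, data)
    |> ExAws.request()
  end
end
```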
This module is just a wrapper around the ExKpl aggregator and exposes only three functions. new/1 returns a new aggregator. write/2 adds a new record to the aggregator and returns a new aggregator, automatically flushing once the aggregated record has reached its maximum size. Finally, flush/2 lets us flush the aggregator and send all accumulated records to Kinesis. In some applications we also track the time of the last flush so that the aggregators flush once the elapsed time exceeds a certain threshold, but if your volume is high enough, you probably don’t need to worry about that.
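If you do want a time-based flush, one simple approach is to have each worker schedule itself a periodic :flush message. This fragment would live inside the MyApp.KinesisWriter sketch above (replacing its init/1 and adding a handle_info/2 clause); the 10-second interval is an illustrative assumption:

```elixir
# Inside MyApp.KinesisWriter: schedule a periodic flush of the buffer.
@flush_interval :timer.seconds(10)

@impl true
def init(_opts) do
  Process.flag(:trap_exit, true)
  Process.send_after(self(), :flush, @flush_interval)
  {:ok, State.new(@stream)}
end

@impl true
def handle_info(:flush, state) do
  Process.send_after(self(), :flush, @flush_interval)
  {:noreply, State.flush(state)}
end
```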
The state module above uses the put_record API method, but you should also explore put_records: it can send up to 500 records per request, with each record up to 1 MB and a 5 MB total per request, so up to 5 of our carefully aggregated records at a time.
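A rough sketch of what that batched call could look like with ExAws, assuming each aggregated record is a {partition_key, data} tuple and using a placeholder stream name:

```elixir
# Hypothetical batch send: up to 500 entries and 5 MB total per request.
aggregated_records = [
  {"partition-key-1", "<aggregated payload 1>"},
  {"partition-key-2", "<aggregated payload 2>"}
]

records =
  Enum.map(aggregated_records, fn {partition_key, data} ->
    %{partition_key: partition_key, data: data}
  end)

"my-stream"
|> ExAws.Kinesis.put_records(records)
|> ExAws.request()
```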