Testing Your Kinesis Stream With Kinesalite
Resources
- Kinesalite
- Getting started with Kinesalite by Gavin Stewart - Gavin’s guide gave me the first clues that I needed to make this work
- Akka Stream Source.actorRef
- Akka Alpakka AWS Kinesis Connector
- awscli kinesis commands
At work I need to move an application to Kubernetes, but some of its logs are needed for usage data and ad-impression tracking. Setting up rolling logging from our container to AWS S3 looked more complicated and risky than our current setup, so we didn’t even investigate it. Other applications at the company already use Amazon Kinesis, so it made sense to do the same. I wrote a code snippet to push log messages to Kinesis via an Akka Stream. I could get everything working in the REPL except for the part that actually pushes to Kinesis.
I tried to use kinesalite, an application that implements the AWS Kinesis API, but there aren’t any guides for getting it up and running. I assumed you would just start kinesalite, point your Kinesis endpoint at localhost with the right port, and it would just work. I did that, and nothing happened. No error messages, no kinesalite log messages, nothing.
It took way too long (two days) to figure out how to write to kinesalite. The steps below are what finally worked.
Writing to Kinesis
Below is my example code to write to Kinesis. It creates an Akka Stream to which you can send messages. I tested each part of the Akka Stream in the Scala REPL.
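A minimal sketch of that shape, assuming Alpakka’s KinesisSink and the v1 AWS Java SDK; the stream name logs-stream, the partition key, the buffer size, and kinesalite’s default port 4567 are placeholder choices here:

```scala
import java.nio.ByteBuffer

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.Source
import akka.stream.alpakka.kinesis.scaladsl.KinesisSink
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry
import com.amazonaws.services.kinesis.{AmazonKinesisAsync, AmazonKinesisAsyncClientBuilder}

implicit val system: ActorSystem = ActorSystem("kinesis-logging")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Point the async client at kinesalite instead of the real AWS endpoint.
implicit val kinesisClient: AmazonKinesisAsync =
  AmazonKinesisAsyncClientBuilder.standard()
    .withEndpointConfiguration(new EndpointConfiguration("http://localhost:4567", "us-east-1"))
    .build()

// Source.actorRef materializes an ActorRef; every message sent to it
// enters the stream and ends up as a Kinesis record.
val logActor = Source
  .actorRef[String](bufferSize = 1000, overflowStrategy = OverflowStrategy.dropHead)
  .map { line =>
    new PutRecordsRequestEntry()
      .withPartitionKey("logs")
      .withData(ByteBuffer.wrap(line.getBytes("UTF-8")))
  }
  .to(KinesisSink("logs-stream"))
  .run()
```

Running the graph hands back the materialized ActorRef, so the rest of the application only ever sees an actor it can send strings to.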
Setting Fake AWS Credentials
Either the AWS Java SDK requires credentials (my bet) or Kinesalite requires them, even though it doesn’t care what those credentials are. Create a $HOME/.aws/credentials file with the below contents.
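The values themselves can be anything, since kinesalite never validates them; for example:

```ini
[default]
aws_access_key_id = fake_access_key
aws_secret_access_key = fake_secret_key
```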
This was the last step I completed to get Kinesalite working. Neither the AWS Java SDK nor Kinesalite showed a single error message when I tried to connect to Kinesis without authentication credentials.
Install the AWS CLI Tool
You need this to set up Kinesalite.
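If you don’t already have it, pip is the usual route:

```sh
pip install awscli
```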
Creating Your Stream
I didn’t know you had to do this. Ops created our staging and production streams. I expected Kinesalite to accept requests for any stream, but I guess it behaves exactly like AWS Kinesis.
Run the AWS CLI tool with the following parameters. It is super sensitive to typos: I copied and pasted from examples without any noticeable spelling errors, and the only message it would give was something like “--stream-name is required”.
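Something like the two commands below, assuming kinesalite is listening on its default port 4567 and reusing the placeholder stream name logs-stream:

```sh
aws kinesis create-stream --stream-name logs-stream --shard-count 1 \
    --endpoint-url http://localhost:4567

aws kinesis list-streams --endpoint-url http://localhost:4567
```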
The first command creates your stream. The second command lists all existing streams.
Send Messages to Kinesis
In your REPL, send something to Kinesis.
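With the sketch above materialized, that’s just a tell to the actor:

```scala
logActor ! "hello kinesalite"
```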
Verifying the Output
There are two parts to reading what has been pushed to Kinesis. First, you need to get a shard iterator for reading data from the stream.
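For the single-shard stream created above, the shard id is shardId-000000000000, and TRIM_HORIZON asks for an iterator starting at the oldest record in the shard:

```sh
aws kinesis get-shard-iterator --stream-name logs-stream \
    --shard-id shardId-000000000000 \
    --shard-iterator-type TRIM_HORIZON \
    --endpoint-url http://localhost:4567
```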
Once you have the shard iterator, you can read all of the records from that point on. Replace the --shard-iterator value with the one in your output.
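The iterator value below is a truncated stand-in; paste the full one from your own get-shard-iterator output:

```sh
aws kinesis get-records --endpoint-url http://localhost:4567 \
    --shard-iterator "AAAAAAAAAAE..."
```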
Your record is a base64-encoded string. The following Scala snippet will decode it back to what you pushed to Kinesis.
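A small sketch using the JDK’s built-in decoder; the encoded string here is just the example message from earlier:

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// "aGVsbG8ga2luZXNhbGl0ZQ==" is "hello kinesalite", base64-encoded.
val decoded = new String(
  Base64.getDecoder.decode("aGVsbG8ga2luZXNhbGl0ZQ=="),
  StandardCharsets.UTF_8
)
```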
Sidenote: Keep Your Libraries up to Date
Sending a Kinesis request for every log message is inefficient. There’s an Akka Flow stage called groupedWithin that lets you batch your requests by either a number of elements or a timeout: if you don’t reach the element limit within the timeout, groupedWithin flushes the batch anyway. Even better, there is groupedWeightedWithin, which lets you specify a weight function. Kinesis has a 1MB limit for its payloads, so we can batch our requests until we get close to 1000000 bytes.
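Here’s a sketch of what that stage would look like on Akka 2.5.1+; the one-second timeout is an arbitrary choice for illustration:

```scala
import scala.concurrent.duration._

import akka.NotUsed
import akka.stream.scaladsl.Flow

// Emit a batch once the summed payload size nears 1MB, or after one
// second, whichever comes first (requires Akka 2.5.1+).
val batch: Flow[String, Seq[String], NotUsed] =
  Flow[String].groupedWeightedWithin(1000000L, 1.second) { line =>
    line.getBytes("UTF-8").length.toLong
  }
```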
We can’t use groupedWeightedWithin, though. Our application is still running on the Spray web framework, and the latest Akka it supports is 2.4. The groupedWeightedWithin function wasn’t introduced until Akka 2.5.1. We’ll have to wait until we upgrade our application to Akka HTTP before we can use it.
If we kept our libraries up to date, we would have access to groupedWeightedWithin.
Asides
- Our current log roller was written by someone who no longer works here, and it's not dependable at all. It's based on logback, which is designed to fail itself before it ever fails your application. One time we had a bug that could result in two instances of our application running at the same time, with only one of them bound to the port that received connections. Our log rolling library would lock the log file during rollover. Inevitably, the application not receiving requests would roll the logs and lock the other application out of ever writing to the log file.