Resources

  1. Kinesalite
  2. Getting started with Kinesalite by Gavin Stewart - Gavin’s guide gave me the first clues that I needed to make this work
  3. Akka Stream Source.actorRef
  4. Akka Alpakka AWS Kinesis Connector
  5. awscli kinesis commands

At work I need to move an application to Kubernetes, but some of its logs feed our usage data and ad impression tracking. Setting up rolling logging from our container to AWS S3 looked more complicated and risky than our current setup, so we didn’t even investigate it. Other applications at the company use AWS Kinesis, so it made sense to do the same. I wrote a code snippet that pushes log messages to Kinesis through an Akka Stream. I could get everything working in the REPL except the part that actually pushes to Kinesis.

I tried to use Kinesalite, a local implementation of AWS Kinesis, but there are hardly any guides for getting it up and running. I assumed you would just start Kinesalite, point your Kinesis endpoint at localhost on the right port, and it would just work. I did that, and nothing happened: no error messages, no Kinesalite log messages, nothing.

It took way too long (two days) to figure out how to write to kinesalite by….

Writing to Kinesis

Below is my example code to write to Kinesis. It creates an Akka Stream to which you can send messages. I tested each part of the Akka Stream in the Scala REPL.

import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.OverflowStrategy.fail
import akka.stream.alpakka.kinesis.KinesisFlowSettings
import akka.stream.alpakka.kinesis.scaladsl._
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{Sink, Flow}
import akka.util.ByteString
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry

implicit val system: ActorSystem = ActorSystem("TestActor")
implicit val materializer: Materializer = ActorMaterializer()

// Create a Kinesis endpoint pointed at our local kinesalite
val endpoint = new EndpointConfiguration("http://localhost:4567", "us-east-1")

implicit val amazonKinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder
  .standard()
  .withEndpointConfiguration(endpoint)
  .build()

// From the Akka Alpakka example
val flowSettings = KinesisFlowSettings(parallelism = 1,
    maxBatchSize = 500,
    maxRecordsPerSecond = 1000,
    maxBytesPerSecond = 1000000,
    maxRetries = 5,
    backoffStrategy = KinesisFlowSettings.Exponential,
    retryInitialTimeout = 100.millis // dot syntax avoids the postfixOps feature import
  )

val streamName = "myStreamName"
val partitionKey = "logs"

val loggingActor = Source.actorRef[String](Int.MaxValue, fail)
    .map(log => ByteString(log).toByteBuffer)
    .map(data => new PutRecordsRequestEntry().withData(data).withPartitionKey(partitionKey))
    .to(KinesisSink(streamName, flowSettings))
    .run()

loggingActor ! "testing this thing"
loggingActor ! "test test"

Setting Fake AWS Credentials

Either the AWS Java SDK requires credentials (my bet) or Kinesalite requires credentials even though it doesn’t care what those credentials are. Create a $HOME/.aws/credentials file with the below contents.

[default]
aws_access_key_id = x
aws_secret_access_key = x
region = us-east-1

This was the last step I completed to get Kinesalite working. Neither the AWS Java SDK nor Kinesalite showed a single error message when I tried to connect without credentials.
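You may not want to touch ~/.aws/credentials. The CLI commands below already pass fake credentials through environment variables. For the REPL, the AWS Java SDK v1 default credentials chain also checks the JVM system properties aws.accessKeyId and aws.secretKey. That chain is what AmazonKinesisAsyncClientBuilder.standard() falls back to when you don't call withCredentials.

Below is a minimal sketch that relies on that assumption. FakeAwsCredentials is a hypothetical helper name, not part of the SDK.

```scala
// Hypothetical helper: sets the JVM system properties that the AWS Java SDK v1
// default credentials chain (via SystemPropertiesCredentialsProvider) reads.
object FakeAwsCredentials {
  val AccessKeyProperty = "aws.accessKeyId"
  val SecretKeyProperty = "aws.secretKey"

  // Kinesalite doesn't care about the values; "x" mirrors the credentials file above.
  // Call this before building the Kinesis client.
  def install(accessKey: String = "x", secretKey: String = "x"): Unit = {
    System.setProperty(AccessKeyProperty, accessKey)
    System.setProperty(SecretKeyProperty, secretKey)
  }
}
```

In the REPL, call FakeAwsCredentials.install() before the line that builds amazonKinesisAsync. The properties only affect the JVM, so the AWS CLI still needs its own credentials.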

Install the AWS CLI Tool

You need this to set up Kinesalite.

# pip3 if you're using Python 3, or just pip if it's properly aliased
pip2 install awscli

Creating Your Stream

I didn’t know you had to do this. Ops created our staging and production streams. I expected Kinesalite to accept requests for any stream, but I guess it behaves exactly like AWS Kinesis.

Run the AWS CLI tool with the following parameters. It is very sensitive to typos. I copied and pasted from examples without any noticeable spelling errors, and the only message it gave me was something like “--stream-name is required”.

AWS_ACCESS_KEY_ID=x AWS_SECRET_ACCESS_KEY=x aws --endpoint-url http://localhost:4567/ kinesis create-stream --stream-name=myStreamName --shard-count=1 --no-verify-ssl
AWS_ACCESS_KEY_ID=x AWS_SECRET_ACCESS_KEY=x aws --endpoint-url http://localhost:4567/ kinesis list-streams

The first command creates your stream. The second command lists all existing streams.

Send Messages to Kinesis

In your REPL, send something to Kinesis.

loggingActor ! "testing this thing"
loggingActor ! "test test"

Verifying the Output

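Reading what has been pushed to Kinesis takes two steps. First you need a shard iterator, which marks the position in a shard to start reading from. describe-stream lists the stream's shards and their IDs, and get-shard-iterator turns a shard ID into an iterator. With TRIM_HORIZON, the iterator starts at the oldest record still in the shard. With a single shard the ID is typically shardId-000000000000, but copy whatever describe-stream prints.

AWS_ACCESS_KEY_ID=x AWS_SECRET_ACCESS_KEY=x aws --endpoint-url http://localhost:4567/ kinesis describe-stream --stream-name myStreamName
AWS_ACCESS_KEY_ID=x AWS_SECRET_ACCESS_KEY=x aws --endpoint-url http://localhost:4567/ kinesis get-shard-iterator --stream-name myStreamName --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON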

Once you have the shard iterator, get-records returns the records from that position onward. Replace the --shard-iterator value below with the ShardIterator from your output.

AWS_ACCESS_KEY_ID=x AWS_SECRET_ACCESS_KEY=x aws --endpoint-url http://localhost:4567/ kinesis get-records --shard-iterator AAAAA+somekeyvalues

Each record's Data field is a base64-encoded string. The following Scala snippet decodes it back to what you pushed to Kinesis.

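import java.nio.charset.StandardCharsets
import java.util.Base64

// ByteString(log) wrote the log line as UTF-8 bytes, so decode them as UTF-8
// instead of casting each byte to a Char (which mangles non-ASCII text).
def decode(str: String): String =
  new String(Base64.getDecoder.decode(str), StandardCharsets.UTF_8)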
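For more than a couple of records, copying Data fields out by hand gets tedious. The next snippet is a quick REPL sketch that assumes you paste the JSON printed by get-records into a string. It pulls out every Data field with a regex and decodes each one as UTF-8. The regex is safe here only because base64 never contains a double quote; anything beyond ad hoc debugging should use a real JSON parser. RecordDecoder, decodeRecord, and decodeAll are hypothetical names, not part of any AWS library.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hypothetical debugging helper for the JSON output of `aws kinesis get-records`.
object RecordDecoder {
  // Matches "Data": "<base64>" and captures the base64 payload.
  private val DataField = "\"Data\"\\s*:\\s*\"([A-Za-z0-9+/=]*)\"".r

  // Decodes one base64 Data value back into the UTF-8 log line.
  def decodeRecord(data: String): String =
    new String(Base64.getDecoder.decode(data), StandardCharsets.UTF_8)

  // Decodes every Data field, in the order they appear in the JSON.
  def decodeAll(getRecordsJson: String): List[String] =
    DataField.findAllMatchIn(getRecordsJson).map(m => decodeRecord(m.group(1))).toList
}
```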

Sidenote: Keep Your Libraries up to Date

Sending a Kinesis request for every log message is inefficient. Akka Streams has an operator called groupedWithin that batches elements by either a maximum count or a timeout: if the batch doesn't reach the element limit before the timeout, groupedWithin flushes it anyway. Even better, groupedWeightedWithin lets you assign each element a weight and caps each batch by total weight instead of count. Kinesis limits each record's data to 1 MB, so we could batch log lines until a batch gets close to 1,000,000 bytes.
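The following plain-Scala sketch makes the weight idea concrete. It groups log lines so that each batch's total UTF-8 size stays at or under maxBytes. It covers only the weight half of groupedWeightedWithin and leaves out the time-based flush that the Akka operator also does. WeightedBatching and groupByWeight are hypothetical names, not Akka API.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical sketch of weight-based batching (no timeout, unlike groupedWeightedWithin).
object WeightedBatching {
  // Weight of a log line = its size in bytes once encoded as UTF-8.
  def byteWeight(line: String): Long = line.getBytes(StandardCharsets.UTF_8).length.toLong

  // Groups lines in order; starts a new batch whenever adding the next line would
  // push the current batch over maxBytes. A line heavier than maxBytes on its own
  // still gets emitted, in a batch by itself.
  def groupByWeight(lines: Seq[String], maxBytes: Long): List[List[String]] = {
    val batches = List.newBuilder[List[String]]
    var current = List.empty[String] // built in reverse for cheap prepends
    var currentWeight = 0L
    for (line <- lines) {
      val w = byteWeight(line)
      if (current.nonEmpty && currentWeight + w > maxBytes) {
        batches += current.reverse
        current = Nil
        currentWeight = 0L
      }
      current = line :: current
      currentWeight += w
    }
    if (current.nonEmpty) batches += current.reverse
    batches.result()
  }
}
```

If each batch is later joined into a single record with newline separators, each separator adds a byte. In that case, leave some headroom under the 1 MB limit or count the separator in byteWeight.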

We can’t use groupedWeightedWithin. Our application is still running on the Spray web framework. The latest Akka it supports is 2.4. The groupedWeightedWithin function wasn’t introduced until Akka 2.5.1. We’ll have to wait until we upgrade our application to Akka HTTP before we can use it.

If we kept our libraries up to date, we would have access to groupedWeightedWithin.

Asides

  1. Our current log roller was written by someone who doesn't work here anymore, and it's not dependable at all. It's based on Logback, which is designed to fail itself before it ever fails your application. One time we had a bug that could result in two instances of our application running at the same time. Only one of them was bound to the port that received connections. Our log rolling library would lock the log file during the rollover. Inevitably, the instance not receiving requests would roll the logs and lock the other instance out of ever writing to the log file.