AWS Kinesis with Java

An introduction to Kinesis using Apache Camel and Spring Cloud Stream

Streaming and its integration with Spring

Message Brokers

The benefits of using a message broker include decoupling producers from consumers, maintaining a queue, providing reliable storage, managing transactions and guaranteeing message delivery.

Example of a Message Broker sitting between data producers and data consumers

Often this is used when we have large streams of data, or if we want to communicate between different components in a Microservices architecture. Overall it allows us to move from batch processing to real-time analytics.

Kinesis

Kinesis is AWS's family of streaming services. It comes in four flavours:

  • Video streams: This allows us to securely stream video from devices to AWS. We could then use services like Rekognition for automated content discovery.
  • Data streams: This is a scalable and durable solution for streaming large amounts of data between producers and consumers. Along with data analytics it will be the focus of this article.
  • Data firehose: This is for pushing data into AWS data stores, and can be used alongside BI tools.
  • Data analytics: This can be used to process data in real time using SQL or Apache Flink.

So how is Kinesis different from a queueing system like AWS SQS? Well, an SQS message is consumed once: after it has been processed it is deleted from the queue and disappears forever. In Kinesis, however, a message can be read by multiple consumers, and it remains on the shard until the stream's retention period expires.

Let’s move on to clarify some core terminology.

  • Data Record: A data record represents a message in our system. It is composed of a sequence number, a partition key and a data blob.
  • Sequence Number: This is a unique (to the partition) number that accompanies a data record. It is generated internally by Kinesis.
  • Partition Key: This is used to group data records onto shards. We must specify a partition key when putting data onto a stream.
  • Shard: This is a sequence of data records within a stream. Each shard has a fixed capacity (writes of up to 1 MB or 1,000 records per second), so the more shards we have the more throughput we can achieve.
  • Kinesis Data Stream: A Kinesis data stream is a group of shards.
  • Kinesis Data Stream Application: This is the consumer of our Kinesis Data Stream.

To outline how the components come together, let’s make a diagram.

How the various components of Kinesis fit together

Note that a data stream is set in a region. It is then automatically replicated across multiple availability zones for maximum resilience. In our example we have also used EC2s to represent our data stream application, but it could be anything.

So how do we choose a partition key? There are two main factors that come into play: ordering and capacity. Kinesis hashes the partition key to determine which shard a record lands on. Each shard has a maximum capacity, so to share load evenly we may generate a random key, whose hash will distribute the records across shards.

Equally, ordering may be important to us. We may have several different producers, each of which need their messages to be played in sequence. A shard guarantees the order of messages, however if we distribute these messages across shards there is no such guarantee. Therefore we may want to put all messages from a given source on a particular shard.
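To make the hashing behaviour concrete, here is an illustrative sketch (not the real Kinesis internals, which additionally use configurable hash-key ranges per shard) of how a partition key deterministically maps to a shard: the key is MD5-hashed into a 128-bit hash-key space, which is split evenly across the shards.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative only: mimics how Kinesis assigns a record to a shard by
// MD5-hashing the partition key into the 128-bit hash-key space, which is
// split evenly here across shardCount shards.
public class ShardMapper {
    static final BigInteger HASH_SPACE = BigInteger.TWO.pow(128);

    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hashKey = new BigInteger(1, digest); // unsigned 128-bit value
            int idx = hashKey
                    .divide(HASH_SPACE.divide(BigInteger.valueOf(shardCount)))
                    .intValue();
            return Math.min(idx, shardCount - 1); // guard the top edge of the hash space
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The same key always lands on the same shard, preserving per-key ordering.
        System.out.println(shardFor("producer-42", 4) == shardFor("producer-42", 4));
    }
}
```

This is why putting all messages from one source under one partition key preserves their order: the same key always hashes to the same shard.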

The next few definitions apply to Kinesis Data Analytics. There are two different APIs we can use: the DataStream API or the Table API. We will be covering the former, using Apache Flink.

  • Source/Sink: These are connectors used to read external data and write to external locations respectively. They correspond to the data producer and consumer.
  • Transformation Operator: This is the connector that carries out the transformation. It takes one or more data streams as input and produces an output data stream.

Example of using Kinesis Data Analytics

In the above we are using Kinesis as the data producers and consumers, but we can read and write to a variety of sources.

Apache Camel

Let’s define some more key terms:

  • Endpoint: These are the ends of an inter-process communication. For example in a client-server communication the two endpoints would be the client and the server.
  • Message: This represents a message being exchanged between systems.
  • Processor: Processors are used to manipulate and mediate messages between Endpoints.
  • Routes: A route is the passage of a message through a Camel application, from an input to a destination (if there is one).
  • Components: Components are how Camel connects to systems. For example, we will have a component for Kinesis, and one for REST.
  • Camel Context: This represents the runtime system. Normally you would create your camel context, add endpoints, components and routes, then start the context running, stopping it when the application terminates.

Although this may still seem slightly opaque, the worked example will help answer any questions.
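As a taste of that lifecycle, the following is a minimal hedged sketch (assuming the camel-core dependency; the route itself is illustrative) of creating a Camel Context, adding a route, and starting and stopping it:

```java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

// Minimal sketch of the Camel lifecycle: create a context, add a route,
// start the runtime, and stop it on termination.
public class CamelLifecycle {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("timer:tick?period=1000") // endpoint firing an event each second
                    .log("tick received");      // trivial processing step
            }
        });
        context.start();     // start the runtime system
        Thread.sleep(5000);  // let the route run briefly
        context.stop();      // stop when the application terminates
    }
}
```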

Spring Cloud Stream

Spring Cloud Stream centres on ‘Binder Implementations’, which are provided for each of the systems we may want to integrate with (RabbitMQ, Kafka, Kinesis…). These in turn comprise the following:

  • Destination Binders: These are the components that integrate with the external messaging systems.
  • Destination Bindings: These bridge the application code (producer/consumer) provided by the end user and the external messaging system.
  • Message: This is the data structure used by producers and consumers to communicate with other applications via the destination binders.

However, in reality most of this is abstracted away, and in our example we will see how easy it is to set up.

Worked Example

Example architecture for polling the Twitter API

First of all we need to create the two Java Spring applications. One will be used to process the Tweets from the API using Camel, and put them onto the initial data stream (the producer). The other will be used to pull from the data analytics stream using Spring Cloud Stream, and print them (the consumer).

In our producer component we will poll the Twitter API. To get access to the API we need to set up a Twitter Developer account. Once this is done we will be given a bearer token which will allow us to hit the endpoints.

We will also set up some Kinesis applications, first the two data streams, then the analytics application. Setting up a data stream is very straightforward, in fact in the AWS console there is only one screen.

Setting up a Kinesis Data Stream

Hit create and we’ve set up our first Kinesis component! We will need two data streams twitter-input-data-stream and twitter-output-data-stream.

Now let’s make a Kinesis analytics application, whose setup is slightly more involved.

Setting up a data analytics component, part one

Initially we specify the name and a description. We also select Apache Flink as our runtime; we will be using it to do our analytics.

From there we create a new IAM role to assign to the application, and choose the development set of application settings.

Setting up a data analytics component, part two

We now have all the Kinesis applications we need!

Finally we need to create an S3 bucket in order to store the .jar for our Data Analytics application. All of the default settings can remain.

There’s a little bit too much code in our example to cover in detail, so we’ll focus on the core components, using links to the relevant files.

The first thing we look at is the route. Here we can see that every 20 seconds we poll the API for Tweets to the BBC account. Once we have received them, we pass them to a processor, which will send them to our first Kinesis Data Stream.
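A hedged sketch of what such a route can look like (assuming camel-core and the camel-http component; `TweetProcessor` and the endpoint URI are illustrative stand-ins, not the article's actual code):

```java
import org.apache.camel.builder.RouteBuilder;

// Illustrative route: poll the Twitter API every 20 seconds, then hand the
// response to a processor that forwards it to Kinesis.
public class TwitterRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("timer:pollTwitter?period=20000")                 // fire every 20 seconds
            .setHeader("Authorization",
                    constant("Bearer " + System.getenv("TWITTER_BEARER_TOKEN")))
            .to("https://api.twitter.com/2/tweets/search/recent?query=from:BBC")
            .process(new TweetProcessor());                    // hypothetical processor: deserialize, then put onto Kinesis
    }
}
```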

The processor is used to deserialize the response from Twitter and pass it to the Kinesis service, which contains the code for putting data onto Kinesis. The service uses a preconfigured client, takes the message, converts it into bytes and puts it on the stream. Easy!
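That put operation can be sketched with the AWS SDK for Java v2 as follows (a hedged sketch, not the article's exact code; the class and method names here are illustrative):

```java
import java.nio.charset.StandardCharsets;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

// Illustrative service that converts a message to bytes and puts it on a stream.
public class KinesisService {
    private final KinesisClient client = KinesisClient.create(); // uses the default credential chain

    public void put(String streamName, String partitionKey, String message) {
        PutRecordRequest request = PutRecordRequest.builder()
                .streamName(streamName)
                .partitionKey(partitionKey) // decides which shard the record lands on
                .data(SdkBytes.fromByteArray(message.getBytes(StandardCharsets.UTF_8)))
                .build();
        client.putRecord(request); // Kinesis assigns the sequence number and shard
    }
}
```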

That’s our producer component checked off, let’s have a look at the analytics application. Again, concentrating on the main class we see it defines a source and a sink, linking the two together.
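A pass-through Flink job of that shape can be sketched like this (assuming the flink-connector-kinesis dependency; the region is an assumption, and the stream names match the ones created earlier):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

// Illustrative pass-through job: a Kinesis source linked straight to a Kinesis sink.
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1"); // assumed region

        // Source: read Tweets from the input data stream.
        DataStream<String> input = env.addSource(new FlinkKinesisConsumer<>(
                "twitter-input-data-stream", new SimpleStringSchema(), consumerConfig));

        Properties producerConfig = new Properties();
        producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");

        // Sink: write them straight to the output data stream.
        FlinkKinesisProducer<String> sink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        sink.setDefaultStream("twitter-output-data-stream");
        sink.setDefaultPartition("0");

        input.addSink(sink); // no transformation yet: source linked straight to sink
        env.execute("twitter-passthrough");
    }
}
```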

So far this seems quite familiar! However, we now need to do a little bit of work to get all of our components to play nicely together; a useful document is found here. We need to upload our Flink code to the S3 bucket and give our Data Analytics application access. This is done through the IAM role we implicitly created in an earlier step.

Finally we write our consumer, which uses Spring Cloud Stream to ingest the messages from the output Data Stream. There is fairly minimal configuration needed, and only a single bean to wire!
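That single bean can be sketched as a functional-style consumer (a hedged sketch assuming the spring-cloud-stream-binder-kinesis dependency; the function name and binding shown are illustrative, not the article's exact code):

```java
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

// Illustrative consumer: Spring Cloud Stream binds the function below to the
// output data stream via configuration, e.g. in application.yml:
//   spring.cloud.stream.bindings.tweets-in-0.destination: twitter-output-data-stream
@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public Consumer<String> tweets() {
        return tweet -> System.out.println("Received: " + tweet); // print each message
    }
}
```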

With the producer running on our local machine, we would see the logs below:

Example log output for the producer

You can see we have retrieved the Tweets, deserialized them to an object and put them on the Kinesis Data Stream.

Now let’s have a look at what’s going on in the Data Analytics Application. For this we can use the Apache Flink dashboard provided by Kinesis Data Analytics.

Opening up the console we have a screen outlining the shape of the application:

Example Apache Flink Dashboard screen

From the dashboard we have a source going into a sink, just as we expected.

Finally we look at our consumer application logs from polling the other side:

Consuming Tweets on the other side of the Kinesis applications

We are successfully reading them off the output stream!

Conclusion

Principal Software Engineer at the BBC