AWS Kinesis with Java
An introduction to Kinesis using Apache Camel and Spring Cloud Stream
This article is aimed at developers with a reasonable understanding of Java and Spring, but no experience whatsoever with AWS Kinesis, Apache Camel or Spring Cloud Stream. It will explain the role of message brokers, introduce Kinesis and conclude with a worked example based on Twitter data.
Message brokers act as an intermediary between a data producer and a data consumer, and are responsible for routing, validating and transforming messages.
The benefits of using a message broker include decoupling the producer from the consumer, maintaining a queue, providing reliable storage, managing transactions and guaranteeing message delivery.
Message brokers are often used when we have large streams of data, or when we want to communicate between different components in a microservices architecture. Overall they allow us to move from batch processing to real-time analytics.
Kinesis is the AWS message broker offering. There are several different Kinesis capabilities:
- Video streams: This allows us to securely stream video from devices to AWS. We could then use services like Rekognition for automated content discovery.
- Data streams: This is a scalable and durable solution for streaming large amounts of data between producers and consumers. Along with data analytics it will be the focus of this article.
- Data firehose: This is for pushing data into AWS data stores, and can be used alongside BI tools.
- Data analytics: This can be used to process data in real time using SQL or Apache Flink.
So how is Kinesis different from a queueing system like AWS SQS? Well, an SQS message is processed by a single consumer, and once it has been processed it disappears forever. In Kinesis, however, a message can be read by multiple consumers and remains on the shard until its Time to Live (TTL) expires.
Let’s move on to clarify some core terminology.
- Data Record: A data record reflects a message in our system. They are composed of a sequence number, partition key and a data blob.
- Sequence Number: This is a unique (to the partition) number that accompanies a data record. It is generated internally by Kinesis.
- Partition Key: This is used to group data records onto shards. We must specify a partition key when putting data onto a stream.
- Shard: This is a sequence of data records within a stream. The more shards we have the more throughput we can achieve.
- Kinesis Data Stream: A Kinesis data stream is a group of shards.
- Kinesis Data Stream Application: This is the consumer of our Kinesis Data Stream.
To outline how the components come together, let’s make a diagram.
Note that a data stream is set in a region. It is then automatically replicated across multiple availability zones for maximum resilience. In our example we have also used EC2 instances to represent our data stream application, but it could be anything.
So how do we decide on a partition key? There are two main factors at play: ordering and capacity. Each shard has a maximum capacity, so to spread load evenly we may generate a random key; the hash of this key will distribute the records across shards.
Equally, ordering may be important to us. We may have several different producers, each of which needs its messages to be played in sequence. A shard guarantees the order of its messages; however, if we distribute messages across shards there is no such guarantee. Therefore we may want to put all messages from a given source onto a particular shard, for example by using the source’s identifier as the partition key.
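This mapping can be sketched in plain Java. Kinesis MD5-hashes the partition key into a 128-bit space, and each shard owns a range of that space; the sketch below approximates an even split for a hypothetical four-shard stream (the shard count and keys are illustrative):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of how Kinesis assigns a record to a shard: the partition key is
// MD5-hashed to a 128-bit integer, and each shard owns a slice of that space.
public class ShardMapping {
    static final int SHARDS = 4; // hypothetical stream with four shards

    static int shardFor(String partitionKey) {
        try {
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hash = new BigInteger(1, md5); // unsigned 128-bit value
            // Scale the hash down to a shard index: (hash * SHARDS) / 2^128
            return hash.multiply(BigInteger.valueOf(SHARDS))
                       .shiftRight(128)
                       .intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is always available
        }
    }

    public static void main(String[] args) {
        // The same key always lands on the same shard, preserving per-source order...
        System.out.println(shardFor("bbc") == shardFor("bbc")); // true
        // ...while random keys spread records across the shard space.
        System.out.println(shardFor(java.util.UUID.randomUUID().toString()));
    }
}
```

The same trade-off from the paragraph above falls out of the code: a fixed key gives ordering on one shard, a random key gives even distribution.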
When using the Apache Flink runtime, Kinesis Data Analytics brings some terminology of its own:
- Transformation Operator: This is what carries out the transformation. It takes one or more data streams as input, then produces an output data stream.
- Source/Sink: These are connectors used to read external data or write to external locations, respectively. They correspond to the data producer and consumer.
- Operator: This is a connector used to carry out the transformation.
In the above we are using Kinesis for both the data producers and consumers, but we can read from and write to a variety of sources.
Before we get to our example, let’s examine Apache Camel, which is how we are going to integrate with the Twitter API. Apache Camel is an open-source framework for integrating systems that consume or produce data.
Let’s define some more key terms:
- Endpoint: These are the ends of an inter-process communication. For example in a client-server communication the two endpoints would be the client and the server.
- Message: This represents a message being exchanged between systems.
- Processor: Processors are used to manipulate and mediate messages between Endpoints.
- Routes: A route is the passage of a message through a Camel application, from an input to a destination (if there is one).
- Components: Components are how Camel connects to systems. For example, we will have a component for Kinesis, and one for REST.
- Camel Context: This represents the runtime system. Normally you would create your Camel context, add endpoints, components and routes, then start the context running, stopping it when the application terminates.
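To make these terms concrete, here is a minimal sketch of a route in Camel’s Java DSL, similar in shape to the one in our worked example; the endpoint URIs and timings are illustrative rather than taken from the repository:

```java
import org.apache.camel.builder.RouteBuilder;

// A timer endpoint triggers the route, the HTTP component calls an external
// system, and a processor mediates the message before the route ends.
public class SketchRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("timer:poll?period=20000")                           // fire every 20 seconds
            .to("https://api.twitter.com/2/tweets/search/recent") // hypothetical endpoint
            .process(exchange ->                                  // a Processor
                System.out.println(exchange.getIn().getBody(String.class)))
            .log("polled the API");
    }
}
```

Registering this route with a Camel context and starting it is all that is needed to have the timer drive the flow.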
Although this may still seem slightly opaque, the worked example will help answer any questions.
Spring Cloud Stream
Spring Cloud Stream is the part of the Spring ecosystem responsible for connecting with messaging systems. It supports publisher/subscriber semantics and consumer groups, both of which we employ in our worked example.
It centres on ‘Binder Implementations’, which are provided for each of the systems we may want to integrate with (RabbitMQ, Kafka, Kinesis…). These in turn comprise the following:
- Destination Binders: These are the components responsible for integrating with the external messaging systems.
- Destination Bindings: These bridge the application code (producer/consumer) provided by the end user and the external messaging system.
- Message: This is the data structure used by producers and consumers to communicate with other applications via the destination binders.
However, in reality most of this is abstracted away, and in our example we will see how easy it is to set up.
In the worked example we are going to develop the system below. I will roughly outline how things fit together, but all of the code can be found in the repository here.
First of all we need to create the two Java Spring applications. One will be used to process the Tweets from the API using Camel, and put them onto the initial data stream (the producer). The other will be used to pull from the data analytics stream using Spring Cloud Stream, and print them (the consumer).
In our producer component we will poll the Twitter API. To get access to the API we need to set up a Twitter Developer account. Once this is done we will be given a bearer token which will allow us to hit the endpoints.
We will also set up some Kinesis applications, first the two data streams, then the analytics application. Setting up a data stream is very straightforward, in fact in the AWS console there is only one screen.
Hit create and we’ve set up our first Kinesis component! We will need two data streams in total: one for the raw Tweets from our producer and one for the output of the analytics application.
Now let’s make a Kinesis analytics application, whose setup is slightly more involved.
Initially we specify the name and a description. We also select Apache Flink as our runtime; we will be using it to do our analytics.
From there we create a new IAM role to assign to the application, and choose the development set of application settings.
We now have all the Kinesis applications we need!
Finally we need to create an S3 bucket in order to store the .jar for our Data Analytics application. All of the default settings can remain.
There’s a little bit too much code in our example to cover in detail, so we’ll focus on the core components, using links to the relevant files.
The first thing we look at is the route. Here we can see that every 20 seconds we poll the API for Tweets from the BBC account. Once we have received them we pass them to a processor, which will send them to our first Kinesis Data Stream.
The processor is used to deserialize the response from Twitter and pass it to the Kinesis service, which contains the code for putting data on Kinesis. It uses a preconfigured client, takes the message, converts it into bytes and puts it on the stream. Easy!
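The core of that put path can be sketched in plain Java: the record data is an opaque blob, so the message is serialized to bytes before being handed to the client. The stream name and partition key below are hypothetical, and the client call itself (assumed here to be the AWS SDK v2 `KinesisClient`) is shown in comments since it needs AWS credentials:

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch of the put path: serialize the message to bytes, then hand
// the bytes to the Kinesis client with a stream name and a partition key.
public class KinesisPutSketch {
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("{\"text\":\"an example Tweet\"}");
        // With the AWS SDK v2 on the classpath, the put itself is roughly:
        // kinesisClient.putRecord(PutRecordRequest.builder()
        //         .streamName("tweets-in")             // hypothetical stream name
        //         .partitionKey("bbc")                 // groups records onto a shard
        //         .data(SdkBytes.fromByteArray(payload))
        //         .build());
        System.out.println(payload.length);
    }
}
```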
That’s our producer component checked off; now let’s have a look at the analytics application. Concentrating again on the main class, we see it defines a source and a sink and links the two together.
So far this seems quite familiar! However, we now need to do a little bit of work to get all of our components to play nicely together; a useful document is found here. We need to upload our Flink code to the S3 bucket and give our Data Analytics application access to it. This is done through the IAM role we implicitly created in an earlier step.
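As a sketch, the permissions attached to that role need to include read access to the code object in S3 (and access to the streams themselves); the bucket name, region and account ID below are hypothetical, and the Kinesis actions are left broad for brevity:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::my-flink-code-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": ["kinesis:*"],
      "Resource": "arn:aws:kinesis:eu-west-1:123456789012:stream/*"
    }
  ]
}
```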
Finally we write our consumer, which uses Spring Cloud Stream to ingest messages from the output Data Stream. There is fairly minimal configuration needed, and only a single bean to wire!
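As a sketch of the shape of that bean, Spring Cloud Stream’s functional model binds a `java.util.function.Consumer` to the input destination. The function and stream names below are assumptions, and in the real application the method would carry Spring’s `@Bean` annotation:

```java
import java.util.function.Consumer;

// Spring Cloud Stream binds a Consumer bean to an input destination by name.
// In the Spring application this method would be annotated @Bean, and
// application.yml would point the binding at the output Data Stream, e.g.:
//   spring.cloud.stream.bindings.consumeTweet-in-0.destination: tweets-out
public class TweetConsumer {
    static String format(String payload) {
        return "Received: " + payload;
    }

    public static Consumer<String> consumeTweet() {
        return payload -> System.out.println(format(payload));
    }

    public static void main(String[] args) {
        consumeTweet().accept("{\"text\":\"an example Tweet\"}");
    }
}
```

With the binding in place, Spring invokes the consumer once per message pulled from the stream; no polling code is needed.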
With the producer running on our local machine, we would see the logs below:
You can see we have retrieved the Tweets, deserialized them to an object and put them on the Kinesis Data Stream.
Now let’s have a look at what’s going on in the Data Analytics Application. For this we can use the Apache Flink dashboard provided by Kinesis Data Analytics.
Opening up the console we have a screen outlining the shape of the application:
From the dashboard we have a source going into a sink, just as we expected.
Finally we look at our consumer application logs from polling the other side:
We are successfully reading them off the output stream!
In conclusion, we have introduced AWS Kinesis, Apache Camel and Spring Cloud Stream, bringing them together in a worked example employing the Twitter API.