Kafka 101 Series - Part 2: Stream Processing and Kafka Streams API
In the previous blogpost, streaming data was briefly defined as unbounded data flow coming continuously from numerous data sources. Unlike bounded data which has fixed starting and ending boundaries and hence it is easier to store and process the data as a whole, streaming data is infinite. In that sense, streaming data can capture better the flow of events in the real world for its characteristics of being continuous and boundless.
Nevertheless, it is very challenging to effectively process and analyze streaming data. Since data can come from multiple sources at varying transmission rates, the records can arrive late and be out-of-order in regard to event-time. For example, we want to capture scores in a mobile gaming app to calculate a high-score table of players within a specific time window. However, the devices of some players may be offline while they are playing and their scores are only published when they get online again. As a result, these late-arriving scores are eventually not added in the desired time window. For certain processing logic, the uncertainty that whether all necessary data has arrived or not could be a huge problem. Moreover, streaming data often needs to be processed upon arriving and to meet some real-time requirements to provide timely and continuous results as well. Therefore, care should be taken when choosing the processing method for such streaming data.
It is possible to handle streaming data using batch processing. In this way, the stream is divided into smaller bounded batches, each of which can be processed sequentially and independently from the others. Nevertheless, this approach discloses several drawbacks. First of all, because the segregation of batches is done in fixed size, the problem of records necessary for processing being scattered across two or more batches emerges. Considering one common example, activities in a web session of a user cannot be inspected adequately because the records are lying on two different batches. Moreover, the system always has to wait for a full batch either by waiting for time elapsed or until number of records reaches a threshold value before taking action. Thus, batch processing introduces delay into the response of the system to events. Although this delay can be reduced by using micro-batch with very small size, there are still certain performance factors limiting such size and the fact that records are not processed the moment they arrive at the system can be problematic for certain use cases.
The programming paradigm of stream processing, unlike the approach of batch processing, aims at handling data as it arrives in a continuous manner. Hence, it can provide insights into the events enclosed within the data as they happen. As a result, stream processing can fit better to real-time applications where prompt reaction to events is a vital key. Moreover, when there is no boundary for the processed data, the analyzing operations can be more flexible. However, as stated in the previous section, processing data in unbounded manner means that you never know when the data will come. Operation relying on the availability of data like outer join might encounter some challenges. This problem can be tackled by the approach of windowing in which the boundary of data to be processed is explicitly defined for certain types of operations. Windowing is also useful when it is more needful to analyze streaming data in different periods of time separately rather than the entire history of events.
Stream processing can either be stateless or stateful. With stateless processing, a record can be processed independently without the need of additional information from preceding records. One example of stateless stream processing is filtering out a specific type of event from the data stream to use it to trigger some responding actions. In this situation, the processor does not have to maintain any internal state, it only reads an incoming event, checks whether the event matches some criteria and either drops it or forwards it to downstream processing. On the other hand, in stateful processing, the tasks involve using information about the state cumulated from other processed records. Therefore, there have to be means of persisting and querying the state for the stream processor. There are numerous stateful processing operators. For instance, counting the number of data records with the same attribute value, aggregating data such as moving average, finding pre-determined pattern on the data stream.
Local State in Stream Processing
Stateful processing is only possible when there are some means of persisting current state for the processor. The processor has to ingest input streams, query corresponding current states and combine them together to yield new stateful results. The key factor regarding state store that can greatly enhance the performance of stream processor is local state store.
The idea is quite straightforward. Instead of storing states in some external database for querying later, the states are kept locally on the machine doing the processing. In case a stream processing application is deployed as multiple instances, each instance will keep a subset of the entire states. For fault-tolerance in case of machine breaking down, changes in the states are also periodically sent to a changelog which could be a Kafka topic. In that way, this changelog can be read by other machines or the broken-down machine itself to obtain the current state and continue the unfinished work.
This local state has a number of advantages. First of all, accessing state locally even from the disk is much faster than doing remote call to a database over the network. Therefore, a higher throughput of processed records and much lower latency can be achieved. Secondly, since each processor maintains its own state locally, state access on one machine will not have effect on others. In particular, consistency is preserved; once a value has been written, it is guaranteed that each subsequent request will return this value. Moreover, race condition is also avoided since the state is accessed only from a single thread. On the other hand, there are also drawbacks that should be taken into consideration with this approach. As the states get larger, the restoration time for them gets longer. Moreover, by introducing state into the application, the operation can become more sophisticated than in the case of stateless application.
Use Cases of Stream Processing
Three typical use cases of stream processing are Data Analytics, Event-Driven and Data Pipeline. In these applications, data comes continuously into the system via streams as the data-backbones and needs to be processed fast to provide prompt results or to not disrupt the flow of data. As a result, stream processing is a good option in these cases.
- Data Analytics: traditional data analyzing is often done using batch processing. However, with the increasing demand for real-time insight into raw data for prompt reaction, stream processing becomes a better choice.
- Event-Driven: in these applications, services and components communicate with each other via events. These events are continuously produced by users or the system and ingested by different listeners in the application to trigger responding actions.
- Data Pipeline: stream processing is fitting here since it can transform and move data between systems continuously. As a result, the delay introduced into the data flow is minimized.
Kafka Streams API
Since Apache Kafka v0.10, the Kafka Streams API was introduced providing a library to write stream processing clients that are fully compatible with Kafka data pipeline. Compared with other stream processing frameworks, Kafka Streams API is only a light-weight Java library built on top of Kafka Producer and Consumer APIs. As a result, it is very easy to develop and deploy stream processing instances as normal Java applications without the need of any special configuration or set-up. Scaling is also easily done with Kafka Streams by simply starting other instances of the application with the same application ID. Moreover, Kafka Streams also inherits the concept of partitions from Kafka log structure and hence can support parallel computing on existing Kafka topics.
The first feature that makes Kafka Streams suitable for stream processing is its semantics of time. The event-time when something occurs and processing-time when the record of that event is processed is clearly distinguished in the concept of Kafka Streams. Depending on the use cases, different semantics of time can be used in stream processing logic. Local state store is supported by Kafka Streams as well for stateful operations. The persisted states can be easily queried locally using Interactive Queries. Interactive Queries also supports discovering remote state store in other running instances of the application. Based on that, developers have the flexibility to build a custom RPC layer to query the full state of the application.
Kafka Streams provides two different ways to implement the processing logic. The first way is to use low level Processor API from Kafka Streams to implement custom processors and have more control over processing and optimization. The second method is utilizing Streams Domain Specific Language (Streams DSL) which is based upon Processor API. Streams DSL provides many common operators for stream transformation such as mapping, filtering, aggregating, and joining which are ready to be used. Although it may limit the flexibility when defining the stream processing logic, it is very convenient in terms of reducing implementation time and effort.
To see how stream processors can be implemented with Kafka Streams, let’s consider a simple example. We want to construct a stream processing application to count the number of times a keyword is tweeted as hashtag to determine the current trend on Twitter. In this sample use case, tweets are fed as stream of records into a Kafka topic named “tweets” from Twitter with a Kafka connector. After that, with our stream processing application, each of these tweets is mapped to tuples of hashtagged keywords as keys. The tuples with the same keyword are grouped together and reduced into a record of their aggregated count. These counts are then published to another topic called “keyword-count” for other processors to listen to and do further analysis. In the diagram below, for better visualization and interpretation, the streaming application is depicted as two stages with separate nodes, namely, ‘map tweets to keywords’ and ‘group and count’. Nevertheless, keep in mind that nodes with same instance number are parts of a single application instance.
With Streams DSL from Kafka Streams, it is quite easy to implement the stream processing application for the map and reduce tasks by first configuring necessary properties and then building up the stream processing topology with StreamsBuilder.
Properties streamProps = new Properties(); //Configure properties of the processor
StreamsBuilder builder = new StreamsBuilder(); //StreamsBuilder is used to construct the stream topology when using Streams DSL
KStream<String, String> source = builder.stream("tweets"); //Ingest records from Kafka topic "tweets"
source.flatMap((key,value) -> extractKeywords(value)) //extractKeywords() is a function to read in a tweet and return a list of tuples <key,value> with keys are hashtagged keywords in the tweet
.groupByKey() //Group the records by their keys which are the hashtagged keywords
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte>>as("hashtag-count")) //Count the number of records in each group of hashtag and store it in the local state store "hashtag-count"
.toStream() //The result of count operator is a KTable, it must be converted to a stream before sending to Kafka topic
.to("keyword-count", Produced.with(Serdes.String(), Serdes.Long())); //Publish the result to Kafka topic "keyword-count"
Topology streamTopology = builder.build();
// Create and initialize the stream processor
KafkaStreams stream = new KafkaStreams(streamTopology, streamProps);
In the implementation, the flatMap processor is used which causes changes of the keys of records. As a result, Kafka Streams creates an internal topic hashtag-count-repartition to make sure that records with the same key will be in the same partition. The next processor which is groupByKey will read and operate on records from this internal topic. This repartition when keys of records are changed, which is transparent to developers, helps ensure that the stream processing application will still produce right results when scaling up the application with multiple instances.
In the code snippet above, the number of times each keyword is tweeted is counted for the entire stream for simplicity. However, in a real use case, it could me more useful to analyze the trend of keywords for a specific period of time such as over the last hour which can be achieved with windowing. Also note that the count() operator is stateful. Therefore, it can be seen from the diagram that each application instance has a separate local store to persist the current counts of the keywords it processes. Once the stream processor starts and writes to the local state store, the state can be queried locally from the same stream processing instance with ease using Interactive Queries.
//Get the state store "hashtag-count"
ReadOnlyKeyValueStore<String, Long> keyValueStore =
Long count = keyValueStore.get("Kafka"); //Get the current count of the keyword "Kafka"
However, it is important to remember that when starting multiple instances of the stream processor for horizontal scaling as in the diagram, each instance will only process a subset of the entire list of keywords. The code snippet above therefore can only query the counts of keywords that this instance manages. In case the complete list of keyword counts is required, all running instances of the processor in the network need to be discovered using Interactive Queries and then you must implement a way to communicate and query from the remote instances. This topic of local state store and full state query using Interactive Queries is actually very important and useful when using Kafka Streams and therefore could be discussed in more detail in another blogpost. One more point to notice is that, by publishing the keyword counts to another Kafka topic, other stream processors can simply be plugged in and start consume the entire list of counts from the topic without having to know how the records are produced or how many instances are involved in the processing of records. With stream as the data-backbone, the services in our application can be better decoupled and multiple listeners can consume the same topic without interfering each other.
Moreover, Kafka Streams also comes with several other robust features which are not discussed here such as processing guarantee of exactly-once semantics, handling of out-of-order records in terms of event time, supporting operations on streams and tables.
So, that’s it! That are some basic ideas about streaming data, stream processing as well as some fundamental features of Kafka Streams API. In the next blogpost, we will get our hands on implementing a simple stream-based application using Kafka broker and Kafka Streams API.