How events are stored on different Event-Stream-Processing platforms
Before jumping into event storage: if you are not already familiar with the three platforms covered here, namely Apache Kafka, Apache Pulsar and NATS Streaming, you are welcome to read the first blog post in this series, which gives a general overview of these platforms and of Event Stream Processing in general.
A Kafka broker has two main responsibilities: serving requests from clients and persisting the events that clients publish to its disk. Records in a Kafka partition are appended sequentially to a series of log files (segments). Because writes are append-only, the broker's disk only needs sequential I/O, which lets Kafka perform well even on cheap spinning disks. Another factor behind Kafka's high performance is that a broker can confirm the persistence of an event to the client as soon as the message has been written to the I/O buffer (the operating system's page cache), rather than after it has been safely flushed to disk. However, this opens up the possibility of message loss if a broker crashes before the message is flushed. To overcome this, Kafka relies on redundant writes to multiple broker instances: a partition can be configured to be replicated on several brokers for fault tolerance, and each replica of a partition resides entirely on the disk of one broker in the cluster. Even if all of these brokers confirm a message before flushing it to disk, a broker failure can be tolerated without message loss, as long as not all brokers holding a copy fail at the same time.
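To make the append-only idea concrete, here is a minimal Python sketch of a segmented, append-only log. It is a toy under stated assumptions, not Kafka's storage code: the class name `SegmentedLog`, the 4-byte length prefix and the tiny segment size are all hypothetical; only the idea of naming segment files after the offset of their first record is borrowed from Kafka. With `fsync_on_append=False`, `append` returns the offset (the "acknowledgement") after handing the bytes to the operating system but before forcing them to disk, which mirrors the trade-off described above.

```python
import os
import tempfile


class SegmentedLog:
    """Toy append-only log split into segment files (hypothetical, not Kafka's code)."""

    def __init__(self, directory, max_segment_bytes=1024, fsync_on_append=False):
        self.directory = directory
        self.max_segment_bytes = max_segment_bytes
        self.fsync_on_append = fsync_on_append
        self.next_offset = 0
        self._roll()

    def _roll(self):
        # Start a new segment file named after the offset of its first record.
        path = os.path.join(self.directory, f"{self.next_offset:020d}.log")
        self.segment = open(path, "ab")
        self.segment_bytes = 0

    def append(self, record: bytes) -> int:
        needed = 4 + len(record)  # 4-byte length prefix + payload
        if self.segment_bytes > 0 and self.segment_bytes + needed > self.max_segment_bytes:
            self.segment.close()
            self._roll()
        # Writes only ever go to the end of the active segment: sequential I/O.
        self.segment.write(len(record).to_bytes(4, "big") + record)
        self.segment.flush()  # moves data into the OS buffer, not necessarily onto the disk
        if self.fsync_on_append:
            os.fsync(self.segment.fileno())  # force the data onto the disk (slower)
        self.segment_bytes += needed
        offset = self.next_offset
        self.next_offset += 1
        # Without fsync, returning here acknowledges data that may not be on disk yet.
        return offset

    def close(self):
        self.segment.close()


with tempfile.TemporaryDirectory() as tmp:
    log = SegmentedLog(tmp, max_segment_bytes=64)
    offsets = [log.append(b"event-%d" % i) for i in range(10)]
    log.close()
    segment_files = sorted(os.listdir(tmp))

print(offsets)        # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(segment_files)  # ['00000000000000000000.log', '00000000000000000005.log']
```

Each 7-byte record takes 11 bytes with its prefix, so five records fit into a 64-byte segment and the sixth one rolls a new file.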
However, enabling replication on the brokers alone cannot guarantee reliable event storage on Kafka. With the producer setting acks=1, which was the default before Kafka 3.0, a producer only waits for the partition leader to acknowledge a successful write. This can cause message loss if the broker hosting the leader fails before the message has been replicated to the other brokers. Therefore, the producer must be explicitly configured (acks=all) to wait for acknowledgements from the followers as well. Followers of a partition regularly send fetch requests to the leader to stay up to date with the latest messages, so a follower that lags too far behind the leader increases the time the client has to wait. To mitigate this, the leader of a partition maintains a list of in-sync replicas (ISR), which contains the leader itself and the followers that are currently in sync with it. When a producer waits for acknowledgements from both the leader and the followers, it actually only waits for the replicas in the ISR. This list shrinks when slow followers are removed from it and expands again when a follower has fully caught up with the leader and rejoins it. As a result, the ISR can shrink until it contains only the leader, in which case redundant acknowledgements are no longer guaranteed. To ensure that a message is persisted on more than one broker, it is therefore essential to set a minimum number of in-sync replicas for the topic (min.insync.replicas). When the ISR falls below this number, Kafka simply rejects new writes from producers that use acks=all.
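The interplay between acks, the ISR and min.insync.replicas can be simulated in a few lines. This is a minimal Python sketch under simplifying assumptions: the class and exception names are hypothetical, and lag is counted in messages, whereas current Kafka versions decide ISR membership by time (replica.lag.time.max.ms). Only acks and min.insync.replicas are real Kafka setting names.

```python
from dataclasses import dataclass, field


class NotEnoughReplicasError(Exception):
    """Toy stand-in for the error Kafka returns when the ISR is too small."""


@dataclass
class ToyPartitionLeader:
    leader: int
    followers: list
    min_insync_replicas: int  # plays the role of the topic config min.insync.replicas
    max_lag_messages: int     # simplification: real Kafka uses replica.lag.time.max.ms
    log_end_offset: int = 0
    follower_offsets: dict = field(default_factory=dict)

    def __post_init__(self):
        for f in self.followers:
            self.follower_offsets.setdefault(f, 0)

    def isr(self):
        """The leader plus every follower whose lag is within the allowed bound."""
        in_sync = [f for f in self.followers
                   if self.log_end_offset - self.follower_offsets[f] <= self.max_lag_messages]
        return [self.leader] + in_sync

    def fetch(self, follower, up_to_offset):
        """A follower's fetch request: it now holds every message below up_to_offset."""
        self.follower_offsets[follower] = min(up_to_offset, self.log_end_offset)

    def produce(self, acks):
        """Append one message; return the replicas whose acknowledgement is awaited."""
        if acks == "all":
            isr = self.isr()
            if len(isr) < self.min_insync_replicas:
                raise NotEnoughReplicasError(
                    f"ISR {isr} < min.insync.replicas={self.min_insync_replicas}")
            self.log_end_offset += 1
            return isr
        self.log_end_offset += 1  # acks=1: only the leader's own write is awaited
        return [self.leader]


p = ToyPartitionLeader(leader=1, followers=[2, 3], min_insync_replicas=2, max_lag_messages=2)
first = p.produce(acks="all")       # everyone is in sync
for follower in (2, 3):             # both followers replicate message 0
    p.fetch(follower, p.log_end_offset)
for _ in range(3):                  # followers stop fetching while writes continue
    p.produce(acks="1")
shrunk = p.isr()                    # both followers are now too far behind
try:
    p.produce(acks="all")
    rejected = False
except NotEnoughReplicasError:
    rejected = True
p.fetch(2, p.log_end_offset)        # broker 2 catches up and rejoins the ISR
after_catch_up = p.produce(acks="all")
print(first, shrunk, rejected, after_catch_up)  # [1, 2, 3] [1] True [1, 2]
```

In a real deployment, the equivalent protection comes from combining acks=all on the producer with a min.insync.replicas value greater than 1 (for example 2 with a replication factor of 3) on the topic.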
In theory, the persistence layer of Kafka can be scaled out indefinitely by adding brokers. When a new broker joins the cluster, it is automatically considered when replicas of newly created partitions are assigned. In the figure below, when broker 4 is added to the cluster, it is chosen as the leader of partition 4. However, the replicas of partitions 1, 2 and 3, which were created before broker 4 joined, are not automatically moved to broker 4 to balance the load in the cluster. Moreover, since each replica must reside entirely on the disk of a single broker, partitions holding uneven numbers of messages also lead to an unevenly loaded cluster. Kafka provides tools to rebalance the load, such as the partition reassignment tool (kafka-reassign-partitions.sh), but rebalancing must be triggered manually by users and requires careful planning before execution.
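The following Python sketch illustrates why adding a broker does not rebalance existing partitions. It uses a simplified round-robin placement (a hypothetical `assign_replicas` helper, not Kafka's actual assignment algorithm) with a replication factor of 2, and reproduces the scenario from the figure: partitions 1 to 3 are placed on brokers 1 to 3, then broker 4 joins and only the new partition 4 is assigned with it in mind.

```python
from collections import Counter


def assign_replicas(partitions, brokers, replication_factor):
    """Simplified round-robin placement (not Kafka's real algorithm):
    partition p is led by brokers[(p - 1) % len(brokers)] and its followers
    are the next brokers in the list, wrapping around."""
    n = len(brokers)
    assignment = {}
    for p in partitions:
        start = (p - 1) % n
        assignment[p] = [brokers[(start + i) % n] for i in range(replication_factor)]
    return assignment


def replicas_per_broker(assignment, brokers):
    """Count how many partition replicas each broker has to store."""
    counts = Counter(b for replicas in assignment.values() for b in replicas)
    return {b: counts[b] for b in brokers}


assignment = assign_replicas([1, 2, 3], brokers=[1, 2, 3], replication_factor=2)
# Broker 4 joins: only the newly created partition 4 is placed with it in mind.
assignment.update(assign_replicas([4], brokers=[1, 2, 3, 4], replication_factor=2))
load = replicas_per_broker(assignment, [1, 2, 3, 4])
print(assignment)  # {1: [1, 2], 2: [2, 3], 3: [3, 1], 4: [4, 1]}
print(load)        # {1: 3, 2: 2, 3: 2, 4: 1}
```

Broker 4 ends up with a single replica while broker 1 holds three; moving replicas to even this out is the manual reassignment step described above.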
Pulsar uses Apache BookKeeper as its persistence layer for event storage. When a Pulsar broker receives a request from a client, it calls out to the BookKeeper cluster to persist or retrieve the message. BookKeeper provides a storage abstraction called a ledger, which is stored internally as log files on the disks of BookKeeper nodes (Bookies). A Pulsar topic is made up of multiple BookKeeper ledgers, and each ledger can in turn be made up of multiple fragments.
For each BookKeeper ledger, three configuration options are required:
- Ensemble size (E): this is the number of Bookies, selected randomly from the BookKeeper cluster, that are used for writing and replicating the messages of this ledger. Whenever a Bookie in the ensemble fails, a new fragment is created for the ledger with a new ensemble that excludes the failed Bookie.
- Write quorum size (Qw): this is the replication factor of messages in the ledger. For each message, Qw Bookies will be selected from the ensemble to persist the message on their disks.
- Acknowledge quorum size (Qa): Whenever a Bookie has safely persisted a message, it will send back an acknowledgement. The Pulsar broker only confirms the successful write to client after it has received Qa acknowledgements from the BookKeeper cluster.
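The three settings can be illustrated with a small Python sketch of how entries are striped over an ensemble. It assumes round-robin striping, where entry i is written to the Qw Bookies starting at position i mod E in the ensemble; this is modelled on BookKeeper's default behaviour, but the `write_set` function and the Bookie names are hypothetical.

```python
from collections import Counter


def write_set(entry_id, ensemble, write_quorum):
    """Bookies that store one entry when entries are striped round-robin over the ensemble."""
    size = len(ensemble)
    return [ensemble[(entry_id + i) % size] for i in range(write_quorum)]


ensemble = ["bookie-1", "bookie-2", "bookie-3", "bookie-4", "bookie-5"]  # E = 5
qw = 3                                                                    # Qw = 3

print(write_set(0, ensemble, qw))  # ['bookie-1', 'bookie-2', 'bookie-3']
print(write_set(4, ensemble, qw))  # ['bookie-5', 'bookie-1', 'bookie-2']

# Over many entries, every Bookie in the ensemble stores the same share (Qw / E) of them.
stored = Counter(b for entry in range(10) for b in write_set(entry, ensemble, qw))
print(stored)  # each of the 5 Bookies holds 6 of the 10 entries
```

Because consecutive entries land on different subsets of the ensemble, a single ledger is spread over all E Bookies rather than living on one node.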
By tweaking these three numbers, which must satisfy E ≥ Qw ≥ Qa, users can adjust the replication factor and the acknowledgement quorum to control the trade-off between reliability and client latency.
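A minimal sketch of that trade-off, assuming the broker confirms a write as soon as the Qa-th fastest Bookie in the write set has acknowledged it. The helper names and the latency figures are hypothetical; in Pulsar the three values are typically set through broker settings such as managedLedgerDefaultEnsembleSize, managedLedgerDefaultWriteQuorum and managedLedgerDefaultAckQuorum.

```python
def check_ledger_config(ensemble_size, write_quorum, ack_quorum):
    """Reject settings that break the ordering E >= Qw >= Qa >= 1."""
    if not (ensemble_size >= write_quorum >= ack_quorum >= 1):
        raise ValueError("expected E >= Qw >= Qa >= 1, got "
                         f"E={ensemble_size}, Qw={write_quorum}, Qa={ack_quorum}")


def confirm_latency_ms(ack_latencies_ms, ack_quorum):
    """The broker can confirm once the Qa-th fastest Bookie in the write set has acked."""
    return sorted(ack_latencies_ms)[ack_quorum - 1]


check_ledger_config(5, 3, 2)  # valid: E=5, Qw=3, Qa=2
latencies = [3, 4, 250]       # ack latencies of one write set (Qw = 3), one slow Bookie
print(confirm_latency_ms(latencies, ack_quorum=3))  # 250: waits for the slow Bookie
print(confirm_latency_ms(latencies, ack_quorum=2))  # 4: the slow Bookie is off the critical path
```

Setting Qw larger than Qa keeps a single slow Bookie off the critical path, at the cost of confirming a message while only Qa copies are guaranteed to exist.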
Moreover, because a Pulsar topic is only a logical abstraction on top of the persistence layer provided by BookKeeper, the messages of a single Pulsar topic, unlike those of a Kafka partition replica, can be physically spread across multiple Bookies. As a result, the load of each topic is distributed more evenly across the BookKeeper cluster, which also makes the persistence layer of Pulsar easier to scale.
Whenever a new Bookie is added to the BookKeeper cluster, it is automatically considered when choosing the ensemble for new fragments of both existing and newly created topics. Moreover, if a Bookie dies and leaves some fragments under-replicated, BookKeeper's auto-recovery feature detects the failed node and re-replicates the messages stored on it to other Bookies in the cluster, which restores the write quorum. Because an acknowledged message is stored on at least Qa Bookies, BookKeeper can tolerate up to Qa – 1 failed Bookies and still re-replicate the message to meet the write quorum. Compared to Kafka, the persistence layer of Pulsar is much more flexible and resilient to change. The cluster can be scaled up or down at any time, and BookKeeper's internal mechanisms keep the load evenly distributed and every message well replicated for fault tolerance. More importantly, this process is fully automatic and requires minimal manual intervention from users.
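The re-replication step can be sketched as follows. This is a toy in Python under stated assumptions: it works per entry rather than per fragment, the `re_replicate` helper is hypothetical, and replacement Bookies are simply picked in sorted order. It also shows where the Qa – 1 bound comes from: an entry confirmed with only Qa copies can still be repaired after Qa – 1 failures, but not after Qa.

```python
def re_replicate(placement, live_bookies, write_quorum):
    """Toy auto-recovery: bring every entry back to `write_quorum` live copies.

    `placement` maps an entry id to the set of Bookies that stored it;
    `live_bookies` is the set of Bookies that are still up.
    """
    repaired = {}
    for entry, holders in placement.items():
        alive = {b for b in holders if b in live_bookies}
        if not alive:
            raise RuntimeError(f"entry {entry} is lost: no surviving copy to copy from")
        # Candidate targets: live Bookies that do not hold the entry yet.
        candidates = sorted(live_bookies - alive)
        while len(alive) < write_quorum and candidates:
            alive.add(candidates.pop(0))  # copy the entry from a survivor to this Bookie
        repaired[entry] = alive
    return repaired


# Qw = 3; bookie-2 has died.
placement = {0: {"bookie-1", "bookie-2", "bookie-3"},
             1: {"bookie-2", "bookie-3", "bookie-4"}}
live = {"bookie-1", "bookie-3", "bookie-4", "bookie-5"}
repaired = re_replicate(placement, live, write_quorum=3)
print({e: sorted(b) for e, b in repaired.items()})
# {0: ['bookie-1', 'bookie-3', 'bookie-4'], 1: ['bookie-1', 'bookie-3', 'bookie-4']}

# An entry confirmed with only Qa = 2 copies survives Qa - 1 = 1 failure...
survived = re_replicate({7: {"bookie-1", "bookie-2"}},
                        {"bookie-2", "bookie-3", "bookie-4"}, write_quorum=3)
print(sorted(survived[7]))  # ['bookie-2', 'bookie-3', 'bookie-4']

# ...but not Qa = 2 failures.
try:
    re_replicate({7: {"bookie-1", "bookie-2"}}, {"bookie-3", "bookie-4"}, write_quorum=3)
    lost = False
except RuntimeError:
    lost = True
print(lost)  # True
```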
For event storage, NATS Streaming provides a pluggable persistence layer that is separate from the request-serving layer. Users can choose to persist messages either on a filesystem mounted on the server instance or in a relational database, referred to in the NATS documentation as the file store and the SQL store respectively. With the file store, a directory is created for each NATS channel, and the messages of that channel are stored in a number of log files in that directory. With the SQL store, all messages published to NATS are persisted in a message table, where each message is uniquely identified by the ID of its channel and an incrementing sequence ID.
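The SQL store idea can be sketched with Python's built-in sqlite3 module. The table layout below is a hypothetical simplification, not the schema NATS Streaming actually creates: it only captures the point that a message is identified by its channel ID plus a per-channel, incrementing sequence number.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical simplified schema: one table for all channels, keyed by (channel, sequence).
conn.execute(
    """
    CREATE TABLE messages (
        channel_id INTEGER NOT NULL,
        seq        INTEGER NOT NULL,
        data       BLOB    NOT NULL,
        PRIMARY KEY (channel_id, seq)
    )
    """
)


def publish(conn, channel_id, data):
    """Store a message under the next sequence number of its channel.

    A real store would also have to make this safe under concurrent writers.
    """
    with conn:  # commits the insert on success, rolls back on error
        (last_seq,) = conn.execute(
            "SELECT COALESCE(MAX(seq), 0) FROM messages WHERE channel_id = ?",
            (channel_id,),
        ).fetchone()
        conn.execute(
            "INSERT INTO messages (channel_id, seq, data) VALUES (?, ?, ?)",
            (channel_id, last_seq + 1, data),
        )
    return last_seq + 1


def replay(conn, channel_id, from_seq):
    """Read a channel's messages in sequence order, starting at from_seq."""
    return conn.execute(
        "SELECT seq, data FROM messages WHERE channel_id = ? AND seq >= ? ORDER BY seq",
        (channel_id, from_seq),
    ).fetchall()


seqs = [
    publish(conn, 1, b"order-created"),
    publish(conn, 1, b"order-paid"),
    publish(conn, 2, b"user-signed-up"),
]
replayed = replay(conn, 1, 2)
print(seqs)      # [1, 2, 1]
print(replayed)  # [(2, b'order-paid')]
```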
Regarding fault tolerance of the persistence layer, NATS Streaming has two operational modes, which correspond to different levels of fault tolerance. The first is fault-tolerance mode. In this mode, a NATS Streaming cluster can have multiple server instances, but they all share the same data store, so the mode only provides fault tolerance for the processing layer. More specifically, at any time only one server instance can hold the exclusive lock on the data store and serve client requests; when this active instance goes down, the lock passes to one of the standby instances. On the persistence layer, however, NATS Streaming does not replicate data in this mode, so the data store shared by all server instances can become a single point of failure, and it is up to users to replicate the data themselves if necessary.
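The active/standby behaviour of fault-tolerance mode can be modelled as a race for a single lock on the shared store. The Python sketch below is purely illustrative: `SharedStore`, `Server` and their methods are hypothetical and say nothing about how NATS Streaming actually implements the lock. Note that both servers use the same store object, which is exactly why the store itself remains a single point of failure.

```python
class SharedStore:
    """Toy data store shared by all server instances, guarded by one exclusive lock."""

    def __init__(self):
        self.lock_owner = None
        self.messages = []  # a single copy of the data, not replicated anywhere

    def try_lock(self, server_name):
        if self.lock_owner in (None, server_name):
            self.lock_owner = server_name
            return True
        return False

    def release(self, server_name):
        if self.lock_owner == server_name:
            self.lock_owner = None


class Server:
    def __init__(self, name, store):
        self.name = name
        self.store = store
        self.active = False

    def tick(self):
        # Standby servers keep trying to take the lock; only the holder serves clients.
        self.active = self.store.try_lock(self.name)

    def publish(self, message):
        if not self.active:
            raise RuntimeError(f"{self.name} is a standby and cannot serve requests")
        self.store.messages.append(message)

    def crash(self):
        # Assumption: the lock is freed when its holder dies (e.g. it expires).
        self.active = False
        self.store.release(self.name)


store = SharedStore()
a, b = Server("A", store), Server("B", store)
a.tick()
b.tick()
a.publish("m1")
print(a.active, b.active)        # True False
a.crash()
b.tick()                         # B takes over the lock
b.publish("m2")
print(b.active, store.messages)  # True ['m1', 'm2']
```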
The second operational mode of NATS Streaming is clustering mode. In this mode, each server instance in the NATS cluster has its own data store, and the Raft consensus algorithm is used to replicate data among these stores. One server instance is elected as the Raft leader; it serves all client requests and replicates every message published to NATS to the data stores of all server instances in the cluster. Because Raft requires a majority to agree, a cluster of 2n – 1 server instances can tolerate up to n – 1 failed instances without message loss. However, in this mode each server instance retains a full copy of all messages published to the cluster, so the capacity of the persistence layer cannot be expanded simply by adding more server instances.
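The majority arithmetic behind the 2n – 1 rule is easy to check. In this Python sketch (the helper names are ours, not from NATS), a cluster of N servers needs floor(N/2) + 1 votes, so it can lose N minus that many servers. The last helper reflects the fact that, with every server holding a full copy, usable capacity is bounded by the smallest server rather than growing with the cluster.

```python
def majority(cluster_size):
    """Votes needed for a Raft decision."""
    return cluster_size // 2 + 1


def tolerated_failures(cluster_size):
    """Servers that can fail while a majority is still reachable."""
    return cluster_size - majority(cluster_size)


def usable_capacity_gb(server_capacities_gb):
    """Every server stores a full copy, so the smallest server bounds the total."""
    return min(server_capacities_gb)


for n in range(1, 5):
    size = 2 * n - 1
    print(size, tolerated_failures(size))  # 1 0, 3 1, 5 2, 7 3: n - 1 each time

print(tolerated_failures(4))                          # 1: a fourth server adds no tolerance over 3
print(usable_capacity_gb([500, 500, 500]))            # 500
print(usable_capacity_gb([500, 500, 500, 500, 500]))  # still 500
```

This is also why Raft clusters are usually sized with an odd number of servers: going from 2n – 1 to 2n servers adds load without adding fault tolerance.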
NATS Streaming itself provides no mechanism for scaling event storage; this must be handled by users, depending on which pluggable store they select. For instance, with the file store, a fully managed network filesystem such as Amazon Elastic File System can be used, whereas with the SQL store, scalability depends on the chosen database technology.
With this post, we have covered the first ESP functionality, event storage, on three platforms: Apache Kafka, Apache Pulsar and NATS Streaming. All three platforms support durable storage with data replication for fault tolerance. However, Pulsar's persistence layer stands out for its flexibility and scalability. Kafka's storage layer is also highly scalable, but may at times require manual intervention to balance the load. NATS Streaming, on the other hand, offers little built-in support for scalability because its persistence layer is pluggable, so it is up to users to choose an appropriate data store and configure it correctly.