How the Druid Kafka Indexing Service works: a detailed explanation

The Kafka Indexing Service (KIS) is a Druid extension that enables both real-time and historical ingestion of data from Kafka topics with an "exactly-once" guarantee.

It is meant to replace the old Druid Real-Time nodes with a more reliable and flexible solution. Unlike the Real-Time node mechanism or the newer Tranquility library, KIS can also read older (non-recent) events from Kafka, and it is not subject to the window period restrictions imposed on other ingestion mechanisms.

The Kafka Indexing Service relies on the Overlord and MiddleManager processes: a supervisor running on the Overlord manages ingestion, and the indexing tasks it creates run on MiddleManagers.
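In practice, KIS is driven by a supervisor spec submitted to the Overlord (by POSTing JSON to its /druid/indexer/v1/supervisor endpoint), and the supervisor then launches indexing tasks on MiddleManagers. The minimal sketch below builds an abbreviated spec in Python to show where the parameters discussed in this article (segmentGranularity, maxRowsInMemory, taskDuration) live. The data source, topic and broker address are hypothetical, the numeric values are illustrative rather than Druid's defaults, the spec is not complete enough to submit as-is (the parser/timestamp/dimension settings are omitted), and the exact layout differs between Druid versions (newer releases document these sections nested under a "spec" key).

```python
import json


def build_kafka_supervisor_spec(data_source, topic, bootstrap_servers,
                                segment_granularity="HOUR",
                                task_duration="PT1H",
                                max_rows_in_memory=100000):
    """Return an abbreviated, illustrative Kafka supervisor spec as a dict.

    Assumption: legacy (top-level) layout; parser/timestampSpec/dimensionsSpec
    are deliberately omitted, so this is not a submittable spec.
    """
    return {
        "type": "kafka",
        "dataSchema": {
            "dataSource": data_source,
            "granularitySpec": {
                "type": "uniform",
                # Controls how events are bucketed into time intervals (step 1).
                "segmentGranularity": segment_granularity,
                "queryGranularity": "NONE",
            },
        },
        "tuningConfig": {
            "type": "kafka",
            # Rows held in memory before a persist/spill to disk (step 2).
            "maxRowsInMemory": max_rows_in_memory,
        },
        "ioConfig": {
            "topic": topic,
            "consumerProperties": {"bootstrap.servers": bootstrap_servers},
            "taskCount": 1,
            "replicas": 1,
            # How long a task reads before it stops and publishes (step 3).
            "taskDuration": task_duration,
        },
    }


# Hypothetical names; replace with your own data source, topic and brokers.
spec = build_kafka_supervisor_spec("events", "events-topic", "kafka01:9092")
print(json.dumps(spec, indent=2))
```

Building the spec as a Python dict keeps it easy to template per environment; the resulting JSON is what would be sent to the Overlord.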

Below, we walk through the detailed data flow (from ingestion to querying) when using the Kafka Indexing Service.

  1. The KIS task reads an event stream from Kafka and builds a set of partitions in memory. The number of partitions created equals the number of Kafka partitions multiplied by the number of segmentGranularity intervals the events fall into.

  2. These partitions are persisted (spilled) to disk whenever maxRowsInMemory is reached. Bear in mind that all of this data, both in memory and on disk, is available for queries.

  3. When taskDuration is reached, the KIS task stops reading from Kafka and merges the spilled partitions together into a final set of partitions.

  4. This final set of partitions is pushed to deep storage (S3, HDFS, NFS) and the segment descriptors are written to the metadata store. The task then waits for handoff.

  5. At this point the segments cannot be queried via Historical nodes, because they have not been loaded there yet. Queries for this data are therefore still served by the indexing task running on the MiddleManager.

  6. One of the Coordinator's jobs is to periodically check the metadata store for new, unserved segments. When it finds them, it instructs Historical nodes to load them, choosing which nodes get which segments according to a cost-based balancing algorithm.

  7. The Historical nodes pull the segments from deep storage into their local segment cache and then load them so they can be queried. The name "local segment cache" can be misleading, because Historical and Broker nodes also each have their own, separate query cache. Here, "local segment cache" simply means the segments stored locally on Historical nodes and available for querying.

  8. Once the Historical nodes have loaded all the segments, the indexing task is notified that it can stop serving queries for the segments it generated, since the Historicals now take over that responsibility. The task is then done and its process terminates.
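Steps 1 to 3 can be illustrated with a toy Python model of a single indexing task. It is a sketch under stated assumptions, not Druid's implementation: "disk" is just another dictionary, timestamps are Unix seconds, segmentGranularity is fixed to one hour, and MAX_ROWS_IN_MEMORY is set absurdly low so the spill is visible. It shows rows being bucketed by (Kafka partition, time interval), spilled when the in-memory row limit is hit, remaining queryable across memory and "disk", and finally merged per bucket when the task finishes.

```python
from collections import defaultdict

SEGMENT_GRANULARITY_SECONDS = 3600  # assumption: HOUR segmentGranularity
MAX_ROWS_IN_MEMORY = 3              # assumption: tiny limit to force spills


def interval_start(ts):
    """Floor a Unix timestamp (seconds) to the start of its hourly interval."""
    return ts - ts % SEGMENT_GRANULARITY_SECONDS


class ToyIndexingTask:
    def __init__(self, max_rows_in_memory=MAX_ROWS_IN_MEMORY):
        self.max_rows_in_memory = max_rows_in_memory
        self.in_memory = defaultdict(list)  # (kafka_partition, interval) -> rows
        self.rows_in_memory = 0
        self.spilled = defaultdict(list)    # (kafka_partition, interval) -> persisted chunks

    def consume(self, kafka_partition, ts, row):
        """Step 1: bucket each event by Kafka partition and time interval."""
        key = (kafka_partition, interval_start(ts))
        self.in_memory[key].append(row)
        self.rows_in_memory += 1
        if self.rows_in_memory >= self.max_rows_in_memory:
            self.persist()

    def persist(self):
        """Step 2: spill every in-memory bucket to 'disk' as a new chunk."""
        for key, rows in self.in_memory.items():
            self.spilled[key].append(list(rows))
        self.in_memory.clear()
        self.rows_in_memory = 0

    def query_count(self):
        """Queries see rows both in memory and already spilled."""
        in_mem = sum(len(rows) for rows in self.in_memory.values())
        on_disk = sum(len(c) for chunks in self.spilled.values() for c in chunks)
        return in_mem + on_disk

    def finish(self):
        """Step 3 (taskDuration reached): persist leftovers, merge chunks per bucket."""
        self.persist()
        return {key: [row for chunk in chunks for row in chunk]
                for key, chunks in self.spilled.items()}


# Hypothetical events: (kafka_partition, unix_ts, row)
task = ToyIndexingTask()
for partition, ts, row in [(0, 0, "a"), (1, 10, "b"), (0, 3700, "c"),
                           (1, 3800, "d"), (0, 20, "e")]:
    task.consume(partition, ts, row)

print(task.query_count())  # 5: two rows in memory plus three spilled rows
final = task.finish()
print(len(final))          # 4: 2 Kafka partitions x 2 hourly intervals
```

The final count of four buckets comes from two Kafka partitions multiplied by the two hourly intervals the events fall into, which is the relationship described in step 1.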
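The handoff in steps 4 to 8 can be sketched as another toy Python model. It makes one important substitution: Druid's Coordinator places segments with a cost-based balancing algorithm, whereas this sketch uses a simple "least-loaded Historical" rule as a stand-in, so it shows the shape of the handoff rather than Druid's actual placement logic. The segment IDs and node names are hypothetical, and deep storage is not modeled; publishing only records IDs in a set that plays the role of the metadata store.

```python
from dataclasses import dataclass, field


@dataclass
class Historical:
    name: str
    loaded: set = field(default_factory=set)  # segments in the local segment cache


@dataclass
class Cluster:
    metadata_store: set = field(default_factory=set)  # published segment IDs
    historicals: list = field(default_factory=list)

    def publish(self, segment_ids):
        """Step 4: the indexing task records its segments in the metadata store."""
        self.metadata_store.update(segment_ids)

    def served(self):
        """All segments currently loaded by any Historical."""
        return set().union(*(h.loaded for h in self.historicals))

    def coordinator_run(self):
        """Steps 6-7: one Coordinator cycle assigning each unserved segment.

        Stand-in rule (NOT Druid's cost-based balancer): pick the Historical
        with the fewest loaded segments; ties go to the first in the list.
        """
        for seg in sorted(self.metadata_store - self.served()):
            target = min(self.historicals, key=lambda h: len(h.loaded))
            target.loaded.add(seg)

    def handoff_complete(self, segment_ids):
        """Step 8: the task may stop serving once every segment is loaded."""
        return set(segment_ids) <= self.served()


cluster = Cluster(historicals=[Historical("hist-1"), Historical("hist-2")])
task_segments = ["events_T00", "events_T01", "events_T02"]  # hypothetical IDs

cluster.publish(task_segments)
print(cluster.handoff_complete(task_segments))  # False: the task still serves queries (step 5)
cluster.coordinator_run()
print(cluster.handoff_complete(task_segments))  # True: the task can stop and exit
```

Until coordinator_run has placed every published segment, handoff_complete stays False, mirroring step 5, where the indexing task on the MiddleManager keeps answering queries for data that no Historical holds yet.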

Check out what Deep.BI is all about!