Lucas Brutschy created KAFKA-18986:
--------------------------------------
Summary: Properly handle configured topology soft-state using KIP-1101
Key: KAFKA-18986
URL: https://issues.apache.org/jira/browse/KAFKA-18986
Project: Kafka
Issue Type: Sub-task
Reporter: Lucas Brutschy
In KIP-1101, the number of partitions of the input topics and of some internal topics is left parametric, so that a single application can be deployed with varying numbers of partitions. The process of turning a "parametric" topology into one where the number of partitions is defined for every topic and the presence of all internal topics is validated is called "configuring the topology". The result of configuring the topology is either a configured topology or a validation error, potentially together with a set of internal topics that need to be created.

The configured topology in streams groups is derived from the "partition metadata" (the current state of all topics involved in a topology) and the topology. Both are persisted in the consumer offsets topic and can be restored on fail-over. The configured topology itself is not stored, since it can be derived, but we still want to keep it in memory to avoid recomputing it all the time.

Right now, during a heartbeat RPC that either detects a change in topic metadata or initializes a topology, we configure the topology and store it as "soft state". In addition, when partition metadata or topology records are replayed from the offsets topic, the configured topology is derived again. This is improper for the following reasons:

- When a heartbeat is handled and the configured topology is initialized, the write of the topology and partition metadata records may fail. We then end up in an inconsistent state, with an outdated topology or partition metadata but a new configured topology.
- The topology may be configured several times: once in the heartbeat handler, and again when the new records are replayed from the consumer offsets topic.
- Configuring the topology is a non-trivial operation, both in time complexity and in the amount of logic executed. Ideally, we should not execute it on the replay path.
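As a rough illustration of what "configuring the topology" produces, here is a minimal Java sketch. `Subtopology`, `ConfigurationResult` and `TopologyConfigurer` are hypothetical names, not the group coordinator's classes, and the sizing rule (every internal topic gets the maximum partition count of its subtopology's source topics) is an assumed simplification of what Kafka Streams actually does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model: a subtopology reads user input topics whose
// partition counts are only known at runtime, and owns internal topics whose
// partition count is derived from those inputs.
record Subtopology(String id, List<String> sourceTopics, List<String> internalTopics) {}

// Outcome of configuring: either a fully configured topology (every topic has a
// partition count), or a validation error plus internal topics that must be created.
record ConfigurationResult(Optional<Map<String, Integer>> configuredPartitions,
                           Optional<String> error,
                           Set<String> internalTopicsToCreate) {

    static ConfigurationResult configured(Map<String, Integer> partitions) {
        return new ConfigurationResult(Optional.of(partitions), Optional.empty(), Set.of());
    }

    static ConfigurationResult invalid(String error, Set<String> toCreate) {
        return new ConfigurationResult(Optional.empty(), Optional.of(error), toCreate);
    }
}

final class TopologyConfigurer {
    private TopologyConfigurer() {}

    // partitionMetadata: topic name -> current partition count.
    static ConfigurationResult configure(List<Subtopology> topology,
                                         Map<String, Integer> partitionMetadata) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        Set<String> toCreate = new TreeSet<>();
        for (Subtopology sub : topology) {
            int required = 0;
            for (String source : sub.sourceTopics()) {
                Integer count = partitionMetadata.get(source);
                if (count == null) {
                    // User input topics cannot be created by the coordinator.
                    return ConfigurationResult.invalid("Missing source topic " + source, Set.of());
                }
                partitions.put(source, count);
                required = Math.max(required, count);
            }
            // Simplification: each internal topic is sized to the maximum
            // partition count of the subtopology's source topics.
            for (String internal : sub.internalTopics()) {
                Integer count = partitionMetadata.get(internal);
                if (count == null) {
                    toCreate.add(internal);
                } else if (count != required) {
                    return ConfigurationResult.invalid("Internal topic " + internal + " has "
                            + count + " partitions, expected " + required, Set.of());
                }
                partitions.put(internal, required);
            }
        }
        if (!toCreate.isEmpty()) {
            return ConfigurationResult.invalid("Internal topics missing", toCreate);
        }
        return ConfigurationResult.configured(partitions);
    }
}
```

For example, with `input` at 4 partitions and no `app-repartition` topic yet, the result is a validation error asking for `app-repartition` to be created; once it exists with 4 partitions, configuring yields a partition count for every topic.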
To solve these problems, the idea is to key the configured topology by a hash of the partition metadata and by the topology epoch (or a topology hash). That means:

1. When a heartbeat handler initializes a new topology or detects changed partition metadata, it computes a new partition metadata hash (if necessary) and stores the configured topology as soft state in the in-memory representation of the streams group, keyed by the topology epoch and the partition metadata hash.
2. On the replay path, the configured topology is not touched at all.
3. When the configured topology is used, in either the describe handler or the heartbeat handler, we compare the topology epoch and the partition metadata hash to check whether the current configured topology (soft state) matches the partition metadata and topology epoch (hard state). If it does not, we configure the topology again.

Fortunately, KIP-1101 introduces the means to generate partition metadata hashes and proposes saving only the hash to the consumer offsets topic. We can leverage these changes to fix the handling of configured topologies in streams groups rather trivially.
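The keying scheme in steps 1 to 3 can be sketched as follows. `StreamsGroupSketch` and its methods are hypothetical; the FNV-1a function is only a stand-in for the partition metadata hash defined by KIP-1101, not its actual algorithm, and a real coordinator restores hard state by replaying records rather than through a `replay` method like this one.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical sketch: the configured topology is soft state that is only
// trusted when its key matches the hard state (topology epoch + metadata hash).
final class StreamsGroupSketch<T> {

    // Key under which the cached configured topology was computed.
    private record SoftStateKey(int topologyEpoch, long partitionMetadataHash) {}

    // Hard state: restored by replay from the offsets topic.
    private int topologyEpoch;
    private Map<String, Integer> partitionMetadata = Map.of();

    // Soft state: never written on the replay path.
    private SoftStateKey softStateKey;
    private T configuredTopology;
    int configureCalls; // how often the expensive configuration actually ran

    private final BiFunction<Integer, Map<String, Integer>, T> configurer;

    StreamsGroupSketch(BiFunction<Integer, Map<String, Integer>, T> configurer) {
        this.configurer = configurer;
    }

    // Step 2: replay updates hard state only; the configured topology is untouched.
    void replay(int topologyEpoch, Map<String, Integer> partitionMetadata) {
        this.topologyEpoch = topologyEpoch;
        this.partitionMetadata = Map.copyOf(partitionMetadata);
    }

    // Step 1: the heartbeat handler configures against the epoch and metadata
    // it is about to write, caching the result under that key.
    T configureInHeartbeat(int newEpoch, Map<String, Integer> newMetadata) {
        return getOrConfigure(newEpoch, newMetadata);
    }

    // Step 3: readers compare the cached key with the current hard state.
    T configuredTopology() {
        return getOrConfigure(topologyEpoch, partitionMetadata);
    }

    private T getOrConfigure(int epoch, Map<String, Integer> metadata) {
        SoftStateKey key = new SoftStateKey(epoch, hash(metadata));
        if (!key.equals(softStateKey)) {
            configuredTopology = configurer.apply(epoch, metadata);
            softStateKey = key;
            configureCalls++;
        }
        return configuredTopology;
    }

    // Stand-in hash: 64-bit FNV-1a over entries in sorted topic order, so the
    // result does not depend on map iteration order.
    static long hash(Map<String, Integer> partitionMetadata) {
        long h = 0xcbf29ce484222325L;
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionMetadata).entrySet()) {
            byte[] bytes = (e.getKey() + '=' + e.getValue() + ';').getBytes(StandardCharsets.UTF_8);
            for (byte b : bytes) {
                h ^= (b & 0xff);
                h *= 0x100000001b3L;
            }
        }
        return h;
    }
}
```

In this sketch, a successful heartbeat write followed by replay reuses the cached result, so the topology is configured once. If the heartbeat configures against new metadata but the write fails, the hard state keeps the old epoch and metadata, the key no longer matches, and the next reader reconfigures from the hard state instead of seeing an inconsistent configured topology.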