[ https://issues.apache.org/jira/browse/KAFKA-18986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Brutschy updated KAFKA-18986:
-----------------------------------
    Component/s: streams

Properly handle configured topology soft-state using KIP-1101
--------------------------------------------------------------

                Key: KAFKA-18986
                URL: https://issues.apache.org/jira/browse/KAFKA-18986
            Project: Kafka
         Issue Type: Sub-task
         Components: streams
           Reporter: Lucas Brutschy
           Priority: Major

In the topology metadata sent from the client to the broker in KIP-1071 (similar to the subscription in consumer groups), the number of partitions of input topics and of some internal topics is left parametric, so that a single application can be deployed with a varying number of partitions. The process of turning such a "parametric" topology into a topology in which the number of partitions is defined for all topics, and in which the presence of all internal topics has been validated, is called "configuring the topology". The result of configuring the topology is either a configured topology or a validation error, potentially accompanied by a set of internal topics that need to be created, or a set of topics that are not co-partitioned but must be (e.g. because they are used in a join).

The configured topology in streams groups is derived from the "partition metadata" (the current state of all topics involved in the topology) and the topology metadata sent by the client, both of which are persisted in the consumer offsets topic and can be restored on fail-over. The configured topology itself is not stored, since it can be derived, but we still want to keep it in memory to avoid recomputing it all the time. Right now, during a heartbeat RPC that either detects a change in partition metadata or initializes a topology, we configure the topology and store the result as "soft state". The configured topology is also recomputed whenever partition metadata / topology records are replayed from the consumer offsets topic. This is problematic for the following reasons:

1. When a heartbeat is handled and the configured topology is initialized, the write operation for the topology and partition metadata records may fail. We then end up in an inconsistent state, with outdated topology / partition metadata but a new configured topology.
2. The topology may be configured several times: once in the heartbeat handler, and then again when the new records are replayed from the consumer offsets topic.
3. Topology configuration is a non-trivial operation, both in time complexity and in the amount of logic that is executed. Ideally, we should not execute it on the replay path.

To solve these problems, the idea is to key the configured topology by a hash of the partition metadata together with a topology epoch (or topology hash). That means:

1. When the heartbeat handler initializes a new topology or detects changed partition metadata, it computes the new partition metadata hash (if necessary) and stores the configured topology as soft state in the streams group's in-memory representation, keyed by the topology epoch and the partition metadata hash.
2. On the replay path, the configured topology is not touched at all.
3. When using the configured topology, inside either the describe handler or the heartbeat handler, we compare the topology epoch and the partition metadata hash to check whether the current configured topology (soft state) matches the partition metadata and topology epoch (hard state). If not, we reconfigure the topology (see the sketch after this list).
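A minimal sketch of this keyed soft state (all type and method names below are hypothetical placeholders, not the actual group coordinator classes):

{code:java}
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical placeholder types; the real coordinator types differ.
record Topology(String subtopologies) { }            // topology metadata sent by the client
record PartitionMetadata(String topicState) { }      // current state of all involved topics
record ConfiguredTopology(String description) { }    // result of configuring the topology

final class ConfiguredTopologyCache {

    // Soft state: the configured topology plus the hard-state key it was derived from.
    private record Entry(int topologyEpoch, long metadataHash, ConfiguredTopology topology) { }

    private final BiFunction<Topology, PartitionMetadata, ConfiguredTopology> configure;
    private Optional<Entry> cached = Optional.empty();

    ConfiguredTopologyCache(BiFunction<Topology, PartitionMetadata, ConfiguredTopology> configure) {
        this.configure = configure;
    }

    // Called from the heartbeat / describe handlers. Returns the cached configured
    // topology if its (epoch, hash) key matches the hard state, otherwise reconfigures.
    // The replay path never calls this, so replaying records stays cheap.
    ConfiguredTopology get(int topologyEpoch, long metadataHash,
                           Topology topology, PartitionMetadata metadata) {
        if (cached.isPresent()
                && cached.get().topologyEpoch() == topologyEpoch
                && cached.get().metadataHash() == metadataHash) {
            return cached.get().topology(); // soft state still matches hard state
        }
        ConfiguredTopology configured = configure.apply(topology, metadata);
        cached = Optional.of(new Entry(topologyEpoch, metadataHash, configured));
        return configured;
    }
}
{code}

The key point is that the expensive configure step runs lazily on the read path of the handlers, never on the replay path, and the (epoch, hash) key ensures the soft state cannot silently diverge from the hard state.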
Fortunately, KIP-1101 introduces the means to generate partition metadata hashes and proposes saving only the hash to the consumer offsets topic. We can leverage these changes to, rather trivially, fix the handling of configured topologies in streams groups.
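For illustration only, here is one way such a partition metadata hash could be derived; this is a deterministic stand-in over topic names and partition counts, not the actual KIP-1101 hashing scheme:

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

final class PartitionMetadataHash {

    // Derives a single long from topic name -> partition count. Sorting the
    // topics makes the hash independent of iteration order, so equal metadata
    // always yields an equal hash.
    static long compute(Map<String, Integer> partitionCounts) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (Map.Entry<String, Integer> e : new TreeMap<>(partitionCounts).entrySet()) {
                digest.update(e.getKey().getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0); // separator between topic name and count
                digest.update(Integer.toString(e.getValue()).getBytes(StandardCharsets.UTF_8));
            }
            byte[] hash = digest.digest();
            long result = 0;
            for (int i = 0; i < Long.BYTES; i++) {
                result = (result << 8) | (hash[i] & 0xFF); // fold first 8 bytes into a long
            }
            return result;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is mandatory in every JRE
        }
    }
}
{code}

Comparing this single long against the hash stored with the soft state is then a cheap staleness check, instead of a field-by-field comparison of the full partition metadata.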