[ 
https://issues.apache.org/jira/browse/KAFKA-18986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-18986:
-----------------------------------
    Component/s: streams

> Properly handle configured topology soft-state using KIP-1101 
> --------------------------------------------------------------
>
>                 Key: KAFKA-18986
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18986
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Lucas Brutschy
>            Priority: Major
>
> In the topology metadata sent from the client to the broker in KIP-1071 
> (similar to the subscription in consumer groups), the number of partitions 
> of the input topics and of some internal topics is left parametric, so that 
> a single application can be deployed with a varying number of partitions. 
> The process of turning a 'parametric' topology into a topology where the 
> number of partitions is defined for all topics, and where the presence of 
> all internal topics is validated, is called "configuring the topology". The 
> result of configuring the topology is either a configured topology, or a 
> validation error, potentially with a set of internal topics that need to be 
> created, or a set of topics that are not co-partitioned but must be (e.g. 
> because they are used in a join).
>
> The configured topology in streams groups is derived from the "partition 
> metadata" (the current state of all topics involved in a topology) and the 
> topology metadata sent by the client, both of which are persisted in the 
> consumer offset topic and can be restored on fail-over. The configured 
> topology itself, however, is not stored (since it can be derived), but we 
> still want to keep it in memory to avoid recomputing it all the time. Right 
> now, during a heartbeat RPC that either detects a change in partition 
> metadata or initializes a topology, we configure the topology and store the 
> configured topology as "soft state". The configured topology is also 
> recomputed when partition metadata / topology records are replayed from the 
> consumer offset topic.
>
> This is problematic for the following reasons:
>  1. When a heartbeat is handled and the configured topology is initialized, 
> the write operations for the topology and partition metadata records may 
> fail, leaving us in an inconsistent state: an outdated topology / partition 
> metadata, but a new configured topology.
>  2. The topology may be configured several times - in the heartbeat 
> handler, and then again when replaying the new records from the consumer 
> offset topic.
>  3. Configuring the topology is a non-trivial operation, both in time 
> complexity and in the amount of logic that is executed. Ideally, we should 
> not execute it on the replay path.
>
> To solve these problems, the idea is to key the configured topology by a 
> hash of the partition metadata and a topology epoch or topology hash. That 
> means:
>  1. When the heartbeat handler initializes a new topology or detects 
> changed partition metadata, we compute the new partition metadata hash (if 
> necessary) and store the configured topology as soft state in the streams 
> group's in-memory representation, keyed by the topology epoch and the 
> partition metadata hash.
>  2. On the replay path, the configured topology is not touched at all.
>  3. When using the configured topology, either in the describe handler or 
> the heartbeat handler, we compare the topology epoch and the partition 
> metadata hash to check whether the current configured topology (soft state) 
> matches the partition metadata and topology epoch (hard state). If not, we 
> reconfigure the topology.
>
> Fortunately, KIP-1101 introduces the means to generate partition metadata 
> hashes, and proposes saving only the hash to the consumer offset topic. We 
> can leverage these changes to fix, rather trivially, the handling of 
> configured topologies in streams groups. 
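The keyed soft-state check described in the ticket could be sketched roughly as below. This is only an illustration of the caching idea, not actual Kafka code; all class, method, and field names are hypothetical:

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical sketch: the configured topology is cached as soft state,
// keyed by (topology epoch, partition metadata hash). The replay path never
// touches this cache; readers lazily reconfigure on a key mismatch.
class ConfiguredTopologyCache {

    /** Immutable key: topology epoch plus hash of the partition metadata. */
    record Key(int topologyEpoch, long partitionMetadataHash) {}

    private Key key;           // hard-state key the cache was built against
    private Object configured; // cached configured topology (placeholder type)

    /**
     * Returns the cached configured topology if its key matches the current
     * hard state; otherwise runs the (expensive) configuration step once and
     * re-keys the cache.
     */
    Object get(int topologyEpoch, long partitionMetadataHash,
               Supplier<Object> configure) {
        Key current = new Key(topologyEpoch, partitionMetadataHash);
        if (!Objects.equals(current, key)) {
            configured = configure.get(); // only on epoch/hash mismatch
            key = current;
        }
        return configured;
    }
}
```

With this shape, both the heartbeat handler and the describe handler call `get(...)` with the hard-state epoch and hash, so the topology is configured at most once per hard-state change, and never during replay.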



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
