[ 
https://issues.apache.org/jira/browse/KAFKA-18986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-18986:
-----------------------------------
    Description: 
In KIP-1071, the number of partitions for input topics and some internal topics 
is left parametric, so that a single application can be deployed with varying 
numbers of partitions. The process of turning such a 'parametric' topology into 
a topology where the number of partitions is defined for all topics, and where 
the presence of all internal topics has been validated, is called "configuring 
the topology". The result of configuring the topology is either a configured 
topology or a validation error, potentially together with a set of internal 
topics that need to be created.
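
For illustration, the result of configuring a topology can be pictured roughly 
as follows. This is a minimal sketch; the class and field names are placeholders 
and not the actual group coordinator API:

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: configuring a topology either resolves a concrete
// partition count for every topic, or fails validation, possibly reporting
// internal topics that still need to be created.
record ConfigureTopologyResult(
    Optional<Map<String, Integer>> resolvedPartitionsByTopic, // present on success
    Optional<String> validationError,                         // present on failure
    Set<String> missingInternalTopics                         // topics to create, possibly empty
) {
    static ConfigureTopologyResult success(Map<String, Integer> partitions) {
        return new ConfigureTopologyResult(Optional.of(partitions), Optional.empty(), Set.of());
    }

    static ConfigureTopologyResult failure(String error, Set<String> missingInternalTopics) {
        return new ConfigureTopologyResult(Optional.empty(), Optional.of(error), missingInternalTopics);
    }
}
{code}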

The configured topology in streams groups is derived from the "partition 
metadata" (the current state of all topics involved in the topology) and the 
topology itself, both of which are persisted in the consumer offset topic and 
can be restored on fail-over. The configured topology itself is not stored, 
since it can be derived, but we still want to keep it in memory to avoid 
recomputing it all the time. Right now, during a heartbeat RPC that either 
detects a change in topic metadata or initializes a topology, we configure the 
topology and store the result as "soft state". In addition, when partition 
metadata / topology records are replayed from the offset topic, the configured 
topology is derived again.
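
To make the split between hard and soft state concrete, the group's in-memory 
representation can be pictured roughly like this (names are illustrative, not 
the actual coordinator classes):

{code:java}
import java.util.Map;
import java.util.Optional;

// Illustrative sketch of a streams group's in-memory representation.
final class StreamsGroupSketch {
    // Hard state: persisted as records in the consumer offset topic and
    // restored on fail-over by replaying those records.
    record Topology(int topologyEpoch, String subtopologies) { }
    record PartitionMetadata(Map<String, Integer> partitionsByTopic) { }

    // Soft state: derived from topology + partition metadata, kept in memory
    // only to avoid recomputing it on every use; never written to the topic.
    record ConfiguredTopology(int topologyEpoch, Map<String, Integer> resolvedPartitionsByTopic) { }

    Topology topology;
    PartitionMetadata partitionMetadata;
    Optional<ConfiguredTopology> configuredTopology = Optional.empty();
}
{code}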

This is problematic for the following reasons:

 - When a heartbeat is handled and the configured topology is initialized, the 
write of the topology and partition metadata records may fail. We would then 
end up in an inconsistent state, with outdated topology / partition metadata 
records but a new configured topology.

 - The topology may be configured several times: once in the heartbeat handler, 
and then again when the new records are replayed from the consumer offset 
topic.

 - Configuring the topology is a non-trivial operation, both in time complexity 
and in the amount of logic executed. Ideally, we should not run it on the 
replay path.

To solve these problems, the idea is to key the configured topology by a hash 
of the partition metadata and the topology epoch (or a topology hash). 
Concretely:

 1. When the heartbeat handler initializes a new topology or detects changed 
partition metadata, it computes the new partition metadata hash (if necessary) 
and stores the configured topology as soft state in the streams group's 
in-memory representation, keyed by the topology epoch and the partition 
metadata hash.

 2. On the replay path, the configured topology is not touched at all.

 3. Whenever the configured topology is used, in the describe handler or the 
heartbeat handler, we compare the topology epoch and the partition metadata 
hash to check whether the current configured topology (soft state) still 
matches the partition metadata and topology epoch (hard state). If not, we 
reconfigure the topology, as sketched below.
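
A minimal sketch of the resulting check, assuming hypothetical names (this is 
not the actual group coordinator API): the cached soft state carries the 
topology epoch and partition metadata hash it was computed from, and is only 
recomputed when it no longer matches the hard state.

{code:java}
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical sketch of keying the configured topology (soft state) by
// topology epoch and partition metadata hash (hard state).
final class ConfiguredTopologyCache {

    record ConfiguredTopology(int topologyEpoch, long partitionMetadataHash) { }

    // Last configured topology; not persisted, rebuilt lazily after fail-over.
    private Optional<ConfiguredTopology> cached = Optional.empty();

    /**
     * Returns a configured topology matching the given hard state, reconfiguring
     * only if the cached soft state is missing or stale. Intended to be called
     * from the heartbeat and describe handlers, never from the replay path.
     */
    ConfiguredTopology configuredTopology(
            int topologyEpoch,
            long partitionMetadataHash,
            BiFunction<Integer, Long, ConfiguredTopology> configure) {
        if (cached.isPresent()
                && cached.get().topologyEpoch() == topologyEpoch
                && cached.get().partitionMetadataHash() == partitionMetadataHash) {
            return cached.get(); // soft state still matches hard state
        }
        // Reconfigure and replace the soft state; no records are written here.
        ConfiguredTopology fresh = configure.apply(topologyEpoch, partitionMetadataHash);
        cached = Optional.of(fresh);
        return fresh;
    }
}
{code}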

Fortunately, KIP-1101 introduces the means to generate partition metadata 
hashes and proposes saving only the hash to the consumer offset topic. We can 
build on these changes to fix the handling of configured topologies in streams 
groups rather trivially.
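
For illustration only, a partition metadata hash could be computed along the 
following lines; this is not the actual KIP-1101 algorithm, just a stand-in 
showing what goes into the hash:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Illustrative stand-in for a partition metadata hash. KIP-1101 defines the
// real algorithm; this sketch only shows the idea of folding topic names and
// partition counts into a single comparable value.
final class PartitionMetadataHash {

    static long compute(Map<String, Integer> partitionsByTopic) {
        long hash = 17L;
        // Sort topics so the hash does not depend on map iteration order.
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsByTopic).entrySet()) {
            for (byte b : e.getKey().getBytes(StandardCharsets.UTF_8)) {
                hash = 31L * hash + b;
            }
            hash = 31L * hash + e.getValue();
        }
        return hash;
    }
}
{code}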


> Properly handle configured topology soft-state using KIP-1101 
> --------------------------------------------------------------
>
>                 Key: KAFKA-18986
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18986
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Lucas Brutschy
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
