[ 
https://issues.apache.org/jira/browse/KAFKA-20170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18078150#comment-18078150
 ] 

sanghyeok An edited comment on KAFKA-20170 at 5/4/26 9:48 AM:
--------------------------------------------------------------

Hi, [~lucasbru]!
I would like to get your opinion on one case. A topology update could turn two 
previously independent source topics into a copartition group. For example:

{code:java}
  // topology epoch 0
  KStream<String, Order> orders = builder.stream("orders");     // 3 partitions
  KStream<String, Click> clicks = builder.stream("clicks");     // 5 partitions

  orders.mapValues(...).to("order-out");
  clicks.mapValues(...).to("click-out");
{code}

After the topology update, the two streams could be joined:

{code:java}
  // topology epoch 1
  KStream<String, Order> orders = builder.stream("orders");     // 3 partitions
  KStream<String, Click> clicks = builder.stream("clicks");     // 5 partitions

  orders.join(
      clicks,
      (order, click) -> enrich(order, click),
      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
  ).to("enriched-orders");
{code}

In this specific example, the new topology should fail the existing topology 
configuration/validation with INCORRECTLY_PARTITIONED_TOPICS, because the two 
external source topics in the copartition group have different partition 
counts. In that case, the task assignor is not invoked with the invalid 
topology; the configured topology is not ready and the target assignment 
becomes empty.

So I do not think this invalid copartitioning case needs special handling in 
the topology-epoch-aware assignment logic itself. Does that match your 
understanding?


was (Author: JIRAUSER303328):
[~lucasbru]
I would like to get your opinion on one case. A topology update could turn two 
previously independent source topics into a copartition group. For example:

{code:java}
  // topology epoch 0
  KStream<String, Order> orders = builder.stream("orders");     // 3 partitions
  KStream<String, Click> clicks = builder.stream("clicks");     // 5 partitions

  orders.mapValues(...).to("order-out");
  clicks.mapValues(...).to("click-out");
{code}

After the topology update, the two streams could be joined:

{code:java}
  // topology epoch 1
  KStream<String, Order> orders = builder.stream("orders");     // 3 partitions
  KStream<String, Click> clicks = builder.stream("clicks");     // 5 partitions

  orders.join(
      clicks,
      (order, click) -> enrich(order, click),
      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
  ).to("enriched-orders");
{code}

In this specific example, the new topology should fail the existing topology 
configuration/validation with INCORRECTLY_PARTITIONED_TOPICS, because the two 
external source topics in the copartition group have different partition 
counts. In that case, the task assignor is not invoked with the invalid 
topology; the configured topology is not ready and the target assignment 
becomes empty.

So I do not think this invalid copartitioning case needs special handling in 
the topology-epoch-aware assignment logic itself. Does that match your 
understanding?

> Support topology updates for streams groups without requiring a new group
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-20170
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20170
>             Project: Kafka
>          Issue Type: Task
>          Components: group-coordinator, streams
>            Reporter: Lucas Brutschy
>            Assignee: sanghyeok An
>            Priority: Major
>              Labels: kip1071
>
> The streams rebalance protocol \(KIP\-1071\) does not currently support 
> topology updates. If a topology is changed significantly \(e.g., by adding 
> new source topics or changing the number of subtopologies\), users must 
> create a new streams group. This is a significant limitation for applications 
> that need to evolve over time.
> As specified in KIP\-1071, topology updates should be supported through 
> explicit versioning via a topology.epoch configuration. When a member joins 
> with a different topology, the broker should compare epochs: if the epoch is 
> the same but metadata differs, return STREAMS\_INVALID\_TOPOLOGY\_EPOCH; if 
> the epoch is lower, return STREAMS\_TOPOLOGY\_FENCED; if the epoch is bumped 
> by one, accept and update the group topology. Members with stale topologies 
> should receive a STALE\_TOPOLOGY status in heartbeat responses but continue 
> processing with their current tasks until upgraded. This enables rolling 
> upgrades of the topology across application instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to