[ https://issues.apache.org/jira/browse/KAFKA-20170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18078150#comment-18078150 ]
sanghyeok An edited comment on KAFKA-20170 at 5/4/26 9:48 AM:
--------------------------------------------------------------
Hi, [~lucasbru]!
I would like to get your opinion on one case. A topology update could turn two
previously independent source topics into a copartition group. For example:
{code:java}
// topology epoch 0
KStream<String, Order> orders = builder.stream("orders"); // 3 partitions
KStream<String, Click> clicks = builder.stream("clicks"); // 5 partitions
orders.mapValues(...).to("order-out");
clicks.mapValues(...).to("click-out");
{code}
After the topology update, the two streams could be joined:
{code:java}
// topology epoch 1
KStream<String, Order> orders = builder.stream("orders"); // 3 partitions
KStream<String, Click> clicks = builder.stream("clicks"); // 5 partitions
orders.join(
    clicks,
    (order, click) -> enrich(order, click),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
).to("enriched-orders");
{code}
In this specific example, the new topology should fail the existing topology
configuration/validation step with INCORRECTLY_PARTITIONED_TOPICS, because the
two external source topics in the copartition group have different partition
counts (3 vs. 5). In that case, the task assignor is never invoked with the
invalid topology: the configured topology is not ready, so the target
assignment becomes empty.
So I do not think this invalid copartitioning case needs special handling in
the topology-epoch-aware assignment logic itself. Does that match your
understanding?
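
To make the check I am describing concrete, here is a minimal sketch of the
copartition validation, assuming the partition counts of one copartition group
are available as a map. The class CopartitionCheck and its validate method are
hypothetical names for illustration only, not the group coordinator's actual
code:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not the group coordinator's real implementation.
public class CopartitionCheck {

    /**
     * Checks one copartition group, given as topic name -> partition count.
     * Returns an error message if the counts differ, or an empty Optional
     * if all topics in the group have the same partition count.
     */
    public static Optional<String> validate(Map<String, Integer> partitionCounts) {
        Set<Integer> distinctCounts = new TreeSet<>(partitionCounts.values());
        if (distinctCounts.size() <= 1) {
            return Optional.empty();
        }
        return Optional.of("INCORRECTLY_PARTITIONED_TOPICS: " + partitionCounts);
    }

    public static void main(String[] args) {
        // The copartition group created by the join in topology epoch 1.
        Map<String, Integer> epoch1Group = new LinkedHashMap<>();
        epoch1Group.put("orders", 3);
        epoch1Group.put("clicks", 5);

        Optional<String> error = validate(epoch1Group);
        // When validation fails, the assignor is skipped and the target assignment stays empty.
        System.out.println(error.isPresent()
            ? "skip assignor: " + error.get()
            : "topology ready, invoke assignor");
    }
}
```

The point of the sketch is only the ordering: validation runs first, and the
assignor is reached only when it passes.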
> Support topology updates for streams groups without requiring a new group
> -------------------------------------------------------------------------
>
> Key: KAFKA-20170
> URL: https://issues.apache.org/jira/browse/KAFKA-20170
> Project: Kafka
> Issue Type: Task
> Components: group-coordinator, streams
> Reporter: Lucas Brutschy
> Assignee: sanghyeok An
> Priority: Major
> Labels: kip1071
>
> The streams rebalance protocol (KIP-1071) does not currently support
> topology updates. If a topology is changed significantly (e.g., by adding
> new source topics or changing the number of subtopologies), users must
> create a new streams group. This is a significant limitation for applications
> that need to evolve over time.
> As specified in KIP-1071, topology updates should be supported through
> explicit versioning via a topology.epoch configuration. When a member joins
> with a different topology, the broker should compare epochs: if the epoch is
> the same but metadata differs, return STREAMS_INVALID_TOPOLOGY_EPOCH; if
> the epoch is lower, return STREAMS_TOPOLOGY_FENCED; if the epoch is bumped
> by one, accept and update the group topology. Members with stale topologies
> should receive a STALE_TOPOLOGY status in heartbeat responses but continue
> processing with their current tasks until upgraded. This enables rolling
> upgrades of the topology across application instances.
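
The epoch comparison described in the issue text can be sketched roughly as
follows. This is only an illustration under stated assumptions: the class
TopologyEpochCheck, the compare method, and the UNCHANGED and UNSPECIFIED
outcomes are hypothetical. The description does not say what happens when the
epoch and metadata both match, or when the epoch jumps by more than one, so
those cases are marked as such rather than guessed:

```java
// Hypothetical sketch of the epoch comparison; not the broker's actual code.
public class TopologyEpochCheck {

    public enum Outcome {
        UNCHANGED,                      // assumption: same epoch and same metadata
        ACCEPT_AND_UPDATE,              // epoch bumped by exactly one
        STREAMS_INVALID_TOPOLOGY_EPOCH, // same epoch, different metadata
        STREAMS_TOPOLOGY_FENCED,        // member epoch lower than group epoch
        UNSPECIFIED                     // assumption: jump by more than one is not covered
    }

    public static Outcome compare(int groupEpoch, int memberEpoch, boolean sameMetadata) {
        if (memberEpoch < groupEpoch) {
            return Outcome.STREAMS_TOPOLOGY_FENCED;
        }
        if (memberEpoch == groupEpoch) {
            return sameMetadata ? Outcome.UNCHANGED : Outcome.STREAMS_INVALID_TOPOLOGY_EPOCH;
        }
        if (memberEpoch == groupEpoch + 1) {
            return Outcome.ACCEPT_AND_UPDATE;
        }
        return Outcome.UNSPECIFIED;
    }

    public static void main(String[] args) {
        // A member joins with the epoch 1 topology while the group is still at epoch 0.
        System.out.println(compare(0, 1, false));
    }
}
```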