lucasbru commented on code in PR #21642:
URL: https://github.com/apache/kafka/pull/21642#discussion_r2895058356


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceProtocolMigrationIntegrationTest.java:
##########
@@ -134,6 +145,125 @@ public void shouldMigrateToAndFromStreamsRebalanceProtocol() throws Exception {
         processExactlyOneRecord(streamsBuilder, props, "3", "C");
     }
 
+    @Test
+    public void shouldMigrateFromClassicToStreamsAfterBrokerRestart() throws Exception {
+        // This test reproduces KAFKA-20254: after log compaction removes the
+        // GroupMetadata tombstone from __consumer_offsets, offset commit records
+        // (which precede the streams group records in the log) create a simple
+        // classic group during replay, and then the streams group records must
+        // handle this existing simple classic group.
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> input = streamsBuilder.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        input.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        final Properties props = props();
+        final String appId = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+
+        // Step 1: Run with the classic protocol and process a record.

Review Comment:
   Removed.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceProtocolMigrationIntegrationTest.java:
##########
@@ -134,6 +145,125 @@ public void shouldMigrateToAndFromStreamsRebalanceProtocol() throws Exception {
         processExactlyOneRecord(streamsBuilder, props, "3", "C");
     }
 
+    @Test
+    public void shouldMigrateFromClassicToStreamsAfterBrokerRestart() throws Exception {
+        // This test reproduces KAFKA-20254: after log compaction removes the
+        // GroupMetadata tombstone from __consumer_offsets, offset commit records
+        // (which precede the streams group records in the log) create a simple
+        // classic group during replay, and then the streams group records must
+        // handle this existing simple classic group.
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> input = streamsBuilder.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        input.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        final Properties props = props();
+        final String appId = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+
+        // Step 1: Run with the classic protocol and process a record.
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        processExactlyOneRecord(streamsBuilder, props, "1", "A");
+
+        // Wait for session to time out so the group becomes empty.

Review Comment:
   Good idea. I've added a `leaveGroup` parameter to `processExactlyOneRecord` 
and use `CloseOptions.groupMembershipOperation(LEAVE_GROUP)` for the classic 
protocol step, which removes the need for `waitForEmptyConsumerGroup`.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceProtocolMigrationIntegrationTest.java:
##########
@@ -134,6 +145,125 @@ public void shouldMigrateToAndFromStreamsRebalanceProtocol() throws Exception {
         processExactlyOneRecord(streamsBuilder, props, "3", "C");
     }
 
+    @Test
+    public void shouldMigrateFromClassicToStreamsAfterBrokerRestart() throws Exception {
+        // This test reproduces KAFKA-20254: after log compaction removes the
+        // GroupMetadata tombstone from __consumer_offsets, offset commit records
+        // (which precede the streams group records in the log) create a simple
+        // classic group during replay, and then the streams group records must
+        // handle this existing simple classic group.
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> input = streamsBuilder.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        input.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        final Properties props = props();
+        final String appId = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+
+        // Step 1: Run with the classic protocol and process a record.
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        processExactlyOneRecord(streamsBuilder, props, "1", "A");
+
+        // Wait for session to time out so the group becomes empty.
+        try (final Admin adminClient = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()))) {
+            waitForEmptyConsumerGroup(adminClient, appId, 1000);
+        }
+
+        // Step 2: Commit an offset for an "orphan" topic using the same group ID.
+        // This offset commit record will precede the streams group records in the
+        // log and will survive compaction because the streams group never commits
+        // for this topic-partition.

Review Comment:
   It does commit for the input topic — but the orphan offset is for a 
different topic-partition that the streams app never subscribes to, so its 
offset commit record has a different key in `__consumer_offsets` and won't be 
overwritten. I've clarified the comment.
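The keying argument in this reply can be illustrated with a toy model: `__consumer_offsets` is compacted per key, OffsetCommit records are keyed by (group, topic, partition) while GroupMetadata is keyed by group alone, so the orphan commit's key is never written again and survives. The sketch below is purely illustrative (the string key encoding, names, and single-pass map compaction are invented; real broker compaction works on binary keys across log segments):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Toy model of per-key compaction on __consumer_offsets. Invented key scheme:
 *   "offset:group:topic:partition" for OffsetCommit records,
 *   "meta:group" for GroupMetadata, "streams:group" for streams group records.
 * A null value models a tombstone.
 */
public class CompactionToy {

    /** Keep only the latest value per key, then drop keys whose latest value
     *  is a tombstone (models tombstone removal after delete.retention.ms). */
    static Map<String, String> compact(final List<Map.Entry<String, String>> log) {
        final Map<String, String> latest = new LinkedHashMap<>();
        for (final Map.Entry<String, String> record : log) {
            latest.put(record.getKey(), record.getValue());
        }
        latest.values().removeIf(Objects::isNull);
        return latest;
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, String>> log = new ArrayList<>();
        // Classic run: commit for the input topic, plus group metadata.
        log.add(new AbstractMap.SimpleEntry<>("offset:app:input:0", "1"));
        log.add(new AbstractMap.SimpleEntry<>("meta:app", "classic-group"));
        // Orphan commit: a key the streams run never writes again.
        log.add(new AbstractMap.SimpleEntry<>("offset:app:orphan:0", "42"));
        // Migration: tombstone the classic group, then streams records
        // and a fresh input-topic commit (overwriting that key).
        log.add(new AbstractMap.SimpleEntry<>("meta:app", null));
        log.add(new AbstractMap.SimpleEntry<>("streams:app", "streams-group"));
        log.add(new AbstractMap.SimpleEntry<>("offset:app:input:0", "2"));

        // After compaction the orphan offset remains but the GroupMetadata
        // tombstone is gone, so replay sees an offset commit with no group
        // metadata before the streams records -- the KAFKA-20254 shape.
        System.out.println(compact(log).keySet());
    }
}
```

The integration test exercises the real cleaner; this only shows why the key layout makes the orphan commit durable.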



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceProtocolMigrationIntegrationTest.java:
##########
@@ -134,6 +145,125 @@ public void shouldMigrateToAndFromStreamsRebalanceProtocol() throws Exception {
         processExactlyOneRecord(streamsBuilder, props, "3", "C");
     }
 
+    @Test
+    public void shouldMigrateFromClassicToStreamsAfterBrokerRestart() throws Exception {
+        // This test reproduces KAFKA-20254: after log compaction removes the
+        // GroupMetadata tombstone from __consumer_offsets, offset commit records
+        // (which precede the streams group records in the log) create a simple
+        // classic group during replay, and then the streams group records must
+        // handle this existing simple classic group.
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> input = streamsBuilder.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        input.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        final Properties props = props();
+        final String appId = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+
+        // Step 1: Run with the classic protocol and process a record.
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        processExactlyOneRecord(streamsBuilder, props, "1", "A");
+
+        // Wait for session to time out so the group becomes empty.
+        try (final Admin adminClient = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()))) {
+            waitForEmptyConsumerGroup(adminClient, appId, 1000);
+        }
+
+        // Step 2: Commit an offset for an "orphan" topic using the same group ID.
+        // This offset commit record will precede the streams group records in the
+        // log and will survive compaction because the streams group never commits
+        // for this topic-partition.
+        final String orphanTopic = "orphan-" + safeTestName;
+        CLUSTER.createTopic(orphanTopic);
+        commitOrphanOffset(appId, orphanTopic);
+
+        // Step 3: Migrate to the streams protocol and process a record. This
+        // writes a GroupMetadata tombstone for the classic group followed by
+        // streams group records.
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        processExactlyOneRecord(streamsBuilder, props, "2", "B");
+
+        // Step 4: Configure aggressive compaction so that the GroupMetadata
+        // tombstone is removed, leaving only the orphan offset commit record
+        // before the streams group records.
+        configureAggressiveCompaction();
+
+        // Step 5: Flood __consumer_offsets with offset commits to trigger segment
+        // rotation and compaction.
+        floodConsumerOffsetsForCompaction();
+
+        // Wait for compaction to clean up the GroupMetadata tombstone.
+        Thread.sleep(5000);

Review Comment:
   Agreed, I couldn't find a reliable way to assert compaction has run from the 
client side. I've added a comment noting the false-positive risk. As you said, 
worst case is a missing test signal, not a flaky failure.
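For readers wondering what "aggressive compaction" amounts to here: the body of `configureAggressiveCompaction()` is not shown in this hunk, but topic-level overrides on `__consumer_offsets` along the following lines would be the usual way to make the cleaner fire quickly. The values are illustrative, not the PR's actual configuration:

```properties
# Illustrative topic-level overrides (NOT the PR's actual settings).
cleanup.policy=compact
segment.ms=100                  # roll segments quickly so they become cleanable
min.cleanable.dirty.ratio=0.01  # clean even when little of the log is dirty
delete.retention.ms=0           # drop tombstones on the first cleaning pass
```

`delete.retention.ms` is the setting that governs how long tombstones like the GroupMetadata delete marker are retained after cleaning, which is why lowering it matters for this scenario.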



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceProtocolMigrationIntegrationTest.java:
##########
@@ -134,6 +145,125 @@ public void shouldMigrateToAndFromStreamsRebalanceProtocol() throws Exception {
         processExactlyOneRecord(streamsBuilder, props, "3", "C");
     }
 
+    @Test
+    public void shouldMigrateFromClassicToStreamsAfterBrokerRestart() throws Exception {
+        // This test reproduces KAFKA-20254: after log compaction removes the
+        // GroupMetadata tombstone from __consumer_offsets, offset commit records
+        // (which precede the streams group records in the log) create a simple
+        // classic group during replay, and then the streams group records must
+        // handle this existing simple classic group.
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> input = streamsBuilder.stream(
+            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        input.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        final Properties props = props();
+        final String appId = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+
+        // Step 1: Run with the classic protocol and process a record.
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
+        processExactlyOneRecord(streamsBuilder, props, "1", "A");
+
+        // Wait for session to time out so the group becomes empty.
+        try (final Admin adminClient = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()))) {
+            waitForEmptyConsumerGroup(adminClient, appId, 1000);
+        }
+
+        // Step 2: Commit an offset for an "orphan" topic using the same group ID.
+        // This offset commit record will precede the streams group records in the
+        // log and will survive compaction because the streams group never commits
+        // for this topic-partition.
+        final String orphanTopic = "orphan-" + safeTestName;
+        CLUSTER.createTopic(orphanTopic);
+        commitOrphanOffset(appId, orphanTopic);
+
+        // Step 3: Migrate to the streams protocol and process a record. This
+        // writes a GroupMetadata tombstone for the classic group followed by
+        // streams group records.
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        processExactlyOneRecord(streamsBuilder, props, "2", "B");
+
+        // Step 4: Configure aggressive compaction so that the GroupMetadata
+        // tombstone is removed, leaving only the orphan offset commit record
+        // before the streams group records.
+        configureAggressiveCompaction();
+
+        // Step 5: Flood __consumer_offsets with offset commits to trigger segment
+        // rotation and compaction.
+        floodConsumerOffsetsForCompaction();
+
+        // Wait for compaction to clean up the GroupMetadata tombstone.
+        Thread.sleep(5000);
+
+        // Step 6: Restart the broker to force replay of __consumer_offsets.
+        CLUSTER.shutdownBroker(0);
+        CLUSTER.startBroker(0);
+
+        // Step 7: Verify the broker can still serve the streams group after replay.
+        processExactlyOneRecord(streamsBuilder, props, "3", "C");
+    }
+
+    private void commitOrphanOffset(final String groupId, final String topic) {
+        final Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        try (final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
