adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031757437


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map<Long, L
         return errorMessage.toString();
     }
 
+    @Test
+    public void testFilterRecordBatchesFromAcquiredRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        List<AcquiredRecords> acquiredRecords1 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches1 = List.of(
+            memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+            memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)),
+            sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1));
+
+        List<AcquiredRecords> acquiredRecords2 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3)
+        );
+        List<RecordBatch> recordBatches2 = List.of(
+            memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+            memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+        );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+                new AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 3)
+
+            ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, recordBatches2)
+        );
+
+        // The record batches list is empty.
+        assertEquals(acquiredRecords2, sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, List.of()));
+
+        List<AcquiredRecords> acquiredRecords3 = List.of(
+            new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1)
+        );
+        List<RecordBatch> recordBatches3 = List.of(
+            memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+            memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+        );
+
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 1)
+
+            ), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, recordBatches3)
+        );
+    }
+
+    @Test
+    public void testAcquireWithReadCommittedIsolationLevel() {
+        SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 5, 15).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        memoryRecordsBuilder(buffer, 8, 50).close();
+        memoryRecordsBuilder(buffer, 10, 58).close();
+        memoryRecordsBuilder(buffer, 5, 70).close();
+
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        FetchPartitionData fetchPartitionData = fetchPartitionData(records, newAbortedTransactions());
+
+        // We are mocking the result of function fetchAbortedTransactionRecordBatches. The records present at these offsets need to be archived.
+        // We won't be utilizing the aborted transactions passed in fetchPartitionData.
+        when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn(
+            List.of(
+                memoryRecordsBuilder(5, 10).build().batches().iterator().next(),
+                memoryRecordsBuilder(10, 58).build().batches().iterator().next(),
+                memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+            )
+        );
+
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+            sharePartition.acquire(
+                MEMBER_ID,
+                10 /* Batch size */,
+                100,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData,
+                FetchIsolation.TXN_COMMITTED),
+            45 /* Gap of 15 records will be added to second batch, gap of 2 records will also be added to fourth batch */);
+
+        assertEquals(List.of(
+            new AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(50).setLastOffset(57).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(68).setLastOffset(69).setDeliveryCount((short) 1)
+        ), acquiredRecordsList);
+        assertEquals(75, sharePartition.nextFetchOffset());
+
+        // Checking cached state.
+        assertEquals(4, sharePartition.cachedState().size());
+        assertTrue(sharePartition.cachedState().containsKey(10L));
+        assertTrue(sharePartition.cachedState().containsKey(20L));
+        assertTrue(sharePartition.cachedState().containsKey(50L));
+        assertTrue(sharePartition.cachedState().containsKey(70L));
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState());
+
+        assertEquals(19L, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(49L, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(69L, sharePartition.cachedState().get(50L).lastOffset());
+        assertEquals(74L, sharePartition.cachedState().get(70L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(70L).batchState());
+
+        assertEquals(MEMBER_ID, sharePartition.cachedState().get(20L).batchMemberId());
+        assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(70L).batchMemberId());
+
+        assertNotNull(sharePartition.cachedState().get(20L).batchAcquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(70L).batchAcquisitionLockTimeoutTask());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(19L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
+
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(11L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(12L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(13L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(10L).offsetState().get(14L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(15L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(16L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(17L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(18L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState().get(19L).acquisitionLockTimeoutTask());
+
+        expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(50L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(51L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(52L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(53L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(54L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(55L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(56L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(57L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(58L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(59L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(60L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(61L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(62L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(63L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(64L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(65L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(66L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(67L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(68L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(69L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(50L).offsetState());
+
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(50L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(51L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(52L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(53L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(54L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(55L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(56L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(57L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(58L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(59L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(60L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(61L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(62L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(63L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(64L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(65L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(66L).acquisitionLockTimeoutTask());
+        assertNull(sharePartition.cachedState().get(50L).offsetState().get(67L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(68L).acquisitionLockTimeoutTask());
+        assertNotNull(sharePartition.cachedState().get(50L).offsetState().get(69L).acquisitionLockTimeoutTask());
+    }
+
+    @Test
+    public void testContainsAbortMarker() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Record batch is not a control batch.
+        RecordBatch recordBatch = mock(RecordBatch.class);
+        when(recordBatch.isControlBatch()).thenReturn(false);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch but doesn't contain any records.
+        recordBatch = mock(RecordBatch.class);
+        Iterator batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(false);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch which contains a record of type ControlRecordType.ABORT.
+        recordBatch = mock(RecordBatch.class);
+        batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(true);
+        DefaultRecord record = mock(DefaultRecord.class);
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Buffer has to be created in a way that ControlRecordType.parse(buffer) returns ControlRecordType.ABORT.
+        buffer.putShort((short) 5);
+        buffer.putShort(ControlRecordType.ABORT.type());
+        buffer.putInt(23432); // some field added in version 5
+        buffer.flip();
+        when(record.key()).thenReturn(buffer);
+        when(batchIterator.next()).thenReturn(record);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertTrue(sharePartition.containsAbortMarker(recordBatch));
+
+        // Record batch is a control batch which contains a record of type ControlRecordType.COMMIT.
+        recordBatch = mock(RecordBatch.class);
+        batchIterator = mock(Iterator.class);
+        when(batchIterator.hasNext()).thenReturn(true);
+        record = mock(DefaultRecord.class);
+        buffer = ByteBuffer.allocate(4096);
+        // Buffer has to be created in a way that ControlRecordType.parse(buffer) returns ControlRecordType.COMMIT.
+        buffer.putShort((short) 5);
+        buffer.putShort(ControlRecordType.COMMIT.type());
+        buffer.putInt(23432); // some field added in version 5
+        buffer.flip();
+        when(record.key()).thenReturn(buffer);
+        when(batchIterator.next()).thenReturn(record);
+        when(recordBatch.iterator()).thenReturn(batchIterator);
+        when(recordBatch.isControlBatch()).thenReturn(true);
+        assertFalse(sharePartition.containsAbortMarker(recordBatch));
+    }
+
+    @Test
+    public void testFetchAbortedTransactionRecordBatchesForOnlyAbortedTransactions() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Case 1 - Creating 10 transactional records in a single batch followed by an ABORT marker record for producerId 1.
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 1, 0);
+        buffer.flip();
+        Records records = MemoryRecords.readableRecords(buffer);
+
+        List<FetchResponseData.AbortedTransaction> abortedTransactions = List.of(
+            new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+        );
+        // Records from 0 to 9 should be archived because they are part of aborted transactions.
+        List<RecordBatch> actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions);
+        assertEquals(1, actual.size());
+        assertEquals(0, actual.get(0).baseOffset());
+        assertEquals(9, actual.get(0).lastOffset());
+        assertEquals(1, actual.get(0).producerId());
+
+        // Case 2: 3 individual batches, each followed by an ABORT marker record for producerId 1.
+        buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 0);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 2);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 1, 1, 4);
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        abortedTransactions = List.of(
+            new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
+            new FetchResponseData.AbortedTransaction().setFirstOffset(2).setProducerId(1),
+            new FetchResponseData.AbortedTransaction().setFirstOffset(4).setProducerId(1)
+        );
+
+        actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions);
+        assertEquals(3, actual.size());
+        assertEquals(0, actual.get(0).baseOffset());
+        assertEquals(0, actual.get(0).lastOffset());
+        assertEquals(1, actual.get(0).producerId());
+        assertEquals(2, actual.get(1).baseOffset());
+        assertEquals(2, actual.get(1).lastOffset());
+        assertEquals(1, actual.get(1).producerId());
+        assertEquals(4, actual.get(2).baseOffset());
+        assertEquals(4, actual.get(2).lastOffset());
+        assertEquals(1, actual.get(2).producerId());
+
+        // Case 3: The producer id of the records is different, so they should not be archived.
+        buffer = ByteBuffer.allocate(1024);
+        // We are creating 10 transactional records followed by an ABORT marker record for producerId 2.
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 10, 2, 0);
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        abortedTransactions = List.of(
+            new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1)
+        );
+
+        actual = sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void testFetchAbortedTransactionRecordBatchesForAbortedAndCommittedTransactions() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 0);
+        newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 2, 3);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 2, 6);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 9);
+        newTransactionalRecords(buffer, ControlRecordType.COMMIT, 2, 1, 12);
+        newTransactionalRecords(buffer, ControlRecordType.ABORT, 2, 1, 15);
+        buffer.flip();
+        Records records = MemoryRecords.readableRecords(buffer);
+
+        // Case 1 - Aborted transactions does not contain the record batch from 4-5 with producer id 2.

Review Comment:
   Corrected the comment to `Case 1 - Aborted transactions does not contain the record batch from offsets 6-7 with producer id 2.` I added the item `new FetchResponseData.AbortedTransaction().setFirstOffset(6).setProducerId(1)` to `abortedTransactions` to show that even though offsets 6-7 are listed under `abortedTransactions`, they won't get filtered out for archiving since that batch has a different `producerId`.
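
   For context, a minimal sketch of what that Case 1 setup could look like with the extra entry (the other aborted-transaction entries and the final assertion here are illustrative, not the exact code in the PR):

   ```java
   // Aborted-transaction entries for producerId 1; the entry at firstOffset 6 is listed deliberately,
   // but the batch at offsets 6-7 was written by producerId 2, so it should not match.
   List<FetchResponseData.AbortedTransaction> abortedTransactions = List.of(
       new FetchResponseData.AbortedTransaction().setFirstOffset(0).setProducerId(1),
       new FetchResponseData.AbortedTransaction().setFirstOffset(6).setProducerId(1),
       new FetchResponseData.AbortedTransaction().setFirstOffset(9).setProducerId(1)
   );
   List<RecordBatch> actual =
       sharePartition.fetchAbortedTransactionRecordBatches(records.batches(), abortedTransactions);
   // The batch at offsets 6-7 (producerId 2) must not be returned for archiving.
   assertTrue(actual.stream().noneMatch(batch -> batch.baseOffset() == 6));
   ```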



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
