Ruiqi Dong created KAFKA-20666:
----------------------------------

             Summary: Connect offset backing stores serialize ByteBuffer.array() instead of the buffer's remaining bytes
                 Key: KAFKA-20666
                 URL: https://issues.apache.org/jira/browse/KAFKA-20666
             Project: Kafka
          Issue Type: Bug
          Components: connect
            Reporter: Ruiqi Dong


*Summary*
Kafka Connect offset storage APIs use `ByteBuffer` keys and values. A
`ByteBuffer`'s logical contents are its remaining bytes (from `position()` to
`limit()`), not necessarily the entire backing array. `KafkaOffsetBackingStore`
and `FileOffsetBackingStore` serialize offsets with `buffer.array()`, which
returns the whole backing array regardless of `position()`, `limit()`, and
`arrayOffset()`, and throws `UnsupportedOperationException` for direct buffers.
As a result, a sliced or partially consumed buffer causes the store to write
different key/value bytes than the caller supplied.
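
The difference between `array()` and the remaining bytes can be seen with the JDK alone. The following is a minimal sketch, not Kafka code: the class `ArrayVersusRemaining` and the helper `arrayMatchesContents` are hypothetical names introduced only for illustration.
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: this class and its helper are not part of Kafka.
final class ArrayVersusRemaining {

    /** True only when array() would return exactly the buffer's logical contents. */
    static boolean arrayMatchesContents(ByteBuffer buffer) {
        return buffer.hasArray()                 // false for direct and read-only buffers
                && buffer.arrayOffset() == 0     // a slice() can start mid-array
                && buffer.position() == 0
                && buffer.remaining() == buffer.array().length;
    }

    public static void main(String[] args) {
        ByteBuffer whole = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));
        ByteBuffer windowed = ByteBuffer.wrap("xkeyx".getBytes(StandardCharsets.UTF_8), 1, 3);
        ByteBuffer sliced = windowed.slice();
        ByteBuffer direct = ByteBuffer.allocateDirect(3);

        System.out.println(arrayMatchesContents(whole));    // true
        System.out.println(arrayMatchesContents(windowed)); // false: position() is 1
        System.out.println(arrayMatchesContents(sliced));   // false: arrayOffset() is 1
        System.out.println(arrayMatchesContents(direct));   // false: no accessible array

        // The logical contents are 3 bytes, but array() exposes all 5.
        System.out.println(windowed.remaining() + " vs " + windowed.array().length);

        try {
            direct.array();
        } catch (UnsupportedOperationException e) {
            System.out.println("array() is unsupported for direct buffers");
        }
    }
}
{code}
Only the first buffer is safe to serialize with `array()`; the other three either expose extra bytes or throw.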
 
*Affected code*
File: 
`connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java`
{code:java}
for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
    ByteBuffer key = entry.getKey();
    ByteBuffer value = entry.getValue();
    offsetLog.send(key == null ? null : key.array(),
            value == null ? null : value.array(), producerCallback);
} {code}
File: 
`connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java`

{code:java}
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
    byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
    byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
    raw.put(key, value);
    OffsetUtils.processPartitionKey(key, value, keyConverter, connectorPartitions);
} {code}
`OffsetStorageReaderImpl` has the same pattern when deserializing returned 
values:
{code:java}
valueConverter.toConnectData(namespace,
        rawEntry.getValue() != null ? rawEntry.getValue().array() : null); {code}
{*}Reproducer 1{*}: distributed offset store sends the wrong bytes
Add this test to 
`connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java`:
{code:java}
@Test
public void testSetUsesByteBufferRemainingBytes() {
    setup(false);
    store.configure(mockConfig(props));
    store.start();

    verify(storeLog).start();

    Map<ByteBuffer, ByteBuffer> offsets = Map.of(
            ByteBuffer.wrap("xkeyx".getBytes(), 1, 3),
            ByteBuffer.wrap("xvaluex".getBytes(), 1, 5)
    );

    Future<Void> setFuture = store.set(offsets, (error, result) -> { });
    assertFalse(setFuture.isDone());

    ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback =
            ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
    verify(storeLog).send(aryEq(buffer("key").array()),
            aryEq(buffer("value").array()), callback.capture());

    store.stop();

    verify(storeLog).stop();
} {code}
Run:
{code:java}
./gradlew -q :connect:runtime:test \
  --tests KafkaOffsetBackingStoreTest.testSetUsesByteBufferRemainingBytes {code}
Observed behavior:
The test fails because `KafkaOffsetBackingStore` sends the entire backing arrays instead of the remaining bytes:
{code:java}
Wanted: key=[0x6B, 0x65, 0x79], value=[0x76, 0x61, 0x6C, 0x75, 0x65]
Actual: key=[0x78, 0x6B, 0x65, 0x79, 0x78], value=[0x78, 0x76, 0x61, 0x6C, 0x75, 0x65, 0x78] {code}
{*}Reproducer 2{*}: file offset store cannot restore a sliced-buffer offset by 
its logical key
Add this test to 
`connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java`
{code:java}
@Test
public void testSaveRestoreUsesByteBufferRemainingBytes() throws Exception {
    @SuppressWarnings("unchecked")
    Callback<Void> setCallback = mock(Callback.class);

    Map<ByteBuffer, ByteBuffer> offsets = Map.of(
            ByteBuffer.wrap("xkeyx".getBytes(), 1, 3),
            ByteBuffer.wrap("xvaluex".getBytes(), 1, 5)
    );

    store.set(offsets, setCallback).get();
    store.stop();

    FileOffsetBackingStore restore = new FileOffsetBackingStore(converter);
    restore.configure(config);
    restore.start();
    Map<ByteBuffer, ByteBuffer> values = restore.get(List.of(buffer("key"))).get();
    assertEquals(buffer("value"), values.get(buffer("key")));
    verify(setCallback).onCompletion(isNull(), isNull());
} {code}
Run:
{code:java}
./gradlew -q :connect:runtime:test \
  --tests FileOffsetBackingStoreTest.testSaveRestoreUsesByteBufferRemainingBytes {code}
Observed behavior:
The test fails. The file-backed store persists the physical arrays `xkeyx` and 
`xvaluex`, so the restored store has no offset under the logical key `key`.
 
 
*Expected behavior*
Offset backing stores should serialize exactly the remaining bytes of each 
`ByteBuffer`, respecting `position()`, `limit()`, and `arrayOffset()`. They 
should also work for direct buffers.


*Impact*
Kafka Connect offset keys determine source partition identity and offset
recovery. Persisting a byte sequence that differs from the logical `ByteBuffer`
contents can make a task fail to find its previous offsets after restart,
duplicate work, skip records, or write tombstones under the wrong key.

*Suggested fix*
Duplicate the buffer and copy its `remaining()` bytes into a new array instead
of calling `array()` directly.
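
A minimal sketch of the duplicate-and-copy approach, using only the JDK. The class `RemainingBytes` and its `toArray` method are hypothetical names chosen for illustration; they are not the actual patch, and a real fix might reuse an existing utility instead.
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: a hypothetical helper, not taken from the Kafka codebase.
final class RemainingBytes {

    /**
     * Copies the bytes between position() and limit() into a new array.
     * Works for heap, direct, and read-only buffers, and leaves the caller's
     * position and limit untouched because it reads through a duplicate.
     */
    static byte[] toArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null; // mirrors the null handling at the existing call sites
        }
        ByteBuffer view = buffer.duplicate(); // shares content, independent position/limit
        byte[] copy = new byte[view.remaining()];
        view.get(copy);                       // advances only the duplicate
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.wrap("xkeyx".getBytes(StandardCharsets.UTF_8), 1, 3);
        System.out.println(new String(toArray(key), StandardCharsets.UTF_8)); // key
        System.out.println(key.position()); // 1, the caller's buffer is unchanged

        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        direct.put("value".getBytes(StandardCharsets.UTF_8));
        direct.flip();
        System.out.println(new String(toArray(direct), StandardCharsets.UTF_8)); // value
    }
}
{code}
With a helper along these lines, each call site shown under *Affected code* would pass the buffer to the helper rather than calling `array()` on it, so the null checks could move into the helper as well.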
 



