Ruiqi Dong created KAFKA-20656:
----------------------------------

             Summary: Struct BYTES handling does not normalize ByteBuffer values
                 Key: KAFKA-20656
                 URL: https://issues.apache.org/jira/browse/KAFKA-20656
             Project: Kafka
          Issue Type: Bug
          Components: connect
            Reporter: Ruiqi Dong


*Summary*
Kafka Connect `Schema.Type.BYTES` explicitly accepts both `byte[]` and 
`ByteBuffer`. `ConnectSchema` even recommends `ByteBuffer` because plain arrays 
do not implement content-based `equals()` / `hashCode()`. `Struct` does not 
consistently honor that contract:
- `getBytes(...)` calls `ByteBuffer.array()`, which returns the whole backing 
array instead of the logical remaining bytes.
- `getBytes(...)` throws `UnsupportedOperationException` for direct buffers.
- `equals(...)` and `hashCode()` compare raw stored objects, so equivalent 
`byte[]` and `ByteBuffer` BYTES values are not equal.
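The following JDK-only sketch (no Kafka dependency; the class and method names are made up for illustration) shows the two `ByteBuffer.array()` pitfalls listed above. For a heap buffer it returns the whole backing array regardless of position and limit, and a direct buffer has no accessible array at all.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only: demonstrates why ByteBuffer.array() is not a safe way to
// read a BYTES value. It ignores position/limit and fails for direct buffers.
class ByteBufferArrayPitfall {

    // What array() exposes for a heap buffer: the entire backing array.
    static byte[] viaArray(ByteBuffer buffer) {
        return buffer.array();
    }

    // Whether array() can be called at all (false for direct buffers).
    static boolean arrayAccessible(ByteBuffer buffer) {
        return buffer.hasArray();
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        heap.position(1);
        heap.limit(3);
        // The logical value is {2, 3}, but array() returns all four bytes.
        System.out.println(Arrays.toString(viaArray(heap))); // [1, 2, 3, 4]
        System.out.println(heap.remaining());                // 2

        ByteBuffer direct = ByteBuffer.allocateDirect(2);
        System.out.println(arrayAccessible(direct));         // false
        try {
            direct.array();
        } catch (UnsupportedOperationException e) {
            System.out.println("array() threw UnsupportedOperationException");
        }
    }
}
```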
 
*Affected code*
File: connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
{code:java}
public byte[] getBytes(String fieldName) {
    Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
    if (bytes instanceof ByteBuffer)
        return ((ByteBuffer) bytes).array();
    return (byte[]) bytes;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    Struct struct = (Struct) o;
    return Objects.equals(schema, struct.schema) &&
            Arrays.deepEquals(values, struct.values);
} {code}
`Values.convertToBytes(...)` already uses `Utils.toArray((ByteBuffer) value)`, 
which preserves the buffer's remaining bytes and supports direct buffers.
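The sketch below does not reproduce Kafka's `Utils.toArray`. It is a JDK-only illustration of the same idea, using a hypothetical helper `BytesValues.toByteArray`, of how a BYTES value could be normalized to `byte[]` before `getBytes(...)` returns it. It assumes that copying the buffer's remaining bytes, without moving the caller's position, is the desired semantics.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper (not part of Kafka): normalizes a Connect BYTES value,
// given as either byte[] or ByteBuffer, to a byte[] holding only the logical bytes.
class BytesValues {

    static byte[] toByteArray(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            // Same instance as stored, matching the current getBytes behavior for arrays.
            return (byte[]) value;
        }
        if (value instanceof ByteBuffer) {
            // duplicate() shares the content but has an independent position/limit,
            // so reading from it leaves the caller's buffer untouched.
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] copy = new byte[view.remaining()];
            // Relative bulk get works for heap and direct buffers alike.
            view.get(copy);
            return copy;
        }
        throw new IllegalArgumentException("Not a BYTES value: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        heap.position(1);
        heap.limit(3);
        System.out.println(Arrays.toString(toByteArray(heap)));   // [2, 3]
        System.out.println(heap.position());                      // 1

        ByteBuffer direct = ByteBuffer.allocateDirect(2);
        direct.put((byte) 1).put((byte) 2).flip();
        System.out.println(Arrays.toString(toByteArray(direct))); // [1, 2]
    }
}
```

With a helper like this, `getBytes(...)` could return `toByteArray(getCheckType(fieldName, Schema.Type.BYTES))`. Whether `byte[]` values should also be defensively copied is a separate design choice.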
 
*Reproducer*
Add these tests to 
connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java:
{code:java}
@Test
public void testGetBytesPreservesByteBufferRemainingBytes() {
    ByteBuffer bytes = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    bytes.position(1);
    bytes.limit(3);

    Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
            .put("int8", (byte) 12)
            .put("int16", (short) 12)
            .put("int32", 12)
            .put("int64", (long) 12)
            .put("float32", 12.f)
            .put("float64", 12.)
            .put("boolean", true)
            .put("string", "foobar")
            .put("bytes", bytes);

    assertArrayEquals(new byte[] {2, 3}, struct.getBytes("bytes"));
}

@Test
public void testGetBytesSupportsDirectByteBuffer() {
    ByteBuffer bytes = ByteBuffer.allocateDirect(2);
    bytes.put((byte) 1);
    bytes.put((byte) 2);
    bytes.flip();

    Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
            .put("int8", (byte) 12)
            .put("int16", (short) 12)
            .put("int32", 12)
            .put("int64", (long) 12)
            .put("float32", 12.f)
            .put("float64", 12.)
            .put("boolean", true)
            .put("string", "foobar")
            .put("bytes", bytes);

    assertArrayEquals(new byte[] {1, 2}, struct.getBytes("bytes"));
}

@Test
public void testEqualsAndHashCodeWithEquivalentByteArrayAndByteBufferValues() {
    Struct byteArrayStruct = new Struct(FLAT_STRUCT_SCHEMA)
            .put("int8", (byte) 12)
            .put("int16", (short) 12)
            .put("int32", 12)
            .put("int64", (long) 12)
            .put("float32", 12.f)
            .put("float64", 12.)
            .put("boolean", true)
            .put("string", "foobar")
            .put("bytes", "foobar".getBytes());

    Struct byteBufferStruct = new Struct(FLAT_STRUCT_SCHEMA)
            .put("int8", (byte) 12)
            .put("int16", (short) 12)
            .put("int32", 12)
            .put("int64", (long) 12)
            .put("float32", 12.f)
            .put("float64", 12.)
            .put("boolean", true)
            .put("string", "foobar")
            .put("bytes", ByteBuffer.wrap("foobar".getBytes()));

    assertEquals(byteArrayStruct, byteBufferStruct);
    assertEquals(byteArrayStruct.hashCode(), byteBufferStruct.hashCode());
} {code}
Run:
{code:bash}
./gradlew -q :connect:api:test \
  --tests org.apache.kafka.connect.data.StructTest.testGetBytesPreservesByteBufferRemainingBytes \
  --tests org.apache.kafka.connect.data.StructTest.testGetBytesSupportsDirectByteBuffer \
  --tests org.apache.kafka.connect.data.StructTest.testEqualsAndHashCodeWithEquivalentByteArrayAndByteBufferValues{code}
Observed behavior:
All three tests fail, with the following errors respectively:
{code}
array lengths differ, expected: <2> but was: <4>
java.lang.UnsupportedOperationException
expected Struct{...,bytes=[B@...} but was Struct{...,bytes=java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]} {code}
Expected behavior:
`Struct.getBytes(...)` should return the logical BYTES value represented by the 
buffer's current remaining bytes and should support direct buffers. Equivalent 
BYTES values should compare consistently regardless of whether the value was 
supplied as `byte[]` or `ByteBuffer`.
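As an illustration of the expected equality semantics, the JDK-only sketch below compares and hashes field values so that a `byte[]` and a `ByteBuffer` holding the same remaining bytes are treated as equal. All names are hypothetical. The sketch only handles top-level values in a flat value array and ignores the schema comparison that `Struct.equals(...)` also performs. BYTES values nested inside arrays, maps, or structs would need the same treatment.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical helpers (not part of Kafka): content-based equality and hashing
// for field values, normalizing ByteBuffer BYTES values to byte[] first.
class BytesEquality {

    // Copies the remaining bytes of a ByteBuffer; other values pass through unchanged.
    private static Object normalize(Object value) {
        if (value instanceof ByteBuffer) {
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] copy = new byte[view.remaining()];
            view.get(copy);
            return copy;
        }
        return value;
    }

    static boolean valueEquals(Object a, Object b) {
        Object left = normalize(a);
        Object right = normalize(b);
        if (left instanceof byte[] && right instanceof byte[]) {
            return Arrays.equals((byte[]) left, (byte[]) right);
        }
        return Objects.equals(left, right);
    }

    // Equal values (per valueEquals) always produce the same hash.
    static int valueHashCode(Object value) {
        Object normalized = normalize(value);
        if (normalized instanceof byte[]) {
            return Arrays.hashCode((byte[]) normalized);
        }
        return Objects.hashCode(normalized);
    }

    // Field-by-field comparison of two value arrays (one slot per schema field).
    static boolean valuesEqual(Object[] a, Object[] b) {
        if (a.length != b.length) return false;
        for (int i = 0; i < a.length; i++) {
            if (!valueEquals(a[i], b[i])) return false;
        }
        return true;
    }

    static int valuesHashCode(Object[] values) {
        int result = 1;
        for (Object v : values) {
            result = 31 * result + valueHashCode(v);
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] raw = "foobar".getBytes(StandardCharsets.UTF_8);
        Object[] withArray = {"foobar", raw.clone()};
        Object[] withBuffer = {"foobar", ByteBuffer.wrap(raw.clone())};
        System.out.println(valuesEqual(withArray, withBuffer));                      // true
        System.out.println(valuesHashCode(withArray) == valuesHashCode(withBuffer)); // true
    }
}
```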
 
`ByteBuffer` is a documented valid representation for Connect BYTES values. 
Using `ByteBuffer.array()` exposes storage details instead of the logical data 
and rejects valid direct buffers. This makes a public typed getter unreliable 
for one of the two supported BYTES representations.



