Ruiqi Dong created KAFKA-20656:
----------------------------------
Summary: Struct BYTES handling does not normalize ByteBuffer values
Key: KAFKA-20656
URL: https://issues.apache.org/jira/browse/KAFKA-20656
Project: Kafka
Issue Type: Bug
Components: connect
Reporter: Ruiqi Dong
*Summary*
Kafka Connect `Schema.Type.BYTES` explicitly accepts both `byte[]` and
`ByteBuffer`. `ConnectSchema` even recommends `ByteBuffer` because plain arrays
do not implement content-based `equals()` / `hashCode()`. `Struct` does not
consistently honor that contract:
- `getBytes(...)` calls `ByteBuffer.array()`, which returns the whole backing
array instead of the logical remaining bytes.
- `getBytes(...)` throws `UnsupportedOperationException` for direct buffers.
- `equals(...)` and `hashCode()` compare raw stored objects, so equivalent
`byte[]` and `ByteBuffer` BYTES values are not equal.
*Affected code*
File: connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
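{code:java}
public byte[] getBytes(String fieldName) {
    Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
    // array() returns the entire backing array (ignoring position, limit and
    // arrayOffset) and throws UnsupportedOperationException for direct buffers.
    if (bytes instanceof ByteBuffer)
        return ((ByteBuffer) bytes).array();
    return (byte[]) bytes;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    Struct struct = (Struct) o;
    // deepEquals compares two byte[] values by content, but a byte[] never
    // equals a ByteBuffer, even when both hold the same bytes.
    return Objects.equals(schema, struct.schema) &&
            Arrays.deepEquals(values, struct.values);
}
{code}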
`Values.convertToBytes(...)` already uses `Utils.toArray((ByteBuffer) value)`,
which preserves the buffer's remaining bytes and supports direct buffers.
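A minimal sketch of the normalization a fixed `getBytes(...)` could apply, written with the JDK only so it runs outside Kafka. `BytesNormalizer` and `toBytes` are hypothetical names, not Kafka API; an actual patch would more likely call `Utils.toArray` directly, as `Values.convertToBytes(...)` does.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Kafka: returns the logical bytes of a
// Connect BYTES value regardless of which representation was stored.
public final class BytesNormalizer {

    private BytesNormalizer() { }

    // - null stays null (optional fields).
    // - byte[] is returned as-is, matching the current Struct behavior.
    // - ByteBuffer: copies position..limit. Works for heap, direct and
    //   read-only buffers and does not move the caller's buffer position.
    public static byte[] toBytes(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        if (value instanceof ByteBuffer) {
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] out = new byte[view.remaining()];
            view.get(out);
            return out;
        }
        throw new IllegalArgumentException(
                "Not a BYTES value: " + value.getClass().getName());
    }
}
```

In `Struct.getBytes(...)` this would take the place of the `((ByteBuffer) bytes).array()` branch. Reading through `duplicate()` rather than the original buffer keeps the getter free of side effects on a buffer the caller may still be using.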
*Reproducer*
Add these tests to
connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java:
{code:java}
// Uses the existing FLAT_STRUCT_SCHEMA constant in StructTest. Add imports for
// java.nio.ByteBuffer and the JUnit assertArrayEquals/assertEquals if missing.

// getBytes(...) should return only the bytes between position and limit.
@Test
public void testGetBytesPreservesByteBufferRemainingBytes() {
    ByteBuffer bytes = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    bytes.position(1);
    bytes.limit(3);
    Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
            .put("int8", (byte) 12)
            .put("int16", (short) 12)
            .put("int32", 12)
            .put("int64", (long) 12)
            .put("float32", 12.f)
            .put("float64", 12.)
            .put("boolean", true)
            .put("string", "foobar")
            .put("bytes", bytes);
    assertArrayEquals(new byte[] {1 + 1, 3}, struct.getBytes("bytes"));
}

// Direct buffers have no accessible backing array, so array() throws.
@Test
public void testGetBytesSupportsDirectByteBuffer() {
    ByteBuffer bytes = ByteBuffer.allocateDirect(2);
    bytes.put((byte) 1);
    bytes.put((byte) 2);
    bytes.flip();
    Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
            .put("int8", (byte) 12)
            .put("int16", (short) 12)
            .put("int32", 12)
            .put("int64", (long) 12)
            .put("float32", 12.f)
            .put("float64", 12.)
            .put("boolean", true)
            .put("string", "foobar")
            .put("bytes", bytes);
    assertArrayEquals(new byte[] {1, 2}, struct.getBytes("bytes"));
}

// A byte[] and a ByteBuffer with the same content should yield equal Structs
// with equal hash codes.
@Test
public void testEqualsAndHashCodeWithEquivalentByteArrayAndByteBufferValues() {
    Struct byteArrayStruct = new Struct(FLAT_STRUCT_SCHEMA)
            .put("int8", (byte) 12)
            .put("int16", (short) 12)
            .put("int32", 12)
            .put("int64", (long) 12)
            .put("float32", 12.f)
            .put("float64", 12.)
            .put("boolean", true)
            .put("string", "foobar")
            .put("bytes", "foobar".getBytes());
    Struct byteBufferStruct = new Struct(FLAT_STRUCT_SCHEMA)
            .put("int8", (byte) 12)
            .put("int16", (short) 12)
            .put("int32", 12)
            .put("int64", (long) 12)
            .put("float32", 12.f)
            .put("float64", 12.)
            .put("boolean", true)
            .put("string", "foobar")
            .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
    assertEquals(byteArrayStruct, byteBufferStruct);
    assertEquals(byteArrayStruct.hashCode(), byteBufferStruct.hashCode());
}
{code}
Run:
{code:bash}
./gradlew -q :connect:api:test \
    --tests org.apache.kafka.connect.data.StructTest.testGetBytesPreservesByteBufferRemainingBytes \
    --tests org.apache.kafka.connect.data.StructTest.testGetBytesSupportsDirectByteBuffer \
    --tests org.apache.kafka.connect.data.StructTest.testEqualsAndHashCodeWithEquivalentByteArrayAndByteBufferValues
{code}
Observed behavior:
All three tests fail. The failures below are listed in the same order as the tests:
{code}
array lengths differ, expected: <2> but was: <4>
java.lang.UnsupportedOperationException
expected Struct{...,bytes=[B@...} but was Struct{...,bytes=java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]}
{code}
Expected behavior:
`Struct.getBytes(...)` should return the logical BYTES value, i.e. the buffer's
remaining bytes from its current position to its limit, and should support
direct buffers. Equivalent BYTES values should compare equal, with equal hash
codes, regardless of whether the value was supplied as `byte[]` or `ByteBuffer`.
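One way to get consistent equality is to normalize `ByteBuffer` values to `byte[]` before calling `Arrays.deepEquals` / `Arrays.deepHashCode`, since both already treat two `byte[]` elements by content. The sketch below assumes the field values live in an `Object[]` as in `Struct`; `BytesAwareEquality`, `normalize`, `valuesEqual`, and `valuesHashCode` are hypothetical names, not Kafka API. It only normalizes top-level values; BYTES nested inside arrays, maps, or child structs would need the same treatment recursively.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch, not Kafka code: byte[] and ByteBuffer values with the
// same logical bytes compare equal and hash identically.
public final class BytesAwareEquality {

    private BytesAwareEquality() { }

    // Replaces each top-level ByteBuffer with a byte[] of its remaining bytes.
    // Other values (including nested containers) are passed through unchanged.
    static Object[] normalize(Object[] values) {
        Object[] out = new Object[values.length];
        for (int i = 0; i < values.length; i++) {
            Object v = values[i];
            if (v instanceof ByteBuffer) {
                ByteBuffer view = ((ByteBuffer) v).duplicate();
                byte[] bytes = new byte[view.remaining()];
                view.get(bytes);
                out[i] = bytes;
            } else {
                out[i] = v;
            }
        }
        return out;
    }

    // After normalization, deepEquals compares byte[] elements by content.
    public static boolean valuesEqual(Object[] a, Object[] b) {
        return Arrays.deepEquals(normalize(a), normalize(b));
    }

    // Consistent with valuesEqual: deepHashCode hashes byte[] elements by content.
    public static int valuesHashCode(Object[] values) {
        return Arrays.deepHashCode(normalize(values));
    }
}
```

Normalizing toward `byte[]` rather than toward `ByteBuffer` keeps `hashCode()` independent of buffer position, since the copy contains only the logical bytes.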
`ByteBuffer` is a documented valid representation for Connect BYTES values.
Using `ByteBuffer.array()` exposes storage details instead of the logical data
and rejects valid direct buffers. This makes a public typed getter unreliable
for one of the two supported BYTES representations.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)