Ruiqi Dong created KAFKA-20657:
----------------------------------
Summary: JsonConverter serializes ByteBuffer BYTES values using the backing array
Key: KAFKA-20657
URL: https://issues.apache.org/jira/browse/KAFKA-20657
Project: Kafka
Issue Type: Bug
Components: connect
Reporter: Ruiqi Dong
*Summary*
Kafka Connect BYTES values may be represented as either `byte[]` or
`ByteBuffer`. `JsonConverter` handles `ByteBuffer` by calling `.array()`, which
returns the entire backing array regardless of the buffer's position and limit.
That produces incorrect JSON for sliced buffers and throws for direct buffers.
*Affected code*
File:
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
{code:java}
case BYTES:
    if (value instanceof byte[])
        return JSON_NODE_FACTORY.binaryNode((byte[]) value);
    else if (value instanceof ByteBuffer)
        return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array());
    else
        throw new DataException("Invalid type for bytes type: " + value.getClass());
{code}
This branch could instead call `Utils.toArray((ByteBuffer) value)`, which copies
only the buffer's remaining bytes and also supports direct buffers.
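The intended behavior can be sketched without Kafka on the classpath. This is a minimal illustration, not Kafka's actual `Utils.toArray` implementation; the class `RemainingBytes` and the method `remainingBytes` are hypothetical names introduced here. It copies only the bytes between `position()` and `limit()` through a duplicate, so the caller's buffer state is left untouched and direct buffers are handled the same way as heap buffers.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class RemainingBytes {
    // Hypothetical helper: copies the logical content of the buffer
    // (position..limit) without relying on a backing array.
    static byte[] remainingBytes(ByteBuffer buffer) {
        // duplicate() shares content but has its own position and limit,
        // so reading from it does not move the caller's position.
        ByteBuffer view = buffer.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer windowed = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        windowed.position(1);
        windowed.limit(3);
        System.out.println(Arrays.toString(remainingBytes(windowed))); // [2, 3]

        ByteBuffer direct = ByteBuffer.allocateDirect(2);
        direct.put((byte) 1);
        direct.put((byte) 2);
        direct.flip();
        System.out.println(Arrays.toString(remainingBytes(direct))); // [1, 2]
    }
}
```

Reading through a duplicate rather than the original buffer is a deliberate choice in this sketch: a converter should not change the position of a value it was only asked to serialize.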
*Reproducer*
Add these tests to
connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java:
{code:java}
@Test
public void slicedByteBufferToJsonUsesRemainingBytes() throws IOException {
    ByteBuffer bytes = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    bytes.position(1);
    bytes.limit(3);

    JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, bytes));

    validateEnvelope(converted);
    assertArrayEquals(new byte[] {2, 3},
        converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue());
}

@Test
public void directByteBufferToJson() throws IOException {
    ByteBuffer bytes = ByteBuffer.allocateDirect(2);
    bytes.put((byte) 1);
    bytes.put((byte) 2);
    bytes.flip();

    JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, bytes));

    validateEnvelope(converted);
    assertArrayEquals(new byte[] {1, 2},
        converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue());
}
{code}
Run:
{code:bash}
./gradlew -q :connect:json:test \
  --tests org.apache.kafka.connect.json.JsonConverterTest.slicedByteBufferToJsonUsesRemainingBytes \
  --tests org.apache.kafka.connect.json.JsonConverterTest.directByteBufferToJson
{code}
Observed behavior:
Both tests fail. The direct-buffer test throws, and the sliced-buffer test fails
its assertion:
{code:java}
java.lang.UnsupportedOperationException
array lengths differ, expected: <2> but was: <4>
{code}
The sliced buffer serializes bytes `{1,2,3,4}` instead of the logical
remaining bytes `{2,3}`.
Expected behavior:
Serializing a Connect BYTES value represented as `ByteBuffer` should use the
buffer's logical remaining bytes, independent of whether the buffer is
heap-backed, sliced, or direct.
`ByteBuffer` is a valid Connect BYTES representation. `ByteBuffer.array()` is
not equivalent to the BYTES value because it ignores `position()` / `limit()`
and is unsupported for direct buffers.
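The mismatch between `array()` and the logical value can be observed with plain `java.nio`, independent of Kafka. The sketch below is illustrative only; the class `BackingArrayPitfalls` and the helper `arrayThrows` are hypothetical names, and the direct-buffer result assumes an OpenJDK-style runtime where direct buffers expose no accessible backing array.

```java
import java.nio.ByteBuffer;

public class BackingArrayPitfalls {
    // Hypothetical helper: reports whether array() is unsupported for this buffer.
    static boolean arrayThrows(ByteBuffer buffer) {
        try {
            buffer.array();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Heap buffer with a narrowed window: the logical value is {2, 3}.
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        heap.position(1);
        heap.limit(3);
        System.out.println(heap.remaining());      // 2
        System.out.println(heap.array().length);   // 4 (whole backing array)

        // A slice shares the same backing array and records an offset into it.
        ByteBuffer slice = heap.slice();
        System.out.println(slice.arrayOffset());   // 1
        System.out.println(slice.array().length);  // 4

        // Direct buffer: assumed to have no accessible backing array.
        ByteBuffer direct = ByteBuffer.allocateDirect(2);
        System.out.println(direct.hasArray());
        System.out.println(arrayThrows(direct));
    }
}
```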