Ruiqi Dong created KAFKA-20657:
----------------------------------

             Summary: JsonConverter serializes ByteBuffer BYTES values using the backing array
                 Key: KAFKA-20657
                 URL: https://issues.apache.org/jira/browse/KAFKA-20657
             Project: Kafka
          Issue Type: Bug
          Components: connect
            Reporter: Ruiqi Dong


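*Summary*
Kafka Connect BYTES values may be represented as either `byte[]` or `ByteBuffer`. `JsonConverter` handles `ByteBuffer` by calling `.array()`, which returns the entire backing array. This produces incorrect JSON for buffers whose position and limit do not span the whole array (for example, sliced buffers), and it throws `UnsupportedOperationException` for direct buffers.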
 
*Affected code*
File: connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
{code:java}
case BYTES:
    if (value instanceof byte[])
        return JSON_NODE_FACTORY.binaryNode((byte[]) value);
    else if (value instanceof ByteBuffer)
        return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array());
    else
        throw new DataException("Invalid type for bytes type: " + value.getClass());
{code}
The `ByteBuffer` branch can instead use `Utils.toArray((ByteBuffer) value)` (from `org.apache.kafka.common.utils.Utils`), which copies the bytes between the buffer's position and limit and also supports direct buffers.
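For illustration, a JDK-only sketch of the copy the fix needs: read the bytes from `position()` to `limit()` without calling `array()` and without moving the caller's buffer. It is assumed to match what `Utils.toArray(ByteBuffer)` does for this purpose; the class and method names (`RemainingBytes`, `remainingBytes`) are hypothetical and not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class RemainingBytes {

    // Hypothetical helper: copies the bytes between position() and limit().
    // It never calls array(), so heap, sliced, read-only and direct buffers all work.
    public static byte[] remainingBytes(ByteBuffer buffer) {
        // duplicate() shares the content but has its own position/limit,
        // so reading from it leaves the caller's buffer position unchanged.
        ByteBuffer view = buffer.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        // Heap buffer whose logical content is {2, 3}.
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        heap.position(1);
        heap.limit(3);

        // Direct buffer with content {1, 2}.
        ByteBuffer direct = ByteBuffer.allocateDirect(2);
        direct.put((byte) 1);
        direct.put((byte) 2);
        direct.flip();

        System.out.println(Arrays.toString(remainingBytes(heap)));   // [2, 3]
        System.out.println(Arrays.toString(remainingBytes(direct))); // [1, 2]
        System.out.println(heap.position());                          // 1 (unchanged)
    }
}
```

In `JsonConverter` the `ByteBuffer` branch would then presumably read `JSON_NODE_FACTORY.binaryNode(Utils.toArray((ByteBuffer) value))`, as suggested above. Copying through a `duplicate()` is one way to avoid mutating the caller's buffer; the exact internals of `Utils.toArray` may differ.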
 
*Reproducer*
Add these tests to connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java:
{code:java}
@Test
public void slicedByteBufferToJsonUsesRemainingBytes() throws IOException {
    ByteBuffer bytes = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    bytes.position(1);
    bytes.limit(3);

    JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, bytes));
    validateEnvelope(converted);
    assertArrayEquals(new byte[] {2, 3}, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue());
}

@Test
public void directByteBufferToJson() throws IOException {
    ByteBuffer bytes = ByteBuffer.allocateDirect(2);
    bytes.put((byte) 1);
    bytes.put((byte) 2);
    bytes.flip();

    JsonNode converted = parse(converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, bytes));
    validateEnvelope(converted);
    assertArrayEquals(new byte[] {1, 2}, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue());
}
{code}
Run:
{code:bash}
./gradlew -q :connect:json:test \
  --tests org.apache.kafka.connect.json.JsonConverterTest.slicedByteBufferToJsonUsesRemainingBytes \
  --tests org.apache.kafka.connect.json.JsonConverterTest.directByteBufferToJson
{code}
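Observed behavior:
Both tests fail. The direct-buffer test throws:
{noformat}
java.lang.UnsupportedOperationException
{noformat}
and the sliced-buffer test fails its assertion:
{noformat}
array lengths differ, expected: <2> but was: <4>
{noformat}
The sliced buffer is serialized as its entire backing array `\{1,2,3,4}` instead of the remaining bytes `\{2,3}` between position 1 and limit 3.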
 
Expected behavior:
Serializing a Connect BYTES value represented as `ByteBuffer` should use the 
buffer's logical remaining bytes, independent of whether the buffer is 
heap-backed, sliced, or direct.
 
 
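`ByteBuffer` is a valid Connect BYTES representation, but `ByteBuffer.array()` is not equivalent to the BYTES value: it returns the entire backing array, ignoring `position()`, `limit()` and `arrayOffset()`, and it throws `UnsupportedOperationException` for direct buffers, which have no accessible backing array.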
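Both failure modes can be reproduced with plain JDK code, independent of Kafka. A minimal sketch; the class and method names (`ArrayPitfalls`, `arrayUnsupported`) are illustrative only:

```java
import java.nio.ByteBuffer;

public class ArrayPitfalls {

    // Returns true when array() cannot be used. Direct buffers throw
    // UnsupportedOperationException; read-only heap buffers throw
    // ReadOnlyBufferException, which is a subclass of it.
    public static boolean arrayUnsupported(ByteBuffer buffer) {
        try {
            buffer.array();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Heap buffer whose logical content is {2, 3}.
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        heap.position(1);
        heap.limit(3);
        // array() exposes all 4 backing bytes although only 2 remain.
        System.out.println(heap.array().length + " vs remaining " + heap.remaining()); // 4 vs remaining 2

        // slice() shares the backing array: arrayOffset() is 1 and
        // array() still returns all 4 bytes.
        ByteBuffer slice = heap.slice();
        System.out.println(slice.arrayOffset() + " " + slice.array().length); // 1 4

        // Direct buffers have no accessible backing array.
        ByteBuffer direct = ByteBuffer.allocateDirect(2);
        System.out.println(direct.hasArray());         // false
        System.out.println(arrayUnsupported(direct));  // true
    }
}
```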
 
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
