Ruiqi Dong created KAFKA-20658:
----------------------------------

             Summary: Cast SMT encodes ByteBuffer BYTES from index 0 instead of 
remaining bytes
                 Key: KAFKA-20658
                 URL: https://issues.apache.org/jira/browse/KAFKA-20658
             Project: Kafka
          Issue Type: Bug
          Components: connect
            Reporter: Ruiqi Dong


*Summary*
The `Cast` single message transform converts BYTES values to strings by 
Base64-encoding the bytes. For `ByteBuffer` values, it calls 
`Utils.readBytes(byteBuffer)`, which reads from index `0` to `limit` rather 
than from the buffer's current `position` to `limit`. As a result, a sliced or 
partially consumed `ByteBuffer` is encoded together with the bytes that precede 
its `position`, which are not part of the logical BYTES value.
 
*Affected code*
File: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
{code:java}
private static String castToString(Object value) {
    if (value instanceof java.util.Date dateValue) {
        return Values.dateFormatFor(dateValue).format(dateValue);
    } else if (value instanceof ByteBuffer byteBuffer) {
        return Base64.getEncoder().encodeToString(Utils.readBytes(byteBuffer));
    } else if (value instanceof byte[] rawBytes) {
        return Base64.getEncoder().encodeToString(rawBytes);
    } else {
        return value.toString();
    }
}
{code}
File: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
{code:java}
public static byte[] readBytes(ByteBuffer buffer) {
    return Utils.readBytes(buffer, 0, buffer.limit());
}
{code}
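The difference between the two read ranges can be reproduced with the JDK alone. The sketch below is illustrative only: `ReadBytesDemo`, `readFromIndexZero`, and `readRemaining` are hypothetical names, and `readFromIndexZero` merely imitates the index-0-to-limit behavior described above rather than copying Kafka's implementation.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

class ReadBytesDemo {
    // Hypothetical stand-in for the reported behavior:
    // copies indices 0..limit and ignores the buffer's position.
    static byte[] readFromIndexZero(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // leave the caller's position untouched
        view.position(0);
        byte[] dest = new byte[view.limit()];
        view.get(dest);
        return dest;
    }

    // Copies only position..limit, i.e. the buffer's remaining bytes.
    static byte[] readRemaining(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] dest = new byte[view.remaining()];
        view.get(dest);
        return dest;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        buffer.position(1);
        buffer.limit(3);

        // Includes the byte before position: {1, 2, 3}
        System.out.println(Base64.getEncoder().encodeToString(readFromIndexZero(buffer))); // AQID
        // Only the logical value: {2, 3}
        System.out.println(Base64.getEncoder().encodeToString(readRemaining(buffer)));     // AgM=
    }
}
```

Both helpers work on a `duplicate()` so that reading does not move the position of the buffer held by the record.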
 
*Reproducer*
Add this test to 
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java:
{code:java}
@Test
public void castSlicedByteBufferFieldToStringUsesRemainingBytes() {
    xformValue.configure(Map.of(Cast.SPEC_CONFIG, "bytes:string"));

    Schema schema = SchemaBuilder.struct()
            .field("bytes", Schema.BYTES_SCHEMA)
            .build();
    ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    byteBuffer.position(1);
    byteBuffer.limit(3);

    Struct value = new Struct(schema).put("bytes", byteBuffer);
    SourceRecord transformed = xformValue.apply(
            new SourceRecord(null, null, "topic", 0, schema, value));

    assertEquals("AgM=", ((Struct) transformed.value()).get("bytes"));
}
{code}
Run:
{code:bash}
./gradlew -q :connect:transforms:test \
  --tests org.apache.kafka.connect.transforms.CastTest.castSlicedByteBufferFieldToStringUsesRemainingBytes
{code}
Observed behavior:
The test fails. `AgM=` is Base64 for `{2,3}`. `AQID` is Base64 for `{1,2,3}`.
{code:java}
expected: <AgM=> but was: <AQID>
{code}
Expected behavior:
Casting a Connect BYTES `ByteBuffer` to string should Base64-encode the 
buffer's remaining bytes, matching the logical BYTES value.
 
 
Connect BYTES supports `ByteBuffer`. Other Connect conversion paths use 
`Utils.toArray(ByteBuffer)` for this exact representation. `Cast` currently 
includes bytes before the buffer position, so it can emit a string for data 
that is not part of the value being transformed.
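One possible direction for a fix is sketched below using JDK classes only, so it runs without Kafka on the classpath. `RemainingBytesCastSketch` and `castBytesToString` are hypothetical names, not existing Kafka APIs; an actual patch could instead reuse `Utils.toArray(ByteBuffer)` as mentioned above. The sketch assumes the cast should not disturb the position of the source buffer, so it reads through a `duplicate()`.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

class RemainingBytesCastSketch {
    // Hypothetical helper mirroring the BYTES branches of castToString:
    // encodes only the bytes between position and limit.
    static String castBytesToString(Object value) {
        if (value instanceof ByteBuffer) {
            // duplicate() shares content but has its own position and limit,
            // so the original buffer is left as the caller provided it.
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] remaining = new byte[view.remaining()];
            view.get(remaining);
            return Base64.getEncoder().encodeToString(remaining);
        } else if (value instanceof byte[]) {
            return Base64.getEncoder().encodeToString((byte[]) value);
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        // Heap buffer, same layout as the reproducer.
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        heap.position(1);
        heap.limit(3);
        System.out.println(castBytesToString(heap)); // AgM=

        // Direct buffer (no backing array) with the same logical value.
        ByteBuffer direct = ByteBuffer.allocateDirect(4);
        direct.put(new byte[] {1, 2, 3, 4});
        direct.position(1);
        direct.limit(3);
        System.out.println(castBytesToString(direct)); // AgM=
    }
}
```

Reading through a relative `get` on a duplicate behaves the same for heap and direct buffers, which avoids a separate code path for buffers without a backing array.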
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
