Ruiqi Dong created KAFKA-20658:
----------------------------------
Summary: Cast SMT encodes ByteBuffer BYTES from index 0 instead of
remaining bytes
Key: KAFKA-20658
URL: https://issues.apache.org/jira/browse/KAFKA-20658
Project: Kafka
Issue Type: Bug
Components: connect
Reporter: Ruiqi Dong
*Summary*
The `Cast` single message transform converts BYTES values to strings by
Base64-encoding the bytes. For `ByteBuffer` values, it calls
`Utils.readBytes(byteBuffer)`. `Utils.readBytes(ByteBuffer)` reads from index
`0` to `limit`, not from the buffer's current `position` to `limit`. As a
result, a sliced or partially consumed `ByteBuffer` is cast using bytes outside
the logical BYTES value.
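The difference between the two read strategies can be illustrated without any Kafka dependency. The following is a minimal sketch using only the JDK. `readFromZeroToLimit` is a hypothetical stand-in that mimics the index-`0`-to-`limit` behavior described above, and `readRemaining` is a hypothetical helper that reads `position` to `limit`. Neither is Kafka code.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class ByteBufferSliceDemo {

    // Hypothetical stand-in for the behavior described in this report:
    // copies bytes from absolute index 0 up to limit, ignoring position.
    public static byte[] readFromZeroToLimit(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // leaves the caller's position untouched
        view.position(0);
        byte[] dest = new byte[view.remaining()];
        view.get(dest);
        return dest;
    }

    // Hypothetical helper: copies only the bytes between position and limit.
    public static byte[] readRemaining(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] dest = new byte[view.remaining()];
        view.get(dest);
        return dest;
    }

    public static String base64(byte[] bytes) {
        return Base64.getEncoder().encodeToString(bytes);
    }

    public static void main(String[] args) {
        // Same buffer shape as the reproducer: logical value is {2, 3}.
        ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        buffer.position(1);
        buffer.limit(3);

        System.out.println(base64(readFromZeroToLimit(buffer))); // AQID
        System.out.println(base64(readRemaining(buffer)));       // AgM=
    }
}
```

The first line printed includes the byte before `position`, which is the symptom reported here.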
*Affected code*
File: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
{code:java}
private static String castToString(Object value) {
    if (value instanceof java.util.Date dateValue) {
        return Values.dateFormatFor(dateValue).format(dateValue);
    } else if (value instanceof ByteBuffer byteBuffer) {
        return Base64.getEncoder().encodeToString(Utils.readBytes(byteBuffer));
    } else if (value instanceof byte[] rawBytes) {
        return Base64.getEncoder().encodeToString(rawBytes);
    } else {
        return value.toString();
    }
}
{code}
File: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
{code:java}
public static byte[] readBytes(ByteBuffer buffer) {
    return Utils.readBytes(buffer, 0, buffer.limit());
}
{code}
*Reproducer*
Add this test to
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java:
{code:java}
@Test
public void castSlicedByteBufferFieldToStringUsesRemainingBytes() {
    xformValue.configure(Map.of(Cast.SPEC_CONFIG, "bytes:string"));
    Schema schema = SchemaBuilder.struct()
            .field("bytes", Schema.BYTES_SCHEMA)
            .build();
    ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    byteBuffer.position(1);
    byteBuffer.limit(3);
    Struct value = new Struct(schema).put("bytes", byteBuffer);
    SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
            "topic", 0, schema, value));
    assertEquals("AgM=", ((Struct) transformed.value()).get("bytes"));
}
{code}
Run:
{code:bash}
./gradlew -q :connect:transforms:test \
    --tests org.apache.kafka.connect.transforms.CastTest.castSlicedByteBufferFieldToStringUsesRemainingBytes
{code}
Observed behavior:
The test fails. `AgM=` is Base64 for `\{2,3}`. `AQID` is Base64 for `\{1,2,3}`.
{code:java}
expected: <AgM=> but was: <AQID> {code}
Expected behavior:
Casting a Connect BYTES `ByteBuffer` to string should Base64-encode the
buffer's remaining bytes, matching the logical BYTES value.
Connect BYTES supports `ByteBuffer`. Other Connect conversion paths use
`Utils.toArray(ByteBuffer)` for this exact representation. `Cast` currently
includes bytes before the buffer position, so it can emit a string for data
that is not part of the value being transformed.
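One possible shape for the expected behavior is sketched below with JDK classes only. This is an assumption about what a fix could look like, not the actual Kafka patch: `bytesToBase64` is a hypothetical helper name, and a real change inside `Cast` might instead call `Utils.toArray(ByteBuffer)` as mentioned above. The sketch works on a duplicate so the caller's `position` and `limit` are left unchanged, and it does not rely on the buffer having a backing array.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class CastBytesSketch {

    // Hypothetical helper (not Kafka code): Base64-encodes the logical BYTES value.
    public static String bytesToBase64(Object value) {
        if (value instanceof ByteBuffer) {
            // duplicate() shares content but has independent position/limit,
            // so reading from the view does not consume the caller's buffer.
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] remaining = new byte[view.remaining()];
            view.get(remaining); // copies position..limit only
            return Base64.getEncoder().encodeToString(remaining);
        } else if (value instanceof byte[]) {
            return Base64.getEncoder().encodeToString((byte[]) value);
        }
        throw new IllegalArgumentException("Not a BYTES value: " + value);
    }

    public static void main(String[] args) {
        // Heap buffer, as in the reproducer.
        ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        heap.position(1);
        heap.limit(3);
        System.out.println(bytesToBase64(heap));

        // Direct buffer: no accessible backing array, same logical value.
        ByteBuffer direct = ByteBuffer.allocateDirect(4);
        direct.put(new byte[] {1, 2, 3, 4});
        direct.position(1);
        direct.limit(3);
        System.out.println(bytesToBase64(direct));
    }
}
```

With the reproducer's buffer, both calls should yield the `AgM=` value that the test expects.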