Ruiqi Dong created KAFKA-20659:
----------------------------------
Summary: MirrorMaker Checkpoint and Heartbeat records can accept
unsupported versions when assertions are disabled
Key: KAFKA-20659
URL: https://issues.apache.org/jira/browse/KAFKA-20659
Project: Kafka
Issue Type: Bug
Components: connect
Reporter: Ruiqi Dong
*Summary*
`Checkpoint.deserializeRecord(...)` and `Heartbeat.deserializeRecord(...)` read
a version field from the record header, but the version check is implemented
only as:
{code:java}
assert version == 0; {code}
This is not a reliable runtime validation mechanism. In normal production JVMs,
assertions are disabled by default, so the check disappears, and an unknown
future version is parsed with the v0 schema. In assertion-enabled test runs,
the same input fails with `AssertionError`, which is still not an appropriate
public failure mode for malformed record data.
*Affected code*
File:
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
{code:java}
private static Schema valueSchema(short version) {
assert version == 0;
return VALUE_SCHEMA_V0;
} {code}
File:
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
{code:java}
private static Schema valueSchema(short version) {
assert version == 0;
return VALUE_SCHEMA_V0;
} {code}
*Reproducer*
Add this test to
connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorRecordVersionTest.java:
The test expects an explicit Kafka exception. In assertion-enabled test runs,
it currently observes `AssertionError`; in normal `-da` JVMs, the `assert` is
skipped and the unsupported version is interpreted as v0.
{code:java}
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class MirrorRecordVersionTest {
@Test
public void checkpointDeserializeRejectsUnsupportedValueVersion() {
Checkpoint checkpoint = new Checkpoint("group", new
TopicPartition("topic", 0), 10L, 20L, "metadata");
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
"checkpoints",
0,
0,
checkpoint.recordKey(),
checkpointValueWithVersion((short) (Checkpoint.VERSION + 1))
);
assertThrows(UnsupportedVersionException.class, () ->
Checkpoint.deserializeRecord(record));
}
@Test
public void heartbeatDeserializeRejectsUnsupportedValueVersion() {
Heartbeat heartbeat = new Heartbeat("source", "target", 123L);
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
"heartbeats",
0,
0,
heartbeat.recordKey(),
heartbeatValueWithVersion((short) (Heartbeat.VERSION + 1))
);
assertThrows(UnsupportedVersionException.class, () ->
Heartbeat.deserializeRecord(record));
}
private static byte[] checkpointValueWithVersion(short version) {
Struct header = new Struct(Checkpoint.HEADER_SCHEMA);
header.set(Checkpoint.VERSION_KEY, version);
Struct value = new Struct(Checkpoint.VALUE_SCHEMA_V0);
value.set(Checkpoint.UPSTREAM_OFFSET_KEY, 10L);
value.set(Checkpoint.DOWNSTREAM_OFFSET_KEY, 20L);
value.set(Checkpoint.METADATA_KEY, "metadata");
return write(Checkpoint.HEADER_SCHEMA, header,
Checkpoint.VALUE_SCHEMA_V0, value);
}
private static byte[] heartbeatValueWithVersion(short version) {
Struct header = new Struct(Heartbeat.HEADER_SCHEMA);
header.set(Heartbeat.VERSION_KEY, version);
Struct value = new Struct(Heartbeat.VALUE_SCHEMA_V0);
value.set(Heartbeat.TIMESTAMP_KEY, 123L);
return write(Heartbeat.HEADER_SCHEMA, header,
Heartbeat.VALUE_SCHEMA_V0, value);
}
private static byte[] write(Schema headerSchema, Struct header, Schema
valueSchema, Struct value) {
ByteBuffer buffer = ByteBuffer.allocate(headerSchema.sizeOf(header) +
valueSchema.sizeOf(value));
headerSchema.write(buffer, header);
valueSchema.write(buffer, value);
buffer.flip();
byte[] result = new byte[buffer.remaining()];
buffer.get(result);
return result;
}
} {code}
Run:
{code:java}
./gradlew -q :connect:mirror-client:test --tests MirrorRecordVersionTest {code}
Observed behavior:
With assertions enabled, both tests fail. The stack traces point to
`Checkpoint.valueSchema(...)` and `Heartbeat.valueSchema(...)`. With assertions
disabled, the same version check is not executed, so the record is parsed using
`VALUE_SCHEMA_V0`.
{code:java}
Unexpected exception type thrown,
expected: UnsupportedVersionException
but was: AssertionError {code}
Expected behavior:
Unsupported record versions should be rejected deterministically, regardless of
the JVM assertion setting, with an explicit runtime exception such as
`UnsupportedVersionException`.
MirrorMaker checkpoint records determine translated consumer offsets, and
heartbeat records determine replication path/liveness. Silently interpreting a
future or corrupt record version as v0 can misread replication metadata. The
`AssertionError` observed under `-ea` is a secondary symptom; the main issue is
that the runtime validation is absent under the default `-da` setting.
The fix direction is to replace the `assert` with an explicit runtime check:
{code:java}
if (version != VERSION) {
throw new UnsupportedVersionException("Unsupported version " + version);
} {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)