Ruiqi Dong created KAFKA-20659:
----------------------------------

             Summary: MirrorMaker Checkpoint and Heartbeat records can accept 
unsupported versions when assertions are disabled
                 Key: KAFKA-20659
                 URL: https://issues.apache.org/jira/browse/KAFKA-20659
             Project: Kafka
          Issue Type: Bug
          Components: connect
            Reporter: Ruiqi Dong


*Summary*
`Checkpoint.deserializeRecord(...)` and `Heartbeat.deserializeRecord(...)` read 
a version field from the record header, but the version check is implemented 
only as:
{code:java}
assert version == 0; {code}
This is not a reliable runtime validation mechanism. In normal production JVMs, 
assertions are disabled by default, so the check disappears, and an unknown 
future version is parsed with the v0 schema. In assertion-enabled test runs, 
the same input fails with `AssertionError`, which is still not an appropriate 
public failure mode for malformed record data.
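The difference can be seen in isolation with a minimal, self-contained sketch. 
Everything in it is a hypothetical stand-in rather than MirrorMaker code: the 
class `AssertVsExplicitCheck`, its methods, the string used in place of a 
`Schema`, and `IllegalArgumentException`, which is used only so the example has 
no Kafka dependency.
{code:java}
final class AssertVsExplicitCheck {

    // Mirrors the current pattern: the check only exists when the JVM runs with -ea.
    static String schemaNameWithAssert(short version) {
        assert version == 0 : "unexpected version " + version;
        return "VALUE_SCHEMA_V0"; // stand-in for the real Schema constant
    }

    // Explicit check: behaves identically with -ea and -da.
    static String schemaNameWithCheck(short version) {
        if (version != 0) {
            throw new IllegalArgumentException("Unsupported version " + version);
        }
        return "VALUE_SCHEMA_V0";
    }

    // Standard idiom for detecting whether assertions are enabled for this class.
    static boolean assertionsEnabled() {
        boolean enabled = false;
        assert enabled = true; // the assignment runs only under -ea
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
        try {
            // Under -da this returns the v0 schema name for version 1.
            System.out.println("assert-based: " + schemaNameWithAssert((short) 1));
        } catch (AssertionError e) {
            System.out.println("assert-based: AssertionError");
        }
        try {
            schemaNameWithCheck((short) 1);
        } catch (IllegalArgumentException e) {
            System.out.println("explicit: " + e.getMessage());
        }
    }
}
{code}
Running the class once with `java -ea` and once with plain `java` should show 
the assert-based method changing behavior while the explicit check does not.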
*Affected code*
File: 
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
{code:java}
private static Schema valueSchema(short version) {
    assert version == 0;
    return VALUE_SCHEMA_V0;
} {code}
File: 
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
{code:java}
private static Schema valueSchema(short version) {
    assert version == 0;
    return VALUE_SCHEMA_V0;
} {code}
*Reproducer*
Add the following test as 
connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorRecordVersionTest.java.

The test expects an explicit Kafka exception. In assertion-enabled test runs, 
it currently observes `AssertionError`; in JVMs running with the default `-da` 
setting, the `assert` is skipped and the unsupported version is interpreted as 
v0.
{code:java}
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;

import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;

import static org.junit.jupiter.api.Assertions.assertThrows;

public class MirrorRecordVersionTest {

    @Test
    public void checkpointDeserializeRejectsUnsupportedValueVersion() {
        Checkpoint checkpoint = new Checkpoint("group", new TopicPartition("topic", 0), 10L, 20L, "metadata");
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                "checkpoints",
                0,
                0,
                checkpoint.recordKey(),
                checkpointValueWithVersion((short) (Checkpoint.VERSION + 1))
        );

        assertThrows(UnsupportedVersionException.class, () -> Checkpoint.deserializeRecord(record));
    }

    @Test
    public void heartbeatDeserializeRejectsUnsupportedValueVersion() {
        Heartbeat heartbeat = new Heartbeat("source", "target", 123L);
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                "heartbeats",
                0,
                0,
                heartbeat.recordKey(),
                heartbeatValueWithVersion((short) (Heartbeat.VERSION + 1))
        );

        assertThrows(UnsupportedVersionException.class, () -> Heartbeat.deserializeRecord(record));
    }

    private static byte[] checkpointValueWithVersion(short version) {
        Struct header = new Struct(Checkpoint.HEADER_SCHEMA);
        header.set(Checkpoint.VERSION_KEY, version);
        Struct value = new Struct(Checkpoint.VALUE_SCHEMA_V0);
        value.set(Checkpoint.UPSTREAM_OFFSET_KEY, 10L);
        value.set(Checkpoint.DOWNSTREAM_OFFSET_KEY, 20L);
        value.set(Checkpoint.METADATA_KEY, "metadata");
        return write(Checkpoint.HEADER_SCHEMA, header, Checkpoint.VALUE_SCHEMA_V0, value);
    }

    private static byte[] heartbeatValueWithVersion(short version) {
        Struct header = new Struct(Heartbeat.HEADER_SCHEMA);
        header.set(Heartbeat.VERSION_KEY, version);
        Struct value = new Struct(Heartbeat.VALUE_SCHEMA_V0);
        value.set(Heartbeat.TIMESTAMP_KEY, 123L);
        return write(Heartbeat.HEADER_SCHEMA, header, Heartbeat.VALUE_SCHEMA_V0, value);
    }

    private static byte[] write(Schema headerSchema, Struct header, Schema valueSchema, Struct value) {
        ByteBuffer buffer = ByteBuffer.allocate(headerSchema.sizeOf(header) + valueSchema.sizeOf(value));
        headerSchema.write(buffer, header);
        valueSchema.write(buffer, value);
        buffer.flip();
        byte[] result = new byte[buffer.remaining()];
        buffer.get(result);
        return result;
    }
} {code}
Run:
{code:bash}
./gradlew -q :connect:mirror-client:test --tests MirrorRecordVersionTest {code}
Observed behavior:
With assertions enabled, both tests fail because `AssertionError` is thrown 
instead of `UnsupportedVersionException`. The stack traces point to 
`Checkpoint.valueSchema(...)` and `Heartbeat.valueSchema(...)`:
{code:java}
Unexpected exception type thrown,
expected: UnsupportedVersionException
but was: AssertionError {code}
With assertions disabled, the version check is not executed at all, so the 
record is parsed using `VALUE_SCHEMA_V0` and no exception is raised.
Expected behavior:
Unsupported record versions should be rejected deterministically, regardless of 
the JVM assertion setting, with an explicit runtime exception such as 
`UnsupportedVersionException`.
Impact:
MirrorMaker checkpoint records determine translated consumer offsets, and 
heartbeat records determine the replication path and liveness. Silently 
interpreting a future or corrupt record version as v0 can misread replication 
metadata. The `AssertionError` observed under `-ea` is a secondary symptom; the 
main issue is that the runtime validation is absent under the default `-da` 
setting.
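To make the misread concrete, here is a small sketch that uses plain 
`ByteBuffer` instead of Kafka's `Schema`/`Struct` classes. The v1 layout in it 
is invented purely for illustration (no such version exists in this report), 
the v0 layout is simplified to the two offsets, and the class and method names 
are hypothetical.
{code:java}
import java.nio.ByteBuffer;

final class MisreadAsV0Demo {

    // Hypothetical v1 layout, invented for illustration only:
    // [short version][int flags][long upstreamOffset][long downstreamOffset]
    static byte[] encodeHypotheticalV1(int flags, long upstream, long downstream) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + 8 + 8);
        buf.putShort((short) 1).putInt(flags).putLong(upstream).putLong(downstream);
        return buf.array();
    }

    // Simplified v0-style reader: [short version][long upstream][long downstream].
    // The version is read but never validated, as happens when the assert is skipped.
    static long[] readAsV0IgnoringVersion(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        buf.getShort(); // version is discarded without a check
        return new long[] {buf.getLong(), buf.getLong()};
    }

    public static void main(String[] args) {
        byte[] record = encodeHypotheticalV1(0, 10L, 20L);
        long[] offsets = readAsV0IgnoringVersion(record);
        // The extra 4-byte field shifts both reads, so neither offset is 10 or 20.
        System.out.println("upstream=" + offsets[0] + " downstream=" + offsets[1]);
    }
}
{code}
The read succeeds without any error, which is what makes this failure mode 
hard to notice: the values are wrong but structurally valid.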
Suggested fix: replace the `assert` in both `valueSchema(...)` methods with an 
explicit runtime check:
{code:java}
if (version != VERSION) {
    throw new UnsupportedVersionException("Unsupported version " + version);
} {code}



