Ran Tao created FLINK-30935:
-------------------------------

             Summary: Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
                 Key: FLINK-30935
                 URL: https://issues.apache.org/jira/browse/FLINK-30935
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.16.1
            Reporter: Ran Tao


{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
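The current Kafka serializers ignore the version argument passed to deserialize and never check it against CURRENT_VERSION. I think we can add a version check, as many other connectors do, so that incompatible or corrupt state is rejected with a clear error instead of being misread.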
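
One possible shape for the check is sketched below. This is a minimal, self-contained illustration and not the actual Flink patch: the class name VersionCheckedSplitSerializer, the nested Split class (a hypothetical stand-in for KafkaPartitionSplit so the example compiles without Flink or Kafka on the classpath), the CURRENT_VERSION value of 0, and the exception message are all assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Sketch of a versioned serializer that rejects unknown versions on deserialize. */
class VersionCheckedSplitSerializer {

    // Assumed value for illustration; the real serializer defines its own constant.
    static final int CURRENT_VERSION = 0;

    /** Hypothetical stand-in for KafkaPartitionSplit. */
    static final class Split {
        final String topic;
        final int partition;
        final long offset;
        final long stoppingOffset;

        Split(String topic, int partition, long offset, long stoppingOffset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.stoppingOffset = stoppingOffset;
        }
    }

    int getVersion() {
        return CURRENT_VERSION;
    }

    byte[] serialize(Split split) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(split.topic);
            out.writeInt(split.partition);
            out.writeLong(split.offset);
            out.writeLong(split.stoppingOffset);
            out.flush();
            return baos.toByteArray();
        }
    }

    Split deserialize(int version, byte[] serialized) throws IOException {
        // The proposed check: fail fast before interpreting any bytes.
        if (version != CURRENT_VERSION) {
            throw new IOException(
                    "Unrecognized serializer version " + version
                            + ", expected " + CURRENT_VERSION
                            + "; the state may be incompatible or corrupt.");
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {
            String topic = in.readUTF();
            int partition = in.readInt();
            long offset = in.readLong();
            long stoppingOffset = in.readLong();
            return new Split(topic, partition, offset, stoppingOffset);
        }
    }
}
```

With this in place, deserialize(getVersion(), bytes) behaves as before, while any other version fails with an IOException before a single byte is read. If a later format version is introduced, the single comparison could become a switch that dispatches to a per-version reader.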



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
