Hi all,

I'm using protobufs as the keys of a Flink stream, with serializer code copied from this pull request <https://github.com/apache/flink/pull/7598>, but deserialization fails after a checkpoint restore because data is missing.
I'm using HDFS and the RocksDB backend. I tried providing the path to a previous retained checkpoint (i.e., .../flink-checkpoints/{jobIdHex}/chk-14/_metadata). The proto deserializer failed on a serialized record that had been truncated: it was missing its last 20 bytes out of 77 total.

The same serializers work fine if I don't try restoring from a checkpoint, worked fine for a different job, are fairly well unit-tested, and mostly just delegate to the protobuf serde code, so I'm pretty certain my serializer is not the issue, which means I'm doing something else wrong.

Questions:

1. Have others encountered issues like this?
2. How do I know when a checkpoint has been completed and is safe to restore from? (Is checkpoint completion atomic?)

Thanks,
Jeff Martin

2019-11-13 12:16:35
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.types.DeserializationException: Could not deserialize class com.mycompany.myproduct.persistence.InternalProto$SummariesKey
    at com.mycompany.myproduct.flink.proto.ProtoSerializer.lambda$ensureParseFromIsSet$0(ProtoSerializer.java:102)
    at com.mycompany.myproduct.flink.proto.ProtoSerializer.deserialize(ProtoSerializer.java:114)
    at com.mycompany.myproduct.flink.proto.ProtoKeySerializer.deserialize(ProtoKeySerializer.java:71)
    at com.mycompany.myproduct.flink.proto.ProtoKeySerializer.deserialize(ProtoKeySerializer.java:17)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$LegacyTimerSerializer.deserialize(InternalTimersSnapshotReaderWriters.java:425)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotReader.readTimersSnapshot(InternalTimersSnapshotReaderWriters.java:284)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:115)
    at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:71)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:153)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:224)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:158)
    ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor1806.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.mycompany.myproduct.flink.proto.ProtoSerializer.lambda$ensureParseFromIsSet$0(ProtoSerializer.java:100)
    ... 16 more
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:102)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:627)
    at com.mycompany.myproduct.persistence.Proto$MyProductId.<init>(Proto.java:33346)
    at com.mycompany.myproduct.persistence.Proto$MyProductId.<init>(Proto.java:33307)
    at com.mycompany.myproduct.persistence.Proto$MyProductId$1.parsePartialFrom(Proto.java:33970)
    at com.mycompany.myproduct.persistence.Proto$MyProductId$1.parsePartialFrom(Proto.java:33964)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readMessage(CodedInputStream.java:888)
    at com.mycompany.myproduct.persistence.InternalProto$SummariesKey.<init>(InternalProto.java:1825)
    at com.mycompany.myproduct.persistence.InternalProto$SummariesKey.<init>(InternalProto.java:1765)
    at com.mycompany.myproduct.persistence.InternalProto$SummariesKey$1.parsePartialFrom(InternalProto.java:2526)
    at com.mycompany.myproduct.persistence.InternalProto$SummariesKey$1.parsePartialFrom(InternalProto.java:2520)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at com.mycompany.myproduct.persistence.InternalProto$SummariesKey.parseFrom(InternalProto.java:2021)
    ... 20 more