Hi all,

I'm using protobufs as keys in a Flink stream, with code copied from this
pull request <https://github.com/apache/flink/pull/7598>, but
deserialization fails after a checkpoint restore because of missing data.

I'm using HDFS and the RocksDB backend. I tried restoring by providing the
path to a previously retained checkpoint (i.e.,
.../flink-checkpoints/{jobIdHex}/chk-14/_metadata). The proto deserializer
failed on a serialized record that had been truncated: it was missing the
last 20 of its 77 bytes.
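
To illustrate the symptom (a sketch only, not the actual ProtoSerializer
code, which isn't shown here): if each record is framed with a length
prefix, a short read fails loudly instead of handing a partial buffer to
the protobuf parser. The class FramingCheck and its methods are
hypothetical names, and the length-prefix framing itself is an assumption;
the 77-byte payload and the 20-byte cut mirror the numbers above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical helper, not part of Flink or protobuf.
public class FramingCheck {

    /** Writes a 4-byte big-endian length followed by the payload bytes. */
    public static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
        return bytes.toByteArray();
    }

    /** Reads one frame; throws EOFException if fewer bytes remain than the prefix promises. */
    public static byte[] unframe(byte[] framed) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload); // fails on a short read rather than returning partial data
        return payload;
    }

    /** Returns true if the framed bytes end before the declared payload length. */
    public static boolean isTruncated(byte[] framed) throws IOException {
        try {
            unframe(framed);
            return false;
        } catch (EOFException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[77];            // stand-in for a serialized proto
        Arrays.fill(payload, (byte) 1);
        byte[] framed = frame(payload);
        byte[] cut = Arrays.copyOf(framed, framed.length - 20); // drop the last 20 bytes
        System.out.println("intact truncated? " + isTruncated(framed));
        System.out.println("cut truncated?    " + isTruncated(cut));
    }
}
```

A check like this only says whether the bytes reaching the deserializer are
short; it does not say whether they were lost on write or on restore.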

The same serializers work fine when I don't restore from a checkpoint,
worked fine for a different job, are fairly well unit-tested, and mostly
just delegate to the protobuf serde code, so I'm pretty certain my
serializer is not the issue. That suggests I'm doing something else wrong.

Questions:
1. Have others encountered issues like this?
2. How do I know when a checkpoint has been completed and is safe to
restore from? (Is checkpoint completion atomic?)

Thanks,

Jeff Martin
2019-11-13 12:16:35
java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.types.DeserializationException: Could not deserialize class com.mycompany.myproduct.persistence.InternalProto$SummariesKey
        at com.mycompany.myproduct.flink.proto.ProtoSerializer.lambda$ensureParseFromIsSet$0(ProtoSerializer.java:102)
        at com.mycompany.myproduct.flink.proto.ProtoSerializer.deserialize(ProtoSerializer.java:114)
        at com.mycompany.myproduct.flink.proto.ProtoKeySerializer.deserialize(ProtoKeySerializer.java:71)
        at com.mycompany.myproduct.flink.proto.ProtoKeySerializer.deserialize(ProtoKeySerializer.java:17)
        at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$LegacyTimerSerializer.deserialize(InternalTimersSnapshotReaderWriters.java:425)
        at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotReader.readTimersSnapshot(InternalTimersSnapshotReaderWriters.java:284)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:115)
        at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:71)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:153)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:224)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:158)
        ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedMethodAccessor1806.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.mycompany.myproduct.flink.proto.ProtoSerializer.lambda$ensureParseFromIsSet$0(ProtoSerializer.java:100)
        ... 16 more
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
        at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:102)
        at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:627)
        at com.mycompany.myproduct.persistence.Proto$MyProductId.<init>(Proto.java:33346)
        at com.mycompany.myproduct.persistence.Proto$MyProductId.<init>(Proto.java:33307)
        at com.mycompany.myproduct.persistence.Proto$MyProductId$1.parsePartialFrom(Proto.java:33970)
        at com.mycompany.myproduct.persistence.Proto$MyProductId$1.parsePartialFrom(Proto.java:33964)
        at com.google.protobuf.CodedInputStream$ArrayDecoder.readMessage(CodedInputStream.java:888)
        at com.mycompany.myproduct.persistence.InternalProto$SummariesKey.<init>(InternalProto.java:1825)
        at com.mycompany.myproduct.persistence.InternalProto$SummariesKey.<init>(InternalProto.java:1765)
        at com.mycompany.myproduct.persistence.InternalProto$SummariesKey$1.parsePartialFrom(InternalProto.java:2526)
        at com.mycompany.myproduct.persistence.InternalProto$SummariesKey$1.parsePartialFrom(InternalProto.java:2520)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
        at com.mycompany.myproduct.persistence.InternalProto$SummariesKey.parseFrom(InternalProto.java:2021)
        ... 20 more
