Hi Shannon!

The stack trace you pasted is independent of checkpointing - it seems to be from the regular processing. Does this only happen when checkpoints are activated?
Can you also share which checkpoint method you use?

  - FullyAsynchronous
  - SemiAsynchronous

I think there are two possibilities for what can happen:

  - There is a serialization inconsistency in the serializers. If that is the case, this error should occur almost deterministically. To debug that, it would be good to know which data types you are using.

  - There is a bug in RocksDB (or Flink's wrapping of it) where data gets corrupted when using the snapshot feature. That would explain why this only occurs when checkpoints are happening.

Greetings,
Stephan

On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <sca...@expedia.com> wrote:

> It appears that when one of my jobs tries to checkpoint, the following
> exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB
> checkpoints are being saved to S3.
>
> java.lang.RuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: No more bytes left.
>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>     at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>     at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>     at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>     at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
>     ... 6 more
>
> Thanks for any help!
>
> Shannon
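
PS: To illustrate the first possibility (a serialization inconsistency), here is a minimal round-trip sketch. It deliberately uses plain java.io streams instead of Flink's serializers or Kryo, and the names Acc, Codec, CONSISTENT and INCONSISTENT are hypothetical stand-ins, not anything from the job. The idea is only that a writer which emits fewer bytes than the reader expects fails with an EOFException on read, and that it does so every time, which is why I would expect this kind of bug to be nearly deterministic.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

class RoundTripCheck {

    /** Hypothetical accumulator type standing in for a folding-state value. */
    static final class Acc {
        final long count;
        final long sum;
        Acc(long count, long sum) { this.count = count; this.sum = sum; }
    }

    /** Hypothetical write/read pair; not Flink's TypeSerializer interface. */
    interface Codec {
        void write(Acc value, DataOutputStream out) throws IOException;
        Acc read(DataInputStream in) throws IOException;
    }

    /** Writes and reads the same two fields in the same order. */
    static final Codec CONSISTENT = new Codec() {
        public void write(Acc v, DataOutputStream out) throws IOException {
            out.writeLong(v.count);
            out.writeLong(v.sum);
        }
        public Acc read(DataInputStream in) throws IOException {
            return new Acc(in.readLong(), in.readLong());
        }
    };

    /** Writes one field but reads two: the second readLong runs out of bytes. */
    static final Codec INCONSISTENT = new Codec() {
        public void write(Acc v, DataOutputStream out) throws IOException {
            out.writeLong(v.count); // sum is never written
        }
        public Acc read(DataInputStream in) throws IOException {
            return new Acc(in.readLong(), in.readLong());
        }
    };

    /** Returns true if the value survives write-then-read with no bytes left over. */
    static boolean roundTrips(Codec codec, Acc value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            codec.write(value, out);
            out.flush();
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            Acc copy = codec.read(in);
            return copy.count == value.count
                && copy.sum == value.sum
                && in.available() == 0;
        } catch (EOFException e) {
            // Reader wanted more bytes than the writer produced.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Acc sample = new Acc(3L, 42L);
        System.out.println("consistent:   " + roundTrips(CONSISTENT, sample));
        System.out.println("inconsistent: " + roundTrips(INCONSISTENT, sample));
    }
}
```

A check of the same shape against the actual state type and its real serializer, run outside the job, would help separate the two possibilities: if the round trip fails there as well, RocksDB and checkpointing are probably not involved.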