@Shannon Concerning the issue with long checkpoints even though the snapshot is very short:
I found a critical issue with the Flink Kafka 0.9 Consumer: on low-throughput topics/partitions, it can lock up for a while, which prevents checkpoints from being triggered (that is, barriers from being injected). A fix is going in now, and there will probably be a 1.1.3 release with that fix soon.

On Thu, Sep 29, 2016 at 9:12 PM, Shannon Carey <sca...@expedia.com> wrote:

> Hi Stephan!
>
> The failure appeared to occur every 10 minutes, which is also the checkpointing interval. However, I agree with you that the stack trace appears to be independent of checkpointing. Could this perhaps be a multithreading issue, where the checkpoint mechanism somehow interferes with the ongoing operation of the state backend? I've never seen this problem until now, so I am a little suspicious that it might be due to something in my code, but so far it's been difficult to figure out what that might be.
>
> I am using the default SemiAsync snapshot mode.
>
> The classes of the data flow are a bit too large to put here in their entirety. We are using Scala case classes, Java classes generated by Avro, Tuples, Scala Option, java.util.UUID, and Scala mutable.Map. The majority of these classes have been operational in other jobs before. I added a unit test for the class that contains a mutable.Map to see whether that was causing a problem. Does this look like a reasonable unit test to verify Flink serializability to you?
>
>   it("roundtrip serializes in Flink") {
>     val millis: Long = TimeUnit.DAYS.toMillis(2)
>     val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
>     original.add("a", TimestampedAirportCount(4, 6))
>     original.add("b", TimestampedAirportCount(7, 8))
>
>     val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 100)
>
>     deserialized.timestamp shouldBe millis
>     deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
>     deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
>   }
>
>   def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, expectedMaxBytes: Int): T = {
>     val typeInfo = implicitly[TypeInformation[T]]
>     val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)
>
>     val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
>     val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
>     serializer.serialize(original, outputView)
>
>     out.size() should be <= expectedMaxBytes
>
>     val inputView: DataInputViewStreamWrapper =
>       new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>     val deserialized: T = serializer.deserialize(inputView)
>
>     deserialized
>   }
>
> I tried running my job in a local one-slot cluster with RocksDB enabled but checkpointing to the local filesystem. Similar errors occur, but more sporadically. I have not yet been able to capture the error while debugging, but if I do, I will provide additional information.
>
> I noticed that locally, execution only reaches DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint completes. Also, the timing of checkpointing is a bit odd: in the example below, the checkpoint takes ~200 s to complete after being triggered, even though RocksDB reports that it only took ~100 ms.
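Working through the timestamps in the log excerpt for checkpoint 2 makes the gap concrete. This is a minimal sketch in plain Scala using only `java.time`; `CheckpointTiming` and `CheckpointBreakdown` are hypothetical names, and it assumes trigger and completion fall on the same day. It subtracts the logged synchronous (7 ms) and asynchronous (96 ms) RocksDB snapshot times from the trigger-to-completion wall-clock time.

```scala
import java.time.{Duration, LocalTime}
import java.time.format.DateTimeFormatter

// Hypothetical result type for the breakdown below; not part of Flink.
final case class CheckpointBreakdown(wallClockMs: Long, snapshotMs: Long, unaccountedMs: Long)

object CheckpointTiming {
  // Matches the "HH:mm:ss,SSS" time-of-day portion of the log timestamps.
  private val TimeOfDay = DateTimeFormatter.ofPattern("HH:mm:ss,SSS")

  /** Milliseconds between two same-day timestamps such as "12:56:17,619". */
  def millisBetween(from: String, to: String): Long =
    Duration.between(LocalTime.parse(from, TimeOfDay), LocalTime.parse(to, TimeOfDay)).toMillis

  /** Splits trigger-to-completion time into the logged snapshot work and everything else. */
  def breakdown(triggered: String, completed: String, syncMs: Long, asyncMs: Long): CheckpointBreakdown = {
    val wall = millisBetween(triggered, completed)
    val snapshot = syncMs + asyncMs
    CheckpointBreakdown(wall, snapshot, wall - snapshot)
  }

  def main(args: Array[String]): Unit = {
    // Values copied from the log lines for checkpoint 2.
    val b = breakdown("12:56:17,619", "12:59:38,333", syncMs = 7L, asyncMs = 96L)
    println(s"wall clock: ${b.wallClockMs} ms, snapshot: ${b.snapshotMs} ms, other: ${b.unaccountedMs} ms")
  }
}
```

This prints `wall clock: 200714 ms, snapshot: 103 ms, other: 200611 ms`. The wall-clock figure is within 0.1 s of the 200621 ms that the CheckpointCoordinator reports, and more than 99.9% of it lies outside the RocksDB snapshot itself; the synchronous part was not logged until 12:59:38,079, about 200 s after the trigger. That pattern is consistent with the barrier reaching this operator late (for example, because it was not injected promptly at the source, as in the Kafka 0.9 consumer issue described at the top of this thread) rather than with a slow snapshot, although the log alone does not prove which.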
>
> 2016-09-29 12:56:17,619 INFO CheckpointCoordinator - Triggering checkpoint 2 @ 1475171777619
> 2016-09-29 12:59:38,079 INFO RocksDBStateBackend - RocksDB (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db) backup (synchronous part) took 7 ms.
> 2016-09-29 12:59:38,214 INFO RocksDBStateBackend - RocksDB materialization from /var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 (asynchronous part) took 96 ms.
> 2016-09-29 12:59:38,333 INFO CheckpointCoordinator - Completed checkpoint 2 (in 200621 ms)
>
> Do you have any other advice?
>
> Exceptions from local execution:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 'CLE
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>     at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
>     ... 6 more
> Caused by: java.lang.ClassNotFoundException: 'CLE
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>     ... 16 more
>
> After that one happened, this one happened many times:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
>     at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
>     at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at java.util.ArrayList.readObject(ArrayList.java:791)
>     at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at java.util.HashMap.readObject(HashMap.java:1396)
>     at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
>     ... 1 more
> Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -2
>     at java.lang.String.<init>(String.java:196)
>     at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:132)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>     at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
>     ... 44 more
>
> During another execution, this one occurred several times:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
>     at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at java.util.ArrayList.readObject(ArrayList.java:791)
>     at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at java.util.HashMap.readObject(HashMap.java:1396)
>     at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
>     ... 1 more
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: #
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>     at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
>     ... 45 more
> Caused by: java.lang.ClassNotFoundException: #
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>     ... 52 more
>
>
> From: Stephan Ewen <se...@apache.org>
> Date: Wednesday, September 28, 2016 at 1:18 PM
> To: <user@flink.apache.org>
> Subject: Re: Error while adding data to RocksDB: No more bytes left
>
> Hi Shannon!
>
> The stack trace you pasted is independent of checkpointing; it seems to be from the regular processing. Does this only happen when checkpoints are activated?
>
> Can you also share which checkpoint method you use?
>   - FullyAsynchronous
>   - SemiAsynchronous
>
> I think there are two possibilities for what could be happening:
>   - There is a serialization inconsistency in the Serializers. If that is the case, this error should occur in an almost deterministic fashion. To debug that, it would be good to know which data types you are using.
>   - There is a bug in RocksDB (or Flink's wrapping of it) where data gets corrupted when using the snapshot feature. That would explain why this only occurs when checkpoints are happening.
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <sca...@expedia.com> wrote:
>
>> It appears that when one of my jobs tries to checkpoint, the following exception is triggered.
>> I am using Flink 1.1.1 in Scala 2.11. RocksDB checkpoints are being saved to S3.
>>
>> java.lang.RuntimeException: Error while adding data to RocksDB
>>     at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.EOFException: No more bytes left.
>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>     at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>     at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>     at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>     at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
>>     ... 6 more
>>
>> Thanks for any help!
>>
>> Shannon
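Stephan's first hypothesis, a serialization inconsistency, amounts to a reader that consumes more or fewer bytes than the writer produced. The round-trip test quoted in this thread compares the deserialized value and bounds the serialized size, but it does not check that the reader consumed exactly the bytes that were written. The following is a minimal sketch in plain `java.io` with no Flink dependency; `SimpleSerializer`, `TimestampedCount`, and `RoundTrip` are hypothetical names introduced for illustration, not Flink APIs. It reports leftover bytes after deserializing and shows both failure modes: over-reading ends in a `java.io.EOFException` (the same exception type as the "No more bytes left" error above), while under-reading silently leaves bytes behind, so the next record in a stream would be decoded from the wrong offset. That second mode is one plausible way garbage such as the class name `'CLE` could arise, but this sketch does not show that it is what happened in this job.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical minimal serializer interface; a stand-in, not Flink's TypeSerializer.
trait SimpleSerializer[T] {
  def serialize(value: T, out: DataOutputStream): Unit
  def deserialize(in: DataInputStream): T
}

// Hypothetical record type, loosely modeled on TimestampedAirportCount from the test above.
final case class TimestampedCount(timestamp: Long, count: Int)

object RoundTrip {
  // Symmetric: writes a Long then an Int, and reads them back in the same order.
  val symmetric: SimpleSerializer[TimestampedCount] = new SimpleSerializer[TimestampedCount] {
    def serialize(v: TimestampedCount, out: DataOutputStream): Unit = {
      out.writeLong(v.timestamp)
      out.writeInt(v.count)
    }
    def deserialize(in: DataInputStream): TimestampedCount =
      TimestampedCount(in.readLong(), in.readInt())
  }

  // Buggy on purpose: writes the count as an Int (4 bytes) but reads it as a Long (8 bytes),
  // so the reader runs past the end of the data and DataInputStream throws EOFException.
  val overReading: SimpleSerializer[TimestampedCount] = new SimpleSerializer[TimestampedCount] {
    def serialize(v: TimestampedCount, out: DataOutputStream): Unit = {
      out.writeLong(v.timestamp)
      out.writeInt(v.count)
    }
    def deserialize(in: DataInputStream): TimestampedCount =
      TimestampedCount(in.readLong(), in.readLong().toInt)
  }

  // Buggy on purpose: writes the count as a Long (8 bytes) but reads it as an Int (4 bytes),
  // leaving 4 bytes unread; in a stream of records the next read would start mid-record.
  val underReading: SimpleSerializer[TimestampedCount] = new SimpleSerializer[TimestampedCount] {
    def serialize(v: TimestampedCount, out: DataOutputStream): Unit = {
      out.writeLong(v.timestamp)
      out.writeLong(v.count.toLong)
    }
    def deserialize(in: DataInputStream): TimestampedCount =
      TimestampedCount(in.readLong(), in.readInt())
  }

  /** Returns (deserialized value, bytes written, bytes the reader left unread). */
  def roundTrip[T](ser: SimpleSerializer[T], value: T): (T, Int, Int) = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    ser.serialize(value, out)
    out.flush()
    val written = buffer.toByteArray
    val in = new DataInputStream(new ByteArrayInputStream(written))
    val result = ser.deserialize(in)
    (result, written.length, in.available())
  }
}
```

To apply the same idea to the Flink-based test in this thread, one could keep a reference to the `ByteArrayInputStream` passed to `DataInputViewStreamWrapper` and assert that its `available()` is 0 after `serializer.deserialize(inputView)`. A single-threaded round trip like this cannot exercise concurrent access to a shared serializer, so it would not rule out the multithreading scenario raised in this thread.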