@Shannon Concerning the issue with long checkpoints even though the
snapshot is very short:

I found a critical issue with the Flink Kafka 0.9 Consumer: on
low-throughput topics/partitions, it can lock up for a while, preventing
checkpoints from being triggered (that is, barriers from being injected).

A fix is going in now, and a 1.1.3 release containing it will probably follow soon.

On Thu, Sep 29, 2016 at 9:12 PM, Shannon Carey <sca...@expedia.com> wrote:

> Hi Stephan!
>
> The failure appeared to occur every 10 minutes, which is also the interval
> for checkpointing. However, I agree with you that the stack trace appears
> to be independent. Could this perhaps be an issue with multithreading,
> where the checkpoint mechanism is somehow interfering with ongoing
> operation of the state backend? I've never seen this problem until now, so
> I am a little suspicious that it might be due to something in my code, but
> so far it's been difficult to figure out what that might be.
>
> I am using the default, SemiAsync snapshot mode.
>
> The classes of the data flow are a bit too large to put here in their
> entirety. We are using Scala case classes, Java classes generated by Avro,
> Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of
> these classes have been operational in other jobs before. I added a unit
> test for the class which contains a mutable.Map to see whether that was
> causing a problem. Does this look like a reasonable unit test to verify
> Flink serializability to you?
>
> it("roundtrip serializes in Flink") {
>   val millis: Long = TimeUnit.DAYS.toMillis(2)
>   val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
>   original.add("a", TimestampedAirportCount(4, 6))
>   original.add("b", TimestampedAirportCount(7, 8))
>
>   val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 100)
>
>   deserialized.timestamp shouldBe millis
>   deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
>   deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
> }
>
> def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, expectedMaxBytes: Int): T = {
>   val typeInfo = implicitly[TypeInformation[T]]
>   val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)
>
>   val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
>   val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
>   serializer.serialize(original, outputView)
>
>   out.size() should be <= expectedMaxBytes
>
>   val inputView: DataInputViewStreamWrapper =
>     new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>   val deserialized: T = serializer.deserialize(inputView)
>
>   deserialized
> }
>
> I tried running my job in a local one-slot cluster with RocksDB enabled
> but checkpointing to local filesystem. Similar errors occur, but are more
> sporadic. I have not yet been able to capture the error while debugging,
> but if I do I will provide additional information.
>
> I noticed that locally, execution only reaches
> DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint
> completes. Also, the timing of checkpointing is a bit odd: in the example
> below the checkpoint takes 200s to complete after being triggered even
> though RocksDB reports that it only took ~100ms.
>
> 2016-09-29 12:56:17,619 INFO  CheckpointCoordinator     - Triggering checkpoint 2 @ 1475171777619
> 2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db) backup (synchronous part) took 7 ms.
> 2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB materialization from /var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 (asynchronous part) took 96 ms.
> 2016-09-29 12:59:38,333 INFO  CheckpointCoordinator     - Completed checkpoint 2 (in 200621 ms)
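
The timestamps in the log excerpt above can be compared directly to see where the time goes. The following is only a rough sketch using the JDK's java.time classes. `CheckpointTiming` and its field names are made up for illustration, and it assumes that each RocksDB log line is written when the corresponding part finishes, which the log itself does not confirm.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper (not part of Flink): timestamps and durations are
// copied verbatim from the log excerpt quoted above.
object CheckpointTiming {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS")
  private def ts(s: String): LocalDateTime = LocalDateTime.parse(s, fmt)

  val triggered: LocalDateTime      = ts("2016-09-29 12:56:17,619") // "Triggering checkpoint 2"
  val syncPartLogged: LocalDateTime = ts("2016-09-29 12:59:38,079") // "synchronous part took 7 ms"
  val completed: LocalDateTime      = ts("2016-09-29 12:59:38,333") // "Completed checkpoint 2"

  val syncMs: Long  = 7L  // reported duration of the synchronous part
  val asyncMs: Long = 96L // reported duration of the asynchronous part

  // Assumption: the "synchronous part" line is logged when that part ends,
  // so the snapshot started roughly syncMs before the line's timestamp.
  val waitBeforeSnapshotMs: Long =
    Duration.between(triggered, syncPartLogged).toMillis - syncMs

  // Time RocksDB itself reports spending on the snapshot.
  val snapshotWorkMs: Long = syncMs + asyncMs

  // Wall-clock time between the trigger and completion log lines.
  val triggerToCompletionMs: Long =
    Duration.between(triggered, completed).toMillis

  def main(args: Array[String]): Unit = {
    println(s"wait before snapshot started: $waitBeforeSnapshotMs ms")
    println(s"snapshot work (sync + async): $snapshotWorkMs ms")
    println(s"trigger to completion:        $triggerToCompletionMs ms")
  }
}
```

Under these assumptions, almost all of the roughly 200 s elapses between the trigger and the start of the snapshot, and the snapshot itself accounts for about 0.1 s.
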
>
> Do you have any other advice?
>
> Exceptions from local execution:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 'CLE
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
> ... 6 more
> Caused by: java.lang.ClassNotFoundException: 'CLE
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> ... 16 more
>
> After that one happened, this one happened many times:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
> at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.ArrayList.readObject(ArrayList.java:791)
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
> at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
> ... 1 more
> Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -2
> at java.lang.String.<init>(String.java:196)
> at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:132)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
> ... 44 more
>
> During another execution, this one occurred several times:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.ArrayList.readObject(ArrayList.java:791)
> at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
> at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
> ... 1 more
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: #
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
> ... 45 more
> Caused by: java.lang.ClassNotFoundException: #
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> ... 52 more
>
>
>
> From: Stephan Ewen <se...@apache.org>
> Date: Wednesday, September 28, 2016 at 1:18 PM
> To: <user@flink.apache.org>
> Subject: Re: Error while adding data to RocksDB: No more bytes left
>
> Hi Shannon!
>
> The stack trace you pasted is independent of checkpointing - it seems to
> be from the regular processing. Does this only happen when checkpoints are
> activated?
>
> Can you also share which checkpoint method you use?
>   - FullyAsynchronous
>   - SemiAsynchronous
>
> I think there are two possibilities for what can happen:
>   - There is a serialization inconsistency in the Serializers. If that is
> the case, this error should occur in an almost deterministic fashion. To
> debug that, it would be good to know which data types you are using.
>   - There is a bug in RocksDB (or Flink's wrapping of it) where data gets
> corrupted when using the snapshot feature. That would explain why this only
> occurs when checkpoints are happening.
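
To illustrate the first possibility, a serializer whose write and read paths disagree, here is a minimal self-contained sketch. It uses only java.io streams, not Flink or Kryo, and `AsymmetricSerializer` with its methods is a hypothetical stand-in rather than any real Flink class. The point is only that such a mismatch fails the same way on every round trip, which is why an inconsistency of this kind should show up almost deterministically.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{DataInputStream, DataOutputStream}
import scala.util.Try

// Hypothetical serializer with a deliberate bug: it writes 4 bytes
// but expects to read 8 bytes back.
object AsymmetricSerializer {

  // Write path: stores the value as an Int (4 bytes).
  def write(value: Long): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(value.toInt)
    out.flush()
    bytes.toByteArray
  }

  // Read path: expects a Long (8 bytes), so it runs out of input.
  def read(data: Array[Byte]): Long = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    in.readLong() // throws java.io.EOFException on a 4-byte input
  }

  // Round trip wrapped in Try so the failure can be inspected.
  def roundTrip(value: Long): Try[Long] = Try(read(write(value)))

  def main(args: Array[String]): Unit = {
    // Every attempt fails in the same way, independent of timing.
    (1 to 3).foreach(i => println(s"attempt $i: ${roundTrip(42L)}"))
  }
}
```

A failure that instead appears only sporadically, or only around checkpoints, points away from this kind of bug and towards the second possibility.
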
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <sca...@expedia.com> wrote:
>
>> It appears that when one of my jobs tries to checkpoint, the following
>> exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB
>> checkpoints are being saved to S3.
>>
>> java.lang.RuntimeException: Error while adding data to RocksDB
>>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.EOFException: No more bytes left.
>>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>         at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
>>         ... 6 more
>>
>> Thanks for any help!
>>
>> Shannon
>>
>
>
