[ https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668615#comment-16668615 ]

Michał Ciesielczyk commented on FLINK-10714:
--------------------------------------------

[~xiaogang.shi] I've tried other serializers, but with no luck. In the example I 
just provided, the state value is a simple 
scala.collection.immutable.Map[String, Any], where each value may be an Int or 
a String.

However, when I provide a uniform value type (e.g. Int) in the HashMap, the 
error does not seem to occur.
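
For reference, the setup looks roughly like the following. This is only a minimal sketch of the pattern described above, not the attached FailingJob.scala; the Event case class, the field names, and the map contents are illustrative. The key point is the heterogeneous Map[String, Any] state value, which forces Flink to fall back to Kryo for the Any values:

{code:java}
import org.apache.flink.streaming.api.scala._

// Illustrative event type (assumption, not from the attached job).
case class Event(key: String, payload: String)

def pipeline(stream: DataStream[Event]): DataStream[(String, Int)] =
  stream
    .keyBy(_.key)
    // ValueState holds an immutable Map[String, Any]; mixing Int and
    // String values here is what seems to trigger the failure, while a
    // uniform Map[String, Int] does not.
    .mapWithState[(String, Int), Map[String, Any]] { (event, state) =>
      val current = state.getOrElse(Map.empty[String, Any])
      val updated = current +
        ("count" -> (current.getOrElse("count", 0).asInstanceOf[Int] + 1),
         "last"  -> event.payload)
      ((event.key, updated.size), Some(updated))
    }
{code}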

> java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10714
>                 URL: https://issues.apache.org/jira/browse/FLINK-10714
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 1.5.5, 1.6.2
>         Environment: Flink 1.6.2, FsStateBackend
>            Reporter: Michał Ciesielczyk
>            Priority: Blocker
>             Fix For: 1.7.0
>
>         Attachments: FailingJob.scala
>
>
> I'm sometimes getting an error while creating a checkpoint using a filesystem 
> state backend. This ONLY happens when asynchronous snapshots are enabled 
> using the FileSystem State Backend. When RocksDB is used, everything works 
> fine.
>  
> I'm using a simple KeyedStream.mapWithState function with a ValueState 
> holding a hashmap (scala.collection.immutable.Map). It's hard to reproduce 
> the error with a simple code snippet, as the error occurs randomly.
>  
> This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), 
> but I'm still experiencing this behavior.
>   
> Stacktrace:
>  
> {code:java}
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172]
>     at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172]
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56) ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) ~[kryo-shaded-4.0.0.jar:?]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231) ~[flink-core-1.6.1.jar:1.6.1]
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73) ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
> {code}
>  
> Edit:
> I've attached a code sample for a Flink job that should reproduce the error 
> (FailingJob.scala). Bear in mind that the failures are not deterministic. On 
> my local env, Flink usually creates between 10 and 2000 checkpoints before it 
> fails.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
