Hi Yukun, I think you've found a bug in the code. The accumulators don't seem to be really thread safe. I've created an issue to fix this issue [1]. Thanks for reporting the problem :-)
[1] https://issues.apache.org/jira/browse/FLINK-4829 Cheers, Till On Fri, Oct 14, 2016 at 8:32 AM, Yukun Guo <[email protected]> wrote: > This happens when the TaskManager is serializing an > org.apache.flink.api.common.accumulators.Histogram by iterating through > the underlying TreeMap while a MapFunction for updating the accumulator > attempts to modify the TreeMap concurrently. How could I fix it? > > > The call stack: > > WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry - > Failed to serialize accumulators for task. > java.util.ConcurrentModificationException > at java.util.TreeMap$PrivateEntryIterator. > nextEntry(TreeMap.java:1211) > at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247) > at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242) > at java.util.TreeMap.writeObject(TreeMap.java:2436) > at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeWriteObject( > ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData( > ObjectOutputStream.java:1496) > at java.io.ObjectOutputStream.writeOrdinaryObject( > ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0( > ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.defaultWriteFields( > ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData( > ObjectOutputStream.java:1509) > at java.io.ObjectOutputStream.writeOrdinaryObject( > ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0( > ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject( > ObjectOutputStream.java:348) > at java.util.HashMap.internalWriteEntries(HashMap.java:1785) > at java.util.HashMap.writeObject(HashMap.java:1362) > at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeWriteObject( > ObjectStreamClass.java:1028) > at java.io.ObjectOutputStream.writeSerialData( > ObjectOutputStream.java:1496) > at java.io.ObjectOutputStream.writeOrdinaryObject( > ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0( > ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject( > ObjectOutputStream.java:348) > at org.apache.flink.util.InstantiationUtil.serializeObject( > InstantiationUtil.java:301) > at org.apache.flink.util.SerializedValue.<init>( > SerializedValue.java:52) > at org.apache.flink.runtime.accumulators. > AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58) > at org.apache.flink.runtime.accumulators.AccumulatorRegistry. > getSnapshot(AccumulatorRegistry.java:75) > at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$ > sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286) > ... >
