Till Rohrmann created FLINK-4829:
------------------------------------

Summary: Accumulators are not thread safe
Key: FLINK-4829
URL: https://issues.apache.org/jira/browse/FLINK-4829
Project: Flink
Issue Type: Bug
Components: Local Runtime
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Fix For: 1.2.0
Flink's {{Accumulators}} are not thread safe. Live accumulator snapshots are now sent to the {{JobManager}`. This introduced concurrent access to accumulators, but the accumulators are not guarded against concurrent modification. If a snapshot is taken while the running task is modifying the same accumulator, serialization can fail with a {{ConcurrentModificationException}}.

A user reported the following trace. The bottom frames show the snapshot being taken during the {{TaskManager}}'s heartbeat to the {{JobManager}} ({{sendHeartbeatToJobManager}} calling {{AccumulatorRegistry.getSnapshot}}). The top frame shows {{TreeMap.writeObject}} failing because the map changed while it was being iterated:

{code}
WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
	at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
	at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
	at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
	at java.util.TreeMap.writeObject(TreeMap.java:2436)
	at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
	at java.util.HashMap.writeObject(HashMap.java:1362)
	at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
	at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
	at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
{code}
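{{TreeMap}}'s iterators are fail-fast, so serializing a {{TreeMap}} while another thread modifies it can throw {{ConcurrentModificationException}} on a best-effort basis. The sketch below shows one way to guard against this. It assumes a histogram-style accumulator backed by a {{TreeMap}}. The class {{GuardedHistogram}} and its methods {{add}}, {{snapshot}}, {{restore}} and {{total}} are hypothetical illustrations, not Flink's API and not the fix that was applied for this issue.

The updating thread and the snapshotting thread take the same lock. The snapshot only copies the map under that lock, and the copy is serialized outside it.

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

/** Hypothetical histogram-style accumulator; not a Flink class. */
public class GuardedHistogram implements Serializable {
    private static final long serialVersionUID = 1L;

    // Guarded by "this": only touched inside synchronized blocks.
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    /** Task thread: record one value (may insert a new TreeMap key). */
    public synchronized void add(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    /** Heartbeat thread: copy under the lock, then serialize the private copy. */
    public byte[] snapshot() throws IOException {
        TreeMap<Integer, Integer> copy;
        synchronized (this) {
            copy = new TreeMap<>(counts); // Integer values are immutable, so a shallow copy suffices
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(copy); // no other thread can modify "copy"
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    public static TreeMap<Integer, Integer> restore(byte[] snapshot)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return (TreeMap<Integer, Integer>) in.readObject();
        }
    }

    public static long total(TreeMap<Integer, Integer> counts) {
        return counts.values().stream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) throws Exception {
        GuardedHistogram histogram = new GuardedHistogram();
        int updates = 100_000;

        // Task thread: records 0..999 repeatedly; the first pass inserts new TreeMap keys.
        Thread task = new Thread(() -> {
            for (int i = 0; i < updates; i++) {
                histogram.add(i % 1000);
            }
        });
        task.start();

        // Heartbeat side (the main thread here): snapshot repeatedly while the task runs.
        int snapshots = 0;
        while (task.isAlive()) {
            long seen = total(restore(histogram.snapshot()));
            if (seen > updates) {
                throw new IllegalStateException("snapshot saw more updates than were made");
            }
            snapshots++;
        }
        task.join();

        System.out.println("snapshots=" + snapshots
                + ", total=" + total(restore(histogram.snapshot())));
    }
}
{code}

The final total is always 100000. The number of snapshots taken varies from run to run.

Copying under the lock and serializing outside it keeps the critical section short. The task thread is therefore never blocked on serialization I/O. The trade-off is a full copy of the map per snapshot. An alternative is to serialize while holding the lock, which avoids the copy but stalls the task thread for the duration of the heartbeat's serialization.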