Hi, I have a very simple program using the local execution environment, that throws NPE and other exceptions related to concurrent access when launching a count for a DataSet from different threads. The program is https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e which is basically this:
def doubleCollectConcurrent = { val env = ExecutionEnvironment.createLocalEnvironment(3) val xs = env.fromCollection(1 to 100).map{_+1} implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) val pendingActions = Seq.fill(10)( Future { println(s"xs.count = ${xs.count}") } ) val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) => println("pending action finished") Unit } Await.result(pendingActionsFinished, 10 seconds) ok } It looks like the issue is on OperatorTranslation.java at https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51, when a sink is added to the sinks list while that list is being traversed. I have the impression that this is by design, so I'd like to confirm that this is the expected behaviour, and whether this is happening only for the local execution environment, or if this affects all execution environments implementations. Other related questions I have are: - Is this documented somewhere? I'm quite new to Flink, so I might have missed this. Is there any known workaround for concurrently launching counts and other sink computations on the same DataSet? - Is it safe performing a sequence of calls to DataSet sink methods like count or collect, on the same DataSet, as long as they are performed from the same thread? From my experience it looks like it is, but I'd like to get a confirmation if possible. This might be related to https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink but I'm not sure. Thanks a lot for your help. Greetings, Juan