Hi Juan,

count() and collect() each trigger the execution of a job. Since Flink does not cache intermediate results (yet), all operations from the sources to the sink (count()/collect()) are executed every time. So in a sense a DataSet is immutable (given that the input of the sources does not change), but it is completely recomputed for every execution.
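
A minimal sketch of that recomputation, assuming the Scala DataSet API and a local environment (the object name `RecomputeSketch` and the `println` inside `map` are illustrative only): each "mapping" line should appear once for the count() job and once more for the collect() job.

```scala
import org.apache.flink.api.scala._

// Hypothetical example object, not part of Flink.
object RecomputeSketch {
  def run(): (Long, Seq[Int]) = {
    val env = ExecutionEnvironment.createLocalEnvironment(1)
    val xs = env.fromCollection(1 to 5).map { x =>
      // Runs again for every job, because intermediate results are not cached.
      println(s"mapping $x")
      x + 1
    }
    val n = xs.count()             // first job: source -> map -> count sink
    val ys = xs.collect().sorted   // second job: source -> map -> collect sink
    (n, ys)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both calls are made from one thread, one after the other, so the sketch does not touch the concurrency problem discussed in this thread.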

There are currently some efforts [1] under way to improve Flink's behavior for interactive sessions.

Best,
Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
[2] https://lists.apache.org/thread.html/5f4961f1dfe23204631fd6f2b3227724ce9831f462737f51742a52c1@%3Cdev.flink.apache.org%3E

On Fri, Apr 26, 2019 at 17:03, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

> Hi Timo,
>
> Thanks for your answer. I was surprised to have problems calling those
> methods concurrently, because I thought data sets were immutable. Now I
> understand calling count or collect mutates the data set, not its contents
> but some kind of execution plan included in the data set.
>
> I suggest adding a remark about this lack of thread safety to the
> documentation. Maybe it’s already there but I haven’t seen it. I also
> understand repeated calls to collect and count on the same data set are ok
> as long as they are done sequentially, and not concurrently.
>
> Thanks,
>
> Juan
>
> On Fri, Apr 26, 2019 at 02:00 Timo Walther <twal...@apache.org> wrote:
>
>> Hi Juan,
>>
>> As far as I know we do not provide any concurrency guarantees for count()
>> or collect(). Those methods need to be used with caution anyway, as the
>> result size must not exceed a certain threshold. I will loop in Fabian, who
>> might know more about the internals of the execution.
>>
>> Regards,
>> Timo
>>
>>
>> On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:
>>
>> Any thoughts on this?
>>
>> On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a very simple program using the local execution environment, that
>>> throws NPE and other exceptions related to concurrent access when launching
>>> a count for a DataSet from different threads.
>>> The program is
>>> https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e which
>>> is basically this:
>>>
>>> def doubleCollectConcurrent = {
>>>   val env = ExecutionEnvironment.createLocalEnvironment(3)
>>>   val xs = env.fromCollection(1 to 100).map{_+1}
>>>   implicit val ec =
>>>     ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
>>>
>>>   val pendingActions = Seq.fill(10)(
>>>     Future { println(s"xs.count = ${xs.count}") }
>>>   )
>>>   val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) =>
>>>     println("pending action finished")
>>>     Unit }
>>>   Await.result(pendingActionsFinished, 10 seconds)
>>>
>>>   ok}
>>>
>>>
>>> It looks like the issue is on OperatorTranslation.java at
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
>>> when a sink is added to the sinks list while that list is being traversed.
>>> I have the impression that this is by design, so I'd like to confirm that
>>> this is the expected behaviour, and whether this is happening only for the
>>> local execution environment, or if this affects all execution environment
>>> implementations. Other related questions I have are:
>>>
>>>    - Is this documented somewhere? I'm quite new to Flink, so I might
>>>    have missed this. Is there any known workaround for concurrently
>>>    launching counts and other sink computations on the same DataSet?
>>>    - Is it safe performing a sequence of calls to DataSet sink methods
>>>    like count or collect, on the same DataSet, as long as they are
>>>    performed from the same thread? From my experience it looks like it
>>>    is, but I'd like to get a confirmation if possible.
>>>
>>> This might be related to
>>> https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
>>> but I'm not sure.
>>>
>>> Thanks a lot for your help.
>>>
>>> Greetings,
>>>
>>> Juan
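
On the workaround question in the original message above: one possible sketch, not a confirmed or documented Flink pattern, is to keep the thread pool but funnel every sink call through a single lock, so that count() never runs concurrently on the shared ExecutionEnvironment. It rests on the assumption reported in this thread that sequential calls are fine. The names `SerializedCountSketch` and `jobLock` are hypothetical.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.flink.api.scala._

// Hypothetical example object, not part of Flink.
object SerializedCountSketch {
  // Assumption: one lock per ExecutionEnvironment is enough, because every
  // sink call on `xs` goes through the same environment.
  private val jobLock = new Object

  def run(): Seq[Long] = {
    val env = ExecutionEnvironment.createLocalEnvironment(3)
    val xs = env.fromCollection(1 to 100).map(_ + 1)
    val pool = Executors.newFixedThreadPool(10)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // The futures start on different threads, but only one of them at a
      // time may add its sink and execute a job.
      val pending = Seq.fill(10)(Future { jobLock.synchronized { xs.count() } })
      Await.result(Future.sequence(pending), 5.minutes)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The jobs still run one after another, so this trades away the parallelism across jobs; it only removes the concurrent access to the shared plan.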