Hi Juan,

count() and collect() trigger the execution of a job.
Since Flink does not cache intermediate results (yet), all operations from
the sink (count()/collect()) to the sources are executed.
So in a sense a DataSet is immutable (given that the inputs of the sources
do not change), but it is completely recomputed for every execution.
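
To illustrate, here is a minimal sketch, assuming the Flink DataSet Scala API
is on the classpath. The object name MaterializeOnce and the method run are
made up for this example. It calls the actions one after another from a single
thread, so each one runs its own job, and then shares the locally collected
result with other threads instead of sharing the DataSet itself. This is only
reasonable when the result is small enough to collect.

```scala
import org.apache.flink.api.scala._

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Flink.
object MaterializeOnce {
  def run(): (Long, Seq[Long]) = {
    val env = ExecutionEnvironment.createLocalEnvironment(3)
    val xs = env.fromCollection(1 to 100).map(_ + 1)

    // Each action triggers a separate job that recomputes xs from the source.
    // Both calls are made sequentially from the same thread.
    val n: Long = xs.count()
    val local: Seq[Int] = xs.collect()

    // From here on only the local, immutable Seq is shared between threads,
    // so no further Flink jobs are launched concurrently.
    implicit val ec: ExecutionContext = ExecutionContext.global
    val sizes = Future.sequence(Seq.fill(10)(Future(local.size.toLong)))

    (n, Await.result(sizes, 10.seconds))
  }
}
```

If the result is too large to collect, another option would be to serialize
the count()/collect() calls behind a shared lock, at the cost of running the
jobs one at a time.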

There are currently some efforts [1] under way to improve Flink's behavior
for interactive sessions.

Best, Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
[2]
https://lists.apache.org/thread.html/5f4961f1dfe23204631fd6f2b3227724ce9831f462737f51742a52c1@%3Cdev.flink.apache.org%3E

On Fri, Apr 26, 2019 at 17:03, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi Timo,
>
> Thanks for your answer. I was surprised to have problems calling those
> methods concurrently, because I thought data sets were immutable. Now I
> understand that calling count or collect mutates the data set: not its
> contents, but some kind of execution plan included in the data set.
>
> I suggest adding a remark about this lack of thread safety to the
> documentation. Maybe it's already there but I haven't seen it. I also
> understand that repeated calls to collect and count on the same data set
> are ok as long as they are done sequentially, and not concurrently.
>
> Thanks,
>
> Juan
>
> On Fri, Apr 26, 2019 at 02:00 Timo Walther <twal...@apache.org> wrote:
>
>> Hi Juan,
>>
>> As far as I know we do not provide any concurrency guarantees for count()
>> or collect(). Those methods need to be used with caution anyway, as the
>> result size must not exceed a certain threshold. I will loop in Fabian, who
>> might know more about the internals of the execution.
>>
>> Regards,
>> Timo
>>
>>
>> On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:
>>
>> Any thoughts on this?
>>
>> On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a very simple program using the local execution environment, that
>>> throws NPE and other exceptions related to concurrent access when launching
>>> a count for a DataSet from different threads. The program is
>>> https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e which
>>> is basically this:
>>>
>>> def doubleCollectConcurrent = {
>>>   val env = ExecutionEnvironment.createLocalEnvironment(3)
>>>   val xs = env.fromCollection(1 to 100).map{_+1}
>>>   implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
>>>
>>>   val pendingActions = Seq.fill(10)(
>>>     Future { println(s"xs.count = ${xs.count}") }
>>>   )
>>>   val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) =>
>>>     println("pending action finished")
>>>     Unit
>>>   }
>>>   Await.result(pendingActionsFinished, 10 seconds)
>>>
>>>   ok
>>> }
>>>
>>>
>>> It looks like the issue is on OperatorTranslation.java at
>>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
>>> when a sink is added to the sinks list while that list is being traversed.
>>> I have the impression that this is by design, so I'd like to confirm that
>>> this is the expected behaviour, and whether this is happening only for the
>>> local execution environment, or if this affects all execution environments
>>> implementations. Other related questions I have are:
>>>
>>>    - Is this documented somewhere? I'm quite new to Flink, so I might
>>>    have missed this. Is there any known workaround for concurrently
>>>    launching counts and other sink computations on the same DataSet?
>>>    - Is it safe to perform a sequence of calls to DataSet sink methods
>>>    like count or collect, on the same DataSet, as long as they are performed
>>>    from the same thread? From my experience it looks like it is, but I'd
>>>    like to get a confirmation if possible.
>>>
>>> This might be related to
>>> https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
>>> but I'm not sure.
>>>
>>> Thanks a lot for your help.
>>>
>>> Greetings,
>>>
>>> Juan
>>>
>>
>>
