Hi Timo,

Thanks for your answer. I was surprised to have problems calling those
methods concurrently, because I thought data sets were immutable. Now I
understand that calling count or collect does mutate the data set: not its
contents, but some kind of execution plan attached to the data set.

I suggest adding a remark about this lack of thread safety to the
documentation. Maybe it’s already there but I haven’t seen it. I also
understand that repeated calls to collect and count on the same data set are
OK as long as they are made sequentially, not concurrently.
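
To make the sequential case concrete, here is a minimal sketch (plain Scala,
not Flink-specific, and not a confirmed fix) that keeps the Future-based style
of my program but runs every action on a single-threaded executor, so the
calls never overlap. The names SequentialActions and runSequentially are
hypothetical, and the whole idea rests on my assumption that sequential calls
on the same data set are safe:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequentialActions {
  // Hypothetical helper: submits `n` runs of `action` as Futures, but on a
  // single-threaded executor, so at most one run is active at any time.
  def runSequentially[A](n: Int)(action: () => A): Seq[A] = {
    val executor = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    try {
      // Each Future is queued on the same worker thread, in submission order.
      val pending = Seq.fill(n)(Future(action()))
      Await.result(Future.sequence(pending), 30.seconds)
    } finally {
      executor.shutdown()
    }
  }
}
```

With the data set from my program this would be called as
SequentialActions.runSequentially(10)(() => xs.count).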

Thanks,

Juan

On Fri, Apr 26, 2019 at 02:00 Timo Walther <twal...@apache.org> wrote:

> Hi Juan,
>
> as far as I know we do not provide any concurrency guarantees for count()
> or collect(). Those methods need to be used with caution anyway, as the
> result size must not exceed a certain threshold. I will loop in Fabian, who
> might know more about the internals of the execution.
>
> Regards,
> Timo
>
>
> On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:
>
> Any thoughts on this?
>
> On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a very simple program using the local execution environment that
>> throws NPEs and other exceptions related to concurrent access when launching
>> a count for a DataSet from different threads. The program is
>> https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e which is
>> basically this:
>>
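>> import java.util.concurrent.Executors
>> import scala.concurrent.{Await, ExecutionContext, Future}
>> import scala.concurrent.duration._
>> import org.apache.flink.api.scala._
>>
>> def doubleCollectConcurrent = {
>>   val env = ExecutionEnvironment.createLocalEnvironment(3)
>>   val xs = env.fromCollection(1 to 100).map { _ + 1 }
>>   implicit val ec: ExecutionContext =
>>     ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
>>
>>   // Launch 10 concurrent count() calls on the same DataSet
>>   val pendingActions = Seq.fill(10)(
>>     Future { println(s"xs.count = ${xs.count}") }
>>   )
>>   val pendingActionsFinished = Future.fold(pendingActions)(()) { (_, _) =>
>>     println("pending action finished")
>>   }
>>   Await.result(pendingActionsFinished, 10.seconds)
>>
>>   ok
>> }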
>>
>>
>> It looks like the issue is on OperatorTranslation.java at
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
>> when a sink is added to the sinks list while that list is being traversed.
>> I have the impression that this is by design, so I'd like to confirm that
>> this is the expected behaviour, and whether this happens only for the
>> local execution environment or affects all execution environment
>> implementations. Other related questions I have are:
>>
>>    - Is this documented somewhere? I'm quite new to Flink, so I might
>>    have missed this. Is there any known workaround for concurrently launching
>>    counts and other sink computations on the same DataSet?
>>    - Is it safe to perform a sequence of calls to DataSet sink methods
>>    like count or collect, on the same DataSet, as long as they are made
>>    from the same thread? From my experience it looks like it is, but I'd like
>>    to get a confirmation if possible.
>>
>> This might be related to
>> https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
>> but I'm not sure.
>>
>> Thanks a lot for your help.
>>
>> Greetings,
>>
>> Juan
>>
>
>
