Hi Steve,

I’m assuming you are using Flink 1.2.x? If yes, then I’m afraid you 
re-discovered this issue: https://issues.apache.org/jira/browse/FLINK-6435 
<https://issues.apache.org/jira/browse/FLINK-6435>. It was fixed in Flink 
1.3.0. Is it possible for you to update to that version or do you think it’s 
important that we back port that fix to the Flink 1.2.x line?

Best,
Aljoscha

> On 6. Jun 2017, at 19:34, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Ok, thanks for letting us know. I’ll investigate.
>> On 6. Jun 2017, at 19:28, Steve Robert <srob...@qualys.com 
>> <mailto:srob...@qualys.com>> wrote:
>> 
>> Hi Aljoscha ,
>> 
>> thank you for your reply,
>>  yes the queue being filled up and no more elements are being processed.(In 
>> relation to the limit defined at the "orderedWait" function call).
>> To add additional information, if I run the test on a local cluster I can 
>> see that the job never ends because the AsyncFunction stay blocked As if 
>> there was no call to  the "collect" method
>> Best,
>> Steve
>> 
>> On Tue, Jun 6, 2017 at 4:56 PM, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> Hi,
>> 
>> As far as I know calling collect(Throwable) should also finish the promise 
>> that would otherwise fulfilled by successfully collecting a result. If not 
>> then you might have found a bug. What makes you think that the Thread is not 
>> being released? Is your queue being filled up and no more elements are being 
>> processed?
>> 
>> Regarding your other question, yes, you can collect an empty Collection for 
>> signalling that there was no result.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 8. May 2017, at 21:47, Steve Robert <srob...@qualys.com 
>>> <mailto:srob...@qualys.com>> wrote:
>>> 
>>> Hi guys, 
>>> 
>>> AsyncCollector.collect(Throwable) method  seem to  not release  the Thread.
>>> This scenario may be problematic when calling an external API
>>> In the case of a timeout error there is no data to collect.
>>> 
>>> for example :
>>> 
>>>   CompletableFuture.supplyAsync(() -> asyncCallTask(input))
>>>             .thenAccept((Collection<Tuple3<String, streamDTO, Integer>> 
>>> result) -> {
>>> 
>>>                 this.tupleEmited.getAndIncrement();
>>> 
>>>                 asyncCollector.collect(result);
>>>             })
>>>             .exceptionally((ex) -> {
>>>                 asyncCollector.collect(ex);
>>>                 return null;
>>>             });
>>> }
>>> it is possible to create an empty Collection and collect this empty 
>>> collection to force the Thread to be released but this workflow seems 
>>> strange to me.
>>> thank for your help
>>>  
>>> 
>>> -- 
>>> Steve Robert  <https://www.linkedin.com/company/qualys>
>>> Software Engineer
>>> srob...@qualys.com <mailto:srob...@qualys.com>
>>> T <>
>>> Qualys, Inc. – Continuous Security
>>> Blog <https://qualys.com/blog> | Community <https://community.qualys.com/> 
>>> | Twitter <https://twitter.com/qualys>
>>>  <https://www.qualys.com/email-banner>
>> 
>> 
>> 
>> -- 
>> Steve Robert  <https://www.linkedin.com/company/qualys>
>> Software Engineer
>> srob...@qualys.com <mailto:srob...@qualys.com>
>> T <>
>> Qualys, Inc. – Continuous Security
>> Blog <https://qualys.com/blog> | Community <https://community.qualys.com/> | 
>> Twitter <https://twitter.com/qualys>
>>  <https://www.qualys.com/email-banner>

Reply via email to