[ 
https://issues.apache.org/jira/browse/FLINK-20159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231924#comment-17231924
 ] 

Jiangjie Qin commented on FLINK-20159:
--------------------------------------

[~sundaram] Thanks for reporting the issue. This part of the code has been 
refactored in 1.12, so the problem should have been solved in FLINK-19223. But 
we may need to fix this in 1.11.3. Would you like to submit a patch against 
the release-1.11 branch?

> [FLIP-27 source] FutureNotifier does not return a new future when 
> Future::future() is invoked within the returned future's callback
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20159
>                 URL: https://issues.apache.org/jira/browse/FLINK-20159
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.2
>            Reporter: Sundaram Ananthanarayanan
>            Priority: Minor
>             Fix For: 1.12.0
>
>
> Here's the *problem*. FutureNotifier::future should return a new future once 
> the previous future has completed. That's the expectation. However, if the 
> future is requested from within the completion callback of the previous 
> future, it returns the existing, already completed future instead of a new 
> one. This can result in infinite recursion, depending on how the callback 
> is implemented. Here's an example:
>  
> {code:java}
> // Consumer code:
> void consumeDataOnce() {
>   // get the data from the producer and check if it was empty
>   Data data = producer.getData();
>   // if data was empty, then grab the future and attach a callback as below
>   if (data.isEmpty()) {
>     producer.getCompletableFuture().thenRun(() -> consumeDataOnce());
>   }
> }
> {code}
>  
> In the above method, let's say the producer completed the future (obtained 
> from FutureNotifier::future) to notify the consumer, thinking that some data 
> was available to be consumed. Now let's say the data returned from the 
> producer was instead empty during the callback. In this case, 
> getCompletableFuture() hands back the same, already completed future, so 
> thenRun fires immediately and the method recurses without end.
>  
> *Issue:* If you observe FutureNotifier::notifyComplete's implementation 
> closely, you realize that the future is completed before the futureRef is 
> swapped with null. 
> {code:java}
> public void notifyComplete() {
>   CompletableFuture<Void> future = futureRef.get();
>   // If there are multiple threads trying to complete the future, only the
>   // first one succeeds.
>   if (future != null && future.complete(null)) {
>     futureRef.compareAndSet(future, null);
>   }
> }
> {code}
> If we can change the ordering instead, where the future is swapped atomically 
> first before being completed, then we can guarantee that the future returned 
> by FutureNotifier::future will always be a new one if the previous one had 
> completed. 
>  
> [~sewen] [~jqin] [~stevenz3wu]
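
For illustration, the reordering proposed in the description could be sketched 
roughly as follows. This is only a sketch under assumptions: SwapFirstNotifier 
is a hypothetical stand-in written for this example, not the Flink 
FutureNotifier class and not the actual patch, and its future() method is a 
guess at the behaviour described above (reuse the pending future, create one 
if none is pending).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for FutureNotifier; illustrative only, not Flink code. */
class SwapFirstNotifier {
    private final AtomicReference<CompletableFuture<Void>> futureRef =
            new AtomicReference<>();

    /** Returns the pending future, creating one if none is pending. */
    CompletableFuture<Void> future() {
        while (true) {
            CompletableFuture<Void> current = futureRef.get();
            if (current != null) {
                return current;
            }
            CompletableFuture<Void> fresh = new CompletableFuture<>();
            // Retry if another thread installed a future in the meantime.
            if (futureRef.compareAndSet(null, fresh)) {
                return fresh;
            }
        }
    }

    /** Detaches the pending future first, then completes it. */
    void notifyComplete() {
        // getAndSet is atomic, so only one thread obtains the non-null future.
        CompletableFuture<Void> future = futureRef.getAndSet(null);
        if (future != null) {
            // Callbacks run here; futureRef is already null, so a callback
            // that calls future() creates a new future.
            future.complete(null);
        }
    }
}
```

With this ordering, a callback registered via thenRun that calls future() 
again should observe an empty reference and receive a fresh, uncompleted 
future, so the consumeDataOnce() pattern above would wait for the next 
notification instead of recursing.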



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
