Sundaram Ananthanarayanan created FLINK-20159:
-------------------------------------------------

             Summary: [FLIP-27 source] FutureNotifier does not return a new 
future when FutureNotifier::future() is invoked within the returned future's callback
                 Key: FLINK-20159
                 URL: https://issues.apache.org/jira/browse/FLINK-20159
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.11.2
            Reporter: Sundaram Ananthanarayanan
             Fix For: 1.12.0


Here's the *problem*. FutureNotifier::future should return a new future once 
the previous future has been completed. That's the expectation. However, if the 
future is requested from within the completion callback of the previous future, 
FutureNotifier::future returns the existing, already completed future instead 
of a new one. Depending on how the callback is implemented, this can result in 
infinite recursion. Here's an example:

 
{code:java}
// Consumer code:
void consumeDataOnce() {
  // get the data from the producer and check if it was empty
  Data data = producer.getData();
  // if data was empty, then grab the future and attach a callback as below
  if (data.isEmpty()) {
    producer.getCompletableFuture().thenRun(() -> consumeDataOnce());
  }
}
{code}
 

In the above method, let's say the producer notified the consumer by completing 
the future (obtained from FutureNotifier::future), thinking that some data was 
available to be consumed. Now let's say the data returned from the producer 
turned out to be empty during the callback. In this case, the callback gets 
back the same, already completed future, so thenRun invokes consumeDataOnce 
again right away and the method keeps calling itself for as long as the data 
stays empty.
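
The stale future can be reproduced in isolation with a small stand-in. This is only a sketch: CompleteFirstNotifier and StaleFutureDemo are hypothetical names, not Flink classes, and the body of future() is an assumption (it is not shown in this report). notifyComplete() uses the same complete-then-clear ordering as the reported code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for FutureNotifier, not the Flink class itself.
class CompleteFirstNotifier {
    private final AtomicReference<CompletableFuture<Void>> futureRef = new AtomicReference<>();

    // Assumed behavior: hand out the current future, creating one if none is set.
    CompletableFuture<Void> future() {
        while (true) {
            CompletableFuture<Void> existing = futureRef.get();
            if (existing != null) {
                return existing;
            }
            CompletableFuture<Void> created = new CompletableFuture<>();
            if (futureRef.compareAndSet(null, created)) {
                return created;
            }
        }
    }

    // Same ordering as in the report: complete first, clear the reference afterwards.
    void notifyComplete() {
        CompletableFuture<Void> future = futureRef.get();
        if (future != null && future.complete(null)) {
            futureRef.compareAndSet(future, null);
        }
    }
}

class StaleFutureDemo {
    // Returns true if future(), called inside the completion callback,
    // hands back the same future that has just been completed.
    static boolean callbackSeesCompletedFuture() {
        CompleteFirstNotifier notifier = new CompleteFirstNotifier();
        CompletableFuture<Void> first = notifier.future();
        AtomicReference<CompletableFuture<Void>> seenInCallback = new AtomicReference<>();
        first.thenRun(() -> seenInCallback.set(notifier.future()));
        notifier.notifyComplete();
        CompletableFuture<Void> seen = seenInCallback.get();
        return seen == first && seen.isDone();
    }
}
```

Because thenRun on an already completed future runs the action immediately on the calling thread, a callback that re-registers itself on that stale future re-enters itself without ever waiting.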

 

*Issue:* A close look at FutureNotifier::notifyComplete's implementation shows 
that the future is completed before the futureRef is swapped with null, so the 
completion callbacks run while futureRef still points at the completed future. 
{code:java}
public void notifyComplete() {
  CompletableFuture<Void> future = futureRef.get();
  // If there are multiple threads trying to complete the future, only the
  // first one succeeds.
  if (future != null && future.complete(null)) {
    futureRef.compareAndSet(future, null);
  }
}
{code}
If we change the ordering so that futureRef is atomically swapped to null 
first and the future is completed only afterwards, then we can guarantee that 
the future returned by FutureNotifier::future will always be a new one once the 
previous one has completed. 
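
A minimal sketch of that reordering, not a patch against the actual class: SwapFirstNotifier is a hypothetical stand-in name, and the body of future() is again an assumption since that method is not shown in this report. It uses AtomicReference::getAndSet to detach the future before completing it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for FutureNotifier with the proposed ordering.
class SwapFirstNotifier {
    private final AtomicReference<CompletableFuture<Void>> futureRef = new AtomicReference<>();

    // Assumed behavior: hand out the current future, creating one if none is set.
    CompletableFuture<Void> future() {
        while (true) {
            CompletableFuture<Void> existing = futureRef.get();
            if (existing != null) {
                return existing;
            }
            CompletableFuture<Void> created = new CompletableFuture<>();
            if (futureRef.compareAndSet(null, created)) {
                return created;
            }
        }
    }

    void notifyComplete() {
        // Detach the future atomically first. Only one thread can receive the
        // non-null value, so only that thread completes it.
        CompletableFuture<Void> future = futureRef.getAndSet(null);
        if (future != null) {
            // Callbacks run here, after futureRef was cleared, so a call to
            // future() from inside a callback creates a fresh future.
            future.complete(null);
        }
    }
}
```

With this ordering, a callback such as consumeDataOnce that asks for the future again gets a pending one and waits for the next notification instead of re-entering itself.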

 

[~sewen] [~jqin] [~stevenz3wu]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
