[ https://issues.apache.org/jira/browse/FLINK-20159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231924#comment-17231924 ]
Jiangjie Qin commented on FLINK-20159: -------------------------------------- [~sundaram] Thanks for reporting the issue. This part of the code has been refactored in 1.12, so the problem should have been solved in FLINK-19223. But we may need to fix this in 1.11.3. Will you submit a patch against release-1.11 branch? > [FLIP-27 source] FutureNotifier does not return a new future when > Future::future() is invoked within the returned future's callback > ----------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-20159 > URL: https://issues.apache.org/jira/browse/FLINK-20159 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Affects Versions: 1.11.2 > Reporter: Sundaram Ananthanarayanan > Priority: Minor > Fix For: 1.12.0 > > > Here's the *problem*. FutureNotifier::future should return a new future every > time the previous future was completed. That's the expectation. However, if > the future is being requested from within the completion callback of the > previous future, then it, instead of returning a new future, returns the > existing future. This could potentially result in infinite recursions > depending on how the callback method is implemented. Here's an example: > > {code:java} > Consumer code: > void consumeDataOnce() { > // get the data from the producer and check if it was empty > Data data = producer.getData(); > // if data was empty, then grab the future and attach a callback as below > if (data.isEmpty()) { > producer.getCompletableFuture().thenRun(() -> consumeDataOnce()); > } > } > {code} > > In the above method, let's say the producer notified the consumer (produced > by FutureNofier::future), thinking that some data was available to be > consumed. Now let's say the data returned from the producer was instead empty > during the callback. In this case, the method goes on in an infinite loop > when the future is completed. > > *Issue:* If you observe FutureNotifier::notifyComplete's implementation > closely, you realize that the future is completed before the futureRef is > swapped with null. > {code:java} > public void notifyComplete() { > CompletableFuture<Void> future = futureRef.get(); > // If there are multiple threads trying to complete the future, only the > first one succeeds. > if (future != null && future.complete(null)) { > futureRef.compareAndSet(future, null); > } > } > {code} > If we can change the ordering instead, where the future is swapped atomically > first before being completed, then we can guarantee that the future returned > by FutureNotifier::future will always be a new one if the previous one had > completed. > > [~sewen] [~jqin] [~stevenz3wu] -- This message was sent by Atlassian Jira (v8.3.4#803005)