mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947506841
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   If I understand the process in the alternative solution:
   1. A new reader is set.
   2. When the reader future is done, renew the future.
   3. Complete the future when the current reader hits end of input.
   4. Repeat the process with the new reader.

   However, this doesn't solve the case where the streaming source has no end of input. To handle that case, a process would need to run recursively outside of this reader-switch handling method, repeatedly signaling completeness from the underlying reader and renewing the future again and again. (This is why the PR introduces the MultipleFuturesAvailabilityHelper to coordinate this additional complexity.)

   Additionally, even with bounded sources, this can create a tight loop, since data is not always available if the external system has extended latency. It's better to let the underlying reader signal availability.

   Also, in the code snippet above, I think there could be a need for synchronization on the future variable, since `whenComplete()` is not guaranteed to run on the task thread unless we use `whenCompleteAsync()` and pass the task executor, which we don't have access to.
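
   To illustrate the threading concern in the last paragraph, here is a minimal, standalone sketch using only `java.util.concurrent`. It is not Flink code: the class name `CallbackThreadSketch` and the thread names `io-thread` and `task-thread` are hypothetical stand-ins for a reader's fetcher thread and the task thread. It assumes the callback is registered before the future completes, which is the situation in the removed snippet whenever the reader has no data yet.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadSketch {

    /** Returns the name of the thread that executed the completion callback. */
    static String callbackThread(boolean async) {
        // Hypothetical stand-ins for a reader's I/O thread and the task thread.
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService taskExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "task-thread"));
        try {
            // Plays the role of the future returned by currentReader.isAvailable().
            CompletableFuture<Void> readerAvailable = new CompletableFuture<>();
            CompletableFuture<String> ranOn = new CompletableFuture<>();

            if (async) {
                // The callback is handed to the supplied executor.
                readerAvailable.whenCompleteAsync(
                        (result, ex) -> ranOn.complete(Thread.currentThread().getName()),
                        taskExecutor);
            } else {
                // The callback runs on whichever thread completes the future.
                readerAvailable.whenComplete(
                        (result, ex) -> ranOn.complete(Thread.currentThread().getName()));
            }

            // The future is completed from the I/O thread, not the caller's thread.
            ioExecutor.execute(() -> readerAvailable.complete(null));
            return ranOn.join();
        } finally {
            ioExecutor.shutdown();
            taskExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("whenComplete ran on:      " + callbackThread(false));
        System.out.println("whenCompleteAsync ran on: " + callbackThread(true));
    }
}
```

   In this sketch the plain `whenComplete()` callback runs on `io-thread`, so any state it touches (such as a shared availability future) is accessed off the task thread, while the `whenCompleteAsync()` variant runs on the executor passed in. Had the future already been complete at registration time, `whenComplete()` would instead have run on the registering thread.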