fredia commented on code in PR #24698: URL: https://github.com/apache/flink/pull/24698#discussion_r1590680460
########## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ########## @@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { /** The callback runner. */ protected final CallbackRunner callbackRunner; - public StateFutureImpl(CallbackRunner callbackRunner) { + /** The exception handler that handles callback framework's error. */ + protected final CallbackExceptionHandler exceptionHandler; + + public StateFutureImpl( + CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) { this.completableFuture = new CompletableFuture<>(); this.callbackRunner = callbackRunner; + this.exceptionHandler = exceptionHandler; } @Override - public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) { + public <U> StateFuture<U> thenApply( + FunctionWithException<? super T, ? extends U, ? extends Exception> fn) { callbackRegistered(); - try { - if (completableFuture.isDone()) { + + if (completableFuture.isDone()) { + try { U r = fn.apply(completableFuture.get()); callbackFinished(); return StateFutureUtils.completedFuture(r); - } else { - StateFutureImpl<U> ret = makeNewStateFuture(); - completableFuture.thenAccept( - (t) -> { - callbackRunner.submit( - () -> { - ret.completeInCallbackRunner(fn.apply(t)); - callbackFinished(); - }); - }); - return ret; + } catch (Throwable e) { + exceptionHandler.handleException( + "Caught exception when processing completed StateFuture's callback.", e); + return null; } - } catch (Throwable e) { - throw new FlinkRuntimeException("Error binding or executing callback", e); + } else { + StateFutureImpl<U> ret = makeNewStateFuture(); + completableFuture + .thenAccept( + (t) -> { + callbackRunner.submit( + () -> { + ret.completeInCallbackRunner(fn.apply(t)); + callbackFinished(); + }); + }) + .exceptionally( + (e) -> { + exceptionHandler.handleException( + "Caught exception when submitting StateFuture's callback.", + e); + return null; + }); + return ret; } } @Override - public StateFuture<Void> thenAccept(Consumer<? super T> action) { + public StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends Exception> action) { callbackRegistered(); - try { - if (completableFuture.isDone()) { + if (completableFuture.isDone()) { + try { action.accept(completableFuture.get()); callbackFinished(); return StateFutureUtils.completedVoidFuture(); Review Comment: Thanks for the suggestion, added some description here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org