Zakelly commented on code in PR #24698: URL: https://github.com/apache/flink/pull/24698#discussion_r1577710395
########## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ########## @@ -46,13 +46,18 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> { /** The callback runner. */ protected final CallbackRunner callbackRunner; - public StateFutureImpl(CallbackRunner callbackRunner) { + protected final CallbackExceptionHandler exceptionHandler; + + public StateFutureImpl( + CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) { this.completableFuture = new CompletableFuture<>(); this.callbackRunner = callbackRunner; + this.exceptionHandler = exceptionHandler; } @Override - public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) { + public <U> StateFuture<U> thenApply( Review Comment: Are you suggesting this? ``` thenCompose((v) -> StateFutureUtils.completedFuture(fn.apply(v))); ``` Well I'd suggest not doing so, since we will consider checkpointing the user-provided callback function, nested wrapping may make things more complex. We could consider optimize this if this doesn't affect that part. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org