[ https://issues.apache.org/jira/browse/FLINK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937915#comment-17937915 ]
Ian Stewart commented on FLINK-33933:
-------------------------------------

I am able to reproduce this on Flink 1.20.1. The error occurs any time you have an AsyncLookupFunction, {*}with caching enabled{*}, that returns a future which completes exceptionally, as [~KarlManong] demonstrates.

The root cause of the stack overflow is inside the constructor of SerializedThrowable. The implementation checks for cyclic references in the causal chain, but it does not account for a cycle that runs through a suppressed exception. This results in a stack overflow when ExceptionA suppresses ExceptionB and ExceptionB has ExceptionA as its cause.

This appears to always be the case when an AsyncLookupFunction completes exceptionally and is wrapped in the CachingAsyncLookupFunction. The exception from the AsyncFunction ends up suppressing the RuntimeException thrown here[1], which in turn has the exception from the AsyncFunction as its cause.

[1] https://github.com/apache/flink/blob/release-1.20.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/CachingAsyncLookupFunction.java#L99-L101

> SerializedThrowable will be java.lang.StackOverflowError when
> AsyncLookupFunction throw an exception
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33933
>                 URL: https://issues.apache.org/jira/browse/FLINK-33933
>             Project: Flink
>          Issue Type: Bug
>         Environment: tested from 1.16 to 1.18, the same behavior
>            Reporter: KarlManong
>            Priority: Minor
>
> Here is a simple example
> {code:java}
> // example
> public class TableA implements LookupTableSource {
>     @Nullable
>     private final LookupCache cache;
>
>     public TableA(@Nullable LookupCache cache) {
>         this.cache = cache;
>     }
>
>     @Override
>     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
>         LookupFunctionA lookupFunction = new LookupFunctionA();
>         if (cache != null) {
>             // The caching wrapper is the path that triggers the error
>             return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
>         } else {
>             return AsyncLookupFunctionProvider.of(lookupFunction);
>         }
>     }
>
>     @Override
>     public DynamicTableSource copy() {
>         return new TableA(cache);
>     }
>
>     @Override
>     public String asSummaryString() {
>         return "Async Table";
>     }
> }
>
> public class LookupFunctionA extends AsyncLookupFunction {
>     @Override
>     public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
>         // Always fail, so the returned future completes exceptionally
>         CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
>         future.completeExceptionally(new IOException("request failed"));
>         return future;
>     }
> }
> {code}
> When using TableA, StackOverflowError occurs
>
> {code:java}
> java.lang.StackOverflowError
>     at java.base/java.lang.Exception.<init>(Exception.java:66)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:66)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>     at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>     at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>     at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> {code}
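
For illustration, here is a minimal sketch of the cause/suppressed cycle described in the comment, using only JDK classes. It is not Flink's SerializedThrowable code; the class and method names (SuppressedCycleDemo, buildCycle, countReachable, naiveDepth) are hypothetical. It assumes the shape from the comment: ExceptionA suppresses ExceptionB, and ExceptionB has ExceptionA as its cause. A walk that follows both links without remembering what it has visited recurses until the stack overflows, while a walk that tracks visited throwables by identity terminates.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class SuppressedCycleDemo {

    /** Builds the assumed shape: a suppresses b, and b has a as its cause. */
    public static Throwable buildCycle() {
        IOException a = new IOException("request failed");
        RuntimeException b = new RuntimeException("lookup failed", a);
        a.addSuppressed(b);
        return a;
    }

    /** Counts distinct throwables reachable through cause and suppressed links. */
    public static int countReachable(Throwable root) {
        // Identity-based set, so equals/hashCode overrides cannot hide a cycle.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        visit(root, seen);
        return seen.size();
    }

    private static void visit(Throwable t, Set<Throwable> seen) {
        if (t == null || !seen.add(t)) {
            return; // end of chain, or already visited: stop here
        }
        visit(t.getCause(), seen);
        for (Throwable s : t.getSuppressed()) {
            visit(s, seen);
        }
    }

    /** Naive walk with no visited set; it never terminates on a cycle. */
    public static int naiveDepth(Throwable t) {
        if (t == null) {
            return 0;
        }
        int depth = naiveDepth(t.getCause());
        for (Throwable s : t.getSuppressed()) {
            depth = Math.max(depth, naiveDepth(s));
        }
        return depth + 1;
    }

    public static void main(String[] args) {
        Throwable root = buildCycle();
        System.out.println("reachable = " + countReachable(root));
        try {
            naiveDepth(root);
        } catch (StackOverflowError e) {
            System.out.println("naive walk overflowed the stack");
        }
    }
}
```

The identity-based visited set is one possible guard, not necessarily what an actual fix in Flink would look like; the point is only that the check has to cover suppressed exceptions as well as causes.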