Simple example:
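import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;

// Lookup table source that always uses the async lookup function below.
public class TableA implements LookupTableSource {
    @Nullable
    private final LookupCache cache;

    public TableA(@Nullable LookupCache cache) {
        this.cache = cache;
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        LookupFunctionA lookupFunction = new LookupFunctionA();
        if (cache != null) {
            return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
        } else {
            return AsyncLookupFunctionProvider.of(lookupFunction);
        }
    }

    @Override
    public DynamicTableSource copy() {
        return new TableA(cache);
    }

    @Override
    public String asSummaryString() {
        return "Async Table";
    }
}

// Async lookup function whose every lookup fails with an IOException.
public class LookupFunctionA extends AsyncLookupFunction {
    @Override
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
        future.completeExceptionally(new IOException("request failed"));
        return future;
    }
}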
This produces:
java.lang.StackOverflowError
    at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:66)
    at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
    at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
I tested Flink versions 1.16 through 1.18, and all of them fail with the same error.
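Judging from the SerializedThrowable source, the addAllSuppressed method does not pass alreadySeen on to the SerializedThrowable instances it creates, so suppressed exceptions apparently get no cycle check.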
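
To illustrate why a missing alreadySeen set could lead to this error, here is a minimal standalone sketch. It is not Flink's code. The names SuppressedCycleDemo, Copy, copyNaive and copyTracked are hypothetical, and it assumes the failing exception graph contains a cycle through suppressed exceptions, which I have not confirmed for the job above.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical demo, not Flink code: copies a throwable together with its
// suppressed exceptions, once without and once with a visited set.
class SuppressedCycleDemo {

    /** Simplified stand-in for a serializable throwable copy. */
    static class Copy extends Exception {
        Copy(String message) {
            // Suppression enabled, stack trace disabled to keep the demo cheap.
            super(message, null, true, false);
        }
    }

    /** Copies suppressed exceptions without remembering what was visited. */
    static Copy copyNaive(Throwable t) {
        Copy copy = new Copy(String.valueOf(t.getMessage()));
        for (Throwable s : t.getSuppressed()) {
            // Each recursive call starts with no memory of earlier throwables,
            // so a cycle in the suppressed graph recurses until the stack overflows.
            copy.addSuppressed(copyNaive(s));
        }
        return copy;
    }

    /** Copies suppressed exceptions and truncates at throwables already seen. */
    static Copy copyTracked(Throwable t, Set<Throwable> alreadySeen) {
        Copy copy = new Copy(String.valueOf(t.getMessage()));
        if (alreadySeen.add(t)) {
            for (Throwable s : t.getSuppressed()) {
                // The same set is handed down, so a revisited throwable is cut off.
                copy.addSuppressed(copyTracked(s, alreadySeen));
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        // Build a two-element cycle: a suppresses b, and b suppresses a.
        Throwable a = new IOException("request failed");
        Throwable b = new RuntimeException("wrapper");
        a.addSuppressed(b);
        b.addSuppressed(a);

        try {
            copyNaive(a);
            System.out.println("naive copy finished");
        } catch (StackOverflowError e) {
            System.out.println("naive copy failed: " + e);
        }

        // Identity-based set, so throwables are compared by reference.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Copy safe = copyTracked(a, seen);
        System.out.println("tracked copy suppressed count: " + safe.getSuppressed().length);
    }
}
```

In this sketch the only difference between the two methods is whether the visited set travels with the recursion. A fix in SerializedThrowable would presumably take the same shape (passing alreadySeen through addAllSuppressed), but that is a guess about the fix rather than a description of it.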