hello,kenyore.
我大致了解了你的意思,你可以通过继承AsyncTableFunction的方式实现数据库异步IO。

公共抽象类AsyncTableFunction <T> 扩展了UserDefinedFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html>

AsyncTableFunction
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>可以通过实现自定义评估方法来定义a的行为。评估方法必须公开声明,而不是静态声明,并命名为“
eval”。评估方法也可以通过实现多个名为“ eval”的方法来重载。

对于每个“ eval”,都可以触发一个异步io操作,一旦完成,就可以通过调用来收集结果CompletableFuture.complete(T)
<http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletableFuture.html?is-external=true#complete-T->。对于每个异步操作,调用“
eval”后,其上下文将立即存储在运算符中,从而避免在内部缓冲区未满的情况下阻塞输入的每个流。

代码示例:

   public void eval(CompletableFuture<Collection<String>> result,
String rowkey) {
            Get get = new Get(Bytes.toBytes(rowkey));
            ListenableFuture<Result> future = hbase.asyncGet(get);
            Futures.addCallback(future, new FutureCallback<Result>() {
                public void onSuccess(Result result) {
                    List<String> ret = process(result);
                    result.complete(ret);
                }

                public void onFailure(Throwable thrown) {
                    result.completeExceptionally(thrown);
                }
            });
        }

参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html
kenyore <[email protected]> 于2021年1月12日周二 下午3:29写道:

> 感谢如此详尽的回复!
> 但是我的场景似乎无法直接使用维表join。
> 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

回复