Flink版本:1.10.2
使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。
本地测试的结果是一直重复输出数据。
请问一下DataStream 处理之后,怎么才能注册为 Table。
-------------------------------
代码如下:
// 异步redis处理
RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node,
aggProcessorArgs);
// 获取异步处理流
DataStream<Row> result = AsyncDataStream.orderedWait(
dataStream,
asyncFunction,
60L,
TimeUnit.SECONDS,
100).returns(outRowTypeInfo);
// 注册为临时 table
tabEnv.createTemporaryView("test_table", result,
outRowFields.stream().collect(Collectors.joining(",")));
//
result.print("out_table>>");
Table test_table = tabEnv.sqlQuery("select * from test_table");
// 查询临时table
tabEnv.toAppendStream(test_table, Row.class).print("test_table");
--
**************************************
tili
**************************************