Hi!

env.setRuntimeMode(RuntimeExecutionMode.BATCH); 要放在创建 table environment
之前,否则创建出来的 table environment 还是 streaming 模式。另外这个需要 Flink >= 1.14。



陈卓宇 <2572805...@qq.com.invalid> 于2022年1月10日周一 15:03写道:

> 代码逻辑简单描述:  我通过fromElements的方式简单构造了几条测试数据,然后将流转表,在表上
> 使用我自定义的聚合函数,进行聚合操作,最后打印
> 我的代码:
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);  //设置调度模式为批
>
> DataStreamSource<Tuple2<String, Integer&gt;&gt; source =
> env.fromElements(Tuple2.of("aa", 1),
>         Tuple2.of("aa", 2),Tuple2.of("aa", 3),Tuple2.of("bb",
> 2),Tuple2.of("bb", 3),Tuple2.of("bb", 4));
> Table table = tenv.fromDataStream(source,
>         Schema.newBuilder()
>                 .column("f0", "STRING")
>                 .column("f1", "INTEGER")
>                 .build());
> tenv.createTemporaryView("test",table);
> //对表进行sql查询
> tenv.createTemporarySystemFunction("Average", avg5.Average.class);
> tenv.executeSql("SELECT f0,Average(f1) as rbm FROM test group by
> f0").print();
>
> 结果:
> +----+--------------------------------+--------------------------------+
> | op |                             f0 |                            rbm |
> +----+--------------------------------+--------------------------------+
> | +I |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U |                             aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +I |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U |                             bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> +----+--------------------------------+--------------------------------+
> 我发现结果是一个可撤回流,这与我所预想批处理是不一致的
>
> 请问:我设置的batch,为什么调度模式还没有改变,我该如何解决这个问题,让他成为批处理?
>
> 最后:感谢之前几个问题上,社区同学的无私解答,因为自身邮箱出了一些问题,就不一一致谢了,望见谅。
>
> 陈卓宇
>
>
> &nbsp;

回复