Hi! env.setRuntimeMode(RuntimeExecutionMode.BATCH); 要放在创建 table environment 之前,否则创建出来的 table environment 还是 streaming 模式。另外这个需要 Flink >= 1.14。
陈卓宇 <2572805...@qq.com.invalid> 于2022年1月10日周一 15:03写道: > 代码逻辑简单描述: 我通过fromElements的方式简单构造了几条测试数据,然后将流转表,在表上 > 使用我自定义的聚合函数,进行聚合操作,最后打印 > 我的代码: > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > env.setRuntimeMode(RuntimeExecutionMode.BATCH); //设置调度模式为批 > > DataStreamSource<Tuple2<String, Integer>> source = > env.fromElements(Tuple2.of("aa", 1), > Tuple2.of("aa", 2),Tuple2.of("aa", 3),Tuple2.of("bb", > 2),Tuple2.of("bb", 3),Tuple2.of("bb", 4)); > Table table = tenv.fromDataStream(source, > Schema.newBuilder() > .column("f0", "STRING") > .column("f1", "INTEGER") > .build()); > tenv.createTemporaryView("test",table); > //对表进行sql查询 > tenv.createTemporarySystemFunction("Average", avg5.Average.class); > tenv.executeSql("SELECT f0,Average(f1) as rbm FROM test group by > f0").print(); > > 结果: > +----+--------------------------------+--------------------------------+ > | op | f0 | rbm | > +----+--------------------------------+--------------------------------+ > | +I | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | -U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | +U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | -U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | +U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | +I | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | -U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | +U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | -U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > | +U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... | > +----+--------------------------------+--------------------------------+ > 我发现结果是一个可撤回流,这与我所预想批处理是不一致的 > > 请问:我设置的batch,为什么调度模式还没有改变,我该如何解决这个问题,让他成为批处理? > > 最后:感谢之前几个问题上,社区同学的无私解答,因为自身邮箱出了一些问题,就不一一致谢了,望见谅。 > > 陈卓宇 > > >