Hi. 大罗
试一下这个方法 org.apache.flink.table.api.StatementSet#execute
ss.execute();

大罗 <[email protected]> 于2020年9月9日周三 下午3:13写道:

> Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下:
>
> 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type"
> :2},
> {"pid":"a", "val":1, "data_type": 1, "app_type" :2}]
>
> 然后,把这个数据使用flatMap转化为单个对象runDataStream,{"pid":"a", "val":1, "data_type": 1,
> "app_type" :2}
>
> 把runDataStream输出到redis: runDataStream.addSink(new CustomRedisSink())
>
> 然后,再创建临时表,比如:
> tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator,
>                 $("pid"),  $("val"), $("app_type"), $("data_type"));
>
> 接着定义不同的sql,比如:
> String sql1 = "insert into ods_data_10 select pid, val where data_type = 1
> and app_type = 0"
> String sql2 = "insert into ods_data_11 select pid, val where data_type = 1
> and app_type = 1"
> String sql3 = "insert into ods_data_01 select pid, val where data_type = 0
> and app_type = 1"
> String sql4 = "insert into ods_data_00 select pid, val where data_type = 0
> and app_type = 0"
>
> 使用StatementSet运行它们:
> StatementSet ss = tableEnv.createStatementSet();
> ss.addInsertSql(sql1);
> ss.addInsertSql(sql2);
> ss.addInsertSql(sql3);
> ss.addInsertSql(sql4);
>
> 最后执行作业:
> env.execute(jobName);
>
> 一切都很正常,没有报错,但是在web UI,却是提交了两个作业,如图:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png>
>
>
> 作业"EconStreamingToHiveHbaseRedisJob"对应的应该是写入redis的操作(假设作业ID为jobA),
>
> 作业"insert-into_myhive.dw.ods_analog_sems
> *******"对应的应该是写入4个表的操作(假设作业ID为jobB),如图:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png>
>
>
> 其中,顶端的operator的定义如下:
> Source: Custom Source -> Map -> Flat Map -> Filter ->
> SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et,
> run_data_type]) ->
> (Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE
> _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE
> _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))])
> ->
> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))])
> ->
> StreamingFileWriter)
>
> 我的疑问是,当我想停止这些作业的时候,比如,"./bin/flink stop -m xxxx:8081 jobA"
> 会生成savepoint,比如"Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a
> savepoint."
> 相应的停止作业jobB的时候也会生成这个savepoint。
>
> 我的问题是,停止jobA和jobB之间有没有先后顺序,以及我要使用哪个savepoint保证作业的平滑重启呢?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

回复