Hi. 大罗 试一下这个方法 org.apache.flink.table.api.StatementSet#execute ss.execute();
大罗 <[email protected]> 于2020年9月9日周三 下午3:13写道: > Hi,我遇到一个问题,在代码中利用flink sql 1.11 从kafka插入数据到hive表,过程如下: > > 首先,从kafka读取json字符串数组数据,比如[{"pid":"a", "val":1, "data_type": 1, "app_type" > :2}, > {"pid":"a", "val":1, "data_type": 1, "app_type" :2}] > > 然后,把这个数据使用flatMap转化为单个对象runDataStream,{"pid":"a", "val":1, "data_type": 1, > "app_type" :2} > > 把runDataStream输出到redis: runDataStream.addSink(new CustomRedisSink()) > > 然后,再创建临时表,比如: > tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator, > $("pid"), $("val"), $("app_type"), $("data_type")); > > 接着定义不同的sql,比如: > String sql1 = "insert into ods_data_10 select pid, val where data_type = 1 > and app_type = 0" > String sql2 = "insert into ods_data_11 select pid, val where data_type = 1 > and app_type = 1" > String sql3 = "insert into ods_data_01 select pid, val where data_type = 0 > and app_type = 1" > String sql4 = "insert into ods_data_00 select pid, val where data_type = 0 > and app_type = 0" > > 使用StatementSet运行它们: > StatementSet ss = tableEnv.createStatementSet(); > ss.addInsertSql(sql1); > ss.addInsertSql(sql2); > ss.addInsertSql(sql3); > ss.addInsertSql(sql4); > > 最后执行作业: > env.execute(jobName); > > 一切都很正常,没有报错,但是在web UI,却是提交了两个作业,如图: > > < > http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png> > > > 作业"EconStreamingToHiveHbaseRedisJob"对应的应该是写入redis的操作(假设作业ID为jobA), > > 作业"insert-into_myhive.dw.ods_analog_sems > *******"对应的应该是写入4个表的操作(假设作业ID为jobB),如图: > > < > http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png> > > > 其中,顶端的operator的定义如下: > Source: Custom Source -> Map -> Flat Map -> Filter -> > SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et, > run_data_type]) -> > (Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME > _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') > AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE > _UTF-16LE'BP.%')))]) -> StreamingFileWriter, > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME > _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') > AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE > _UTF-16LE'BP.%')))]) -> StreamingFileWriter, > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME > _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') > AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))]) > -> > StreamingFileWriter, > Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME > _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') > AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))]) > -> > StreamingFileWriter) > > 我的疑问是,当我想停止这些作业的时候,比如,"./bin/flink stop -m xxxx:8081 jobA" > 会生成savepoint,比如"Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a > savepoint." > 相应的停止作业jobB的时候也会生成这个savepoint。 > > 我的问题是,停止jobA和jobB之间有没有先后顺序,以及我要使用哪个savepoint保证作业的平滑重启呢? > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
