Hi, I've run into a problem: in my code I use Flink SQL 1.11 to insert data from Kafka into Hive tables. The process is as follows:

First, I read JSON string arrays from Kafka, e.g.:
[{"pid":"a", "val":1, "data_type":1, "app_type":2},
{"pid":"a", "val":1, "data_type":1, "app_type":2}]

Then I use flatMap to flatten each array into individual objects, producing a stream runDataStream whose elements look like {"pid":"a", "val":1, "data_type":1, "app_type":2}.
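The flatten step can be sketched in plain Java (no Flink dependencies). This is a simplified sketch of what the FlatMapFunction would do: the regex split assumes each array element is a flat JSON object with no nested braces, which holds for records like the one above; a real job would use a JSON library such as Jackson instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlattenSketch {
    // Splits a JSON array of flat objects into one string per object.
    // Simplification: assumes no nested braces inside an element.
    static List<String> flatten(String jsonArray) {
        List<String> out = new ArrayList<>();
        Matcher m = Pattern.compile("\\{[^{}]*\\}").matcher(jsonArray);
        while (m.find()) {
            out.add(m.group());
        }
        return out;
    }

    public static void main(String[] args) {
        String in = "[{\"pid\":\"a\",\"val\":1},{\"pid\":\"a\",\"val\":1}]";
        System.out.println(flatten(in).size()); // prints 2
    }
}
```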

I also write runDataStream to Redis: runDataStream.addSink(new CustomRedisSink())

Then I create a temporary view from the stream, e.g.:
tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator,
                $("pid"),  $("val"), $("app_type"), $("data_type"));

Next, I define several SQL statements, e.g.:
String sql1 = "insert into ods_data_10 select pid, val from kafkaT1 where data_type = 1 and app_type = 0";
String sql2 = "insert into ods_data_11 select pid, val from kafkaT1 where data_type = 1 and app_type = 1";
String sql3 = "insert into ods_data_01 select pid, val from kafkaT1 where data_type = 0 and app_type = 1";
String sql4 = "insert into ods_data_00 select pid, val from kafkaT1 where data_type = 0 and app_type = 0";
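The fan-out rule behind these four statements can be expressed as a tiny helper that maps the two flags to the target table name (the helper itself is hypothetical and not part of the original job; the table names are the ones above):

```java
public class TableRouter {
    // Maps the (data_type, app_type) flags to the target table name,
    // mirroring the four INSERT statements above.
    static String route(int dataType, int appType) {
        return "ods_data_" + dataType + appType;
    }

    public static void main(String[] args) {
        System.out.println(route(1, 0)); // prints ods_data_10
    }
}
```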

I run them with a StatementSet, which is submitted with its own execute() call:
StatementSet ss = tableEnv.createStatementSet();
ss.addInsertSql(sql1);
ss.addInsertSql(sql2);
ss.addInsertSql(sql3);
ss.addInsertSql(sql4);
ss.execute();

Finally, I execute the job:
env.execute(jobName);

Everything runs fine with no errors, but the web UI shows that two jobs were submitted, as in this screenshot:

<http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png> 

The job "EconStreamingToHiveHbaseRedisJob" should correspond to the Redis write (call its job ID jobA),

while the job "insert-into_myhive.dw.ods_analog_sems*******" should correspond to the inserts into the four tables (call its job ID jobB), as shown:

<http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png> 

The topmost operator of that job is defined as follows:
Source: Custom Source -> Map -> Flat Map -> Filter ->
SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et, run_data_type]) ->
(Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
 Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
 Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))]) -> StreamingFileWriter,
 Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH') AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))]) -> StreamingFileWriter)

My question concerns stopping these jobs. When I stop one, e.g. "./bin/flink stop -m xxxx:8081 jobA",
a savepoint is generated, e.g. "Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a savepoint."
Stopping jobB likewise generates a savepoint.

So: does the order in which jobA and jobB are stopped matter, and which savepoint should I use to restart the job smoothly?




--
Sent from: http://apache-flink.147419.n8.nabble.com/
