Hi, from the code it looks like you are using a regular join between the Kafka source and the HBase source. The HBase connector currently does not support acting as an unbounded streaming source. You can confirm this from the job's web dashboard: the HBase source tasks should switch to FINISHED after running for a while, and at the moment Flink checkpointing cannot run while there are FINISHED tasks, which is why no checkpoints are being taken.
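One way around this is to rewrite the SQL as a processing-time temporal join, so that each record consumed from Kafka looks up the current row in the HBase table instead of joining two sources. A rough sketch, reusing the table and column names from the quoted code (the view name and the `proctime` attribute are assumptions; note the probe side must expose a processing-time attribute, and the join condition should be a plain equality on the HBase primary key):

```sql
-- Probe side: the view registered from the Kafka-derived DataStream.
-- It needs a processing-time attribute (here assumed to be declared
-- as `proctime` when the view is created from the DataStream).
SELECT a.*, b.*
FROM my_kafka_view AS a
LEFT JOIN dformfiled_join FOR SYSTEM_TIME AS OF a.proctime AS b
  ON a.key = b.rowkey;
```

With `FOR SYSTEM_TIME AS OF a.proctime`, the HBase table is treated as a lookup table queried per incoming record, so no source task finishes and checkpointing can proceed.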
You could consider rewriting the SQL to use a processing-time temporal join [1] against the HBase table; each record consumed from Kafka will then query the current contents of the HBase table at processing time to perform the join.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#processing-time-temporal-join

chang li <[email protected]> wrote on Fri, Sep 10, 2021 at 7:40 PM:
> Checkpointing is not enabled:
> execEnv.enableCheckpointing(checkpointInterval);
>
> On 2021/09/10 07:41:10, "[email protected]" <[email protected]> wrote:
> > Hi:
> > I have a question I'd like to ask: I am experimenting with joins on streams. I use FlinkKafkaConsumer to consume Kafka data as the source and then join it against an HBase dimension table. The join itself succeeds, but the Kafka source never checkpoints, even though checkpointing is configured in the code. Do I need some other configuration? The code is as follows:
> >
> > DataStream<String> kafkaSource = env.addSource(source);
> > Map<String, OutputTag<TmpTable>> sideOutStreamMap = new HashMap<>();
> > for (RowToColumnBean bean : lists) {
> >     OutputTag<TmpTable> app = new OutputTag<TmpTable>(bean.getMainTable()) {
> >     };
> >     sideOutStreamMap.put(bean.getMainTable(), app);
> > }
> >
> > RowToNumberProcessFunction rowToNumberProcessFunction = new RowToNumberProcessFunction(sideOutStreamMap, lists);
> > SingleOutputStreamOperator<TmpTable> process = kafkaSource.process(rowToNumberProcessFunction);
> >
> > EnvironmentSettings settings = EnvironmentSettings.newInstance()
> >     .useBlinkPlanner()
> >     .inStreamingMode()
> >     .build();
> >
> > StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, new TableConfig());
> > // configure checkpointing
> > tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s");
> >
> > for (RowToColumnBean bean : lists) {
> >     DataStream<TmpTable> dataStream = process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> >
> >     String mainTable = bean.getMainTable().split(" ")[0].split("\\.")[1].toLowerCase();
> >
> >     //Table tmpTable = tableEnv.fromDataStream(dataStream, StrUtil.list2Str(bean.getQueryColumns()));
> >
> >     tableEnv.createTemporaryView(mainTable, dataStream);
> >
> >     String joinTable = mainTable + "_join";
> >     tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> >         "rowkey STRING,\n" +
> >         "info ROW<formid STRING>,\n" +
> >         "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> >         ") WITH (\n" +
> >         "'connector' = 'hbase-2.2',\n" +
> >         "'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> >         "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> >         "'zookeeper.znode.parent' = '/hbase'\n" +
> >         ")");
> >
> >     // query the data
> >     //Table table = tableEnv.sqlQuery("select b.* from tmp a left join dformfiled b on a.key = b.rowkey");
> >     Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' where b.rowkey is not null");
> >
> >     TableSchema schema = table.getSchema();
> >     schema.getTableColumns().forEach(column -> {
> >         System.err.println(column.asSummaryString());
> >     });
> >
> >     DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
> >     tuple2DataStream.print(mainTable);
> >     dataStream.print(mainTable);
> > }
> >
> > [email protected]
> >
