发件人: Henry meng (孟令平)
发送时间: 2023年4月13日 15:27
收件人: '[email protected]' <[email protected]>
主题:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(60000);
SingleOutputStreamOperator<String> dataStream = env.addSource(new
FlinkRocketMQConsumer("10.164.15.31:9876","flink_data"))
.uid("source-id");
dataStream .print("**********");
DataStream<InverterPo> datStream =dataStream.process(new MyFunction());
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
datStream.print("=========>");
Table inputTable =
tableEnv.fromDataStream(datStream,"devId,identifier,dataValue,dataTime,tenantId");//"devId,identifier,dataValue,dataTime,tenantId"
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
"select devId,'DAY_POWER_GENERATION' as identify,
(MAX(dataValue)-Min(dataValue)) value1,MAX(dataTime) datatime, " +
"tenantId from InputTable where
identifier='TOTAL_POWER_GENERATION' group by devId,tenantId");
// resultTable.print();
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
resultStream.print("------------>");
// tableEnv.createTemporaryView("resultTable", resultTable);
tableEnv.executeSql("CREATE TABLE print_table (`DEV_ID`
BIGINT,`IDENTIFIER` String," +
"`DATA_VALUE` Decimal(16,4),`DATA_TIME` TIMESTAMP,`TENANT_ID`
BIGINT) " +
"WITH ('connector' = 'print')");
// tableEnv.executeSql("insert into print_table select
devId,identifier,dataValue,dataTime,tenantId from resultTable");
inputTable.executeInsert("print_table");
env.execute("FlinkRocketMQConsumerDemo");}
如上代码所示,当我添加上env.execute()方法后,发现print_table不打印数据了这是什么原因?
StreamExecutionEnvironment.execute()和StreamTableEnvironment.executeSql()同时执行有什么问题吗?