Hey guys, I have been using the Flink Table API recently. I want to use event time together with a Kafka table connector, but it seems Flink cannot recognize my event-time timestamp field. Here is my code:
```java
public static void main(String[] args) throws Exception {
    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    tEnv
        // connect to the external system via the connect() descriptor API
        .connect(
            new Kafka()
                .version("universal")   // required; valid values are "0.8", "0.9", "0.10", "0.11", or "universal"
                .topic("user_behavior") // required; topic name
                .startFromLatest()      // where to start reading on first consumption
                .property("zookeeper.connect", "localhost:2181") // Kafka connection properties
                .property("bootstrap.servers", "localhost:9092")
        )
        // serialization format; could also be Avro, etc.
        .withFormat(new Json())
        // schema of the data
        .withSchema(
            new Schema()
                .field("user_id", DataTypes.BIGINT())
                .field("item_id", DataTypes.BIGINT())
                .field("category_id", DataTypes.BIGINT())
                .field("behavior", DataTypes.STRING())
                .field("ts", DataTypes.TIMESTAMP(3))
                .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
        )
        // name of the temporary table; it can be referenced in SQL statements later
        .createTemporaryTable("user_behavior");

    Table tumbleGroupByUserId = tEnv.sqlQuery(
        "SELECT \n" +
        "\tuser_id, \n" +
        "\tCOUNT(behavior) AS behavior_cnt, \n" +
        "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
        "FROM user_behavior\n" +
        "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");

    DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
    result.print();

    env.execute("table api");
}
```

As shown above, I call the rowtime() method when defining the Schema. When I try to run it, I get the following error:

```
Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
```

I tried another approach based on DDL, and it worked, so the problem is not my Kafka source:

```java
tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
    "  user_id BIGINT,\n" +
    "  item_id BIGINT,\n" +
    "  category_id BIGINT,\n" +
    "  behavior STRING,\n" +
    "  ts TIMESTAMP(3),\n" +
    // "  proctime AS PROCTIME(), -- computed column providing a processing-time attribute\n" +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- defines a watermark on ts, making it an event-time column\n" +
    ") WITH (\n" +
    "  'connector.type' = 'kafka', -- use the kafka connector\n" +
    "  'connector.version' = 'universal', -- kafka version; universal supports 0.11 and above\n" +
    "  'connector.topic' = 'user_behavior', -- kafka topic\n" +
    "  'connector.startup-mode' = 'latest-offset', -- start reading from the latest offset\n" +
    "  'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper address\n" +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address\n" +
    "  'format.type' = 'json' -- the source format is json\n" +
    ")");
```

Hope anyone can give some suggestions. Thanks.
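Edit: for completeness, here is how the working DDL variant is wired into the rest of the pipeline. `ddl` stands for the CREATE TABLE string shown above; everything else is identical to the descriptor-based version:

```java
// Same environment setup as above (Blink planner, streaming mode, event time).
tEnv.sqlUpdate(ddl); // ddl = the CREATE TABLE user_behavior statement shown above

// The identical windowed query now runs, because WATERMARK FOR ts
// makes ts a real event-time attribute rather than a plain TIMESTAMP(3).
Table tumbleGroupByUserId = tEnv.sqlQuery(
        "SELECT user_id, COUNT(behavior) AS behavior_cnt, " +
        "TUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts " +
        "FROM user_behavior " +
        "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");

tEnv.toRetractStream(tumbleGroupByUserId, Row.class).print();
env.execute("table api ddl");
```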
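Edit 2: the only other way I know of to get an event-time attribute is to skip the connector schema entirely and attach the rowtime when converting from a DataStream. I have not wired this up end to end; it is only a sketch, where `UserBehavior` is a hypothetical POJO (fields `user_id`, `item_id`, `category_id`, `behavior`, and a `long` epoch-millis `ts`) and `kafkaConsumer` is a hypothetical deserializing FlinkKafkaConsumer:

```java
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Assign event-time timestamps and watermarks on the raw stream first.
DataStream<UserBehavior> stream = env
        .addSource(kafkaConsumer) // hypothetical FlinkKafkaConsumer<UserBehavior>
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(UserBehavior e) {
                        return e.ts; // event time in epoch millis
                    }
                });

// "ts.rowtime" replaces ts with an event-time attribute backed by the
// record timestamps, so TUMBLE(ts, ...) should be legal on this view.
tEnv.createTemporaryView(
        "user_behavior_ds",
        tEnv.fromDataStream(stream, "user_id, item_id, category_id, behavior, ts.rowtime"));
```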