Hey guys,

I am using Flink Table API recently. I want to use EventTime and use a Kakfa 
Table Connector. I think in my code Flink cannot recognize event time timestamp 
field. Here is my code :

public static void main(String[] args) throws Exception {

        EnvironmentSettings fsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
fsSettings);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tEnv
            // 使用connect函数连接外部系统
            .connect(
                new Kafka()
                .version("universal")     // 必填,合法的参数有"0.8", "0.9", "0.10", 
"0.11"或"universal"
                .topic("user_behavior")   // 必填,Topic名
                .startFromLatest()        // 首次消费时数据读取的位置
                .property("zookeeper.connect", "localhost:2181")  // Kafka连接参数
                .property("bootstrap.servers", "localhost:9092")
            )
            // 序列化方式 可以是JSON、Avro等
            .withFormat(new Json())
            // 数据的Schema
            .withSchema(
                new Schema()
                    .field("user_id", DataTypes.BIGINT())
                    .field("item_id", DataTypes.BIGINT())
                    .field("category_id", DataTypes.BIGINT())
                    .field("behavior", DataTypes.STRING())
                    .field("ts", DataTypes.TIMESTAMP(3))
                    .rowtime(new 
Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
            )
            // 临时表的表名,后续可以在SQL语句中使用这个表名
            .createTemporaryTable("user_behavior");

        Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
                "\tuser_id, \n" +
                "\tCOUNT(behavior) AS behavior_cnt, \n" +
                "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
                "FROM user_behavior\n" +
                "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
        DataStream<Tuple2<Boolean, Row>> result = 
tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
        result.print();

        env.execute("table api");
    }

As shown in the code above, I use rowtime() method when I want to define a 
Schema. When I try to run, I get the following error: Window aggregate can only 
be defined over a time attribute column, but TIMESTAMP(3) encountered.

​I tried another method based on a DLL, and it worked. So it is not my Kafka 
source problem.

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
                "    user_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    category_id BIGINT,\n" +
                "    behavior STRING,\n" +
                "    ts TIMESTAMP(3),\n" +
//                "    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
                "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 
在ts上定义watermark,ts成为事件时间列\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 使用 kafka connector\n" +
                "    'connector.version' = 'universal',  -- kafka 版本,universal 
支持 0.11 以上的版本\n" +
                "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
                "    'connector.startup-mode' = 'latest-offset',  -- 从起始 offset 
开始读取\n" +
                "    'connector.properties.zookeeper.connect' = 
'localhost:2181',  -- zookeeper 地址\n" +
                "    'connector.properties.bootstrap.servers' = 
'localhost:9092',  -- kafka broker 地址\n" +
                "    'format.type' = 'json'  -- 数据源格式为 json\n" +
                ")");

Hope anyone can give some suggestions. Thanks.

Reply via email to