Sorry for the typo.

The dev@flink.apache.org mailing list is used to discuss Flink development topics 
such as new features, votes, and releases, so it is not the right place for user 
questions.
Please send user questions to u...@flink.apache.org; if you prefer Chinese, please 
use user...@flink.apache.org instead. You can refer to [1] for more details.


> On Jul 29, 2020, at 14:33, Leonard Xu <xbjt...@gmail.com> wrote:
> 
> Hi, taochanglian
> 
> The u...@flink.apache.org mailing list is used to discuss Flink development topics 
> such as new features, votes, and releases, so it is not the right place for user 
> questions.
> Please send user questions to u...@flink.apache.org; if you prefer Chinese, please 
> use user...@flink.apache.org instead. You can refer to [1] for more details.
> 
> Best
> Leonard Xu
> [1] https://flink.apache.org/community.html#mailing-lists
> 
> 
> 
>> On Jul 29, 2020, at 13:55, hechao <13250818...@163.com> wrote:
>> 
>> 
>> Here we mostly process data with event time. To use event time, you need to set it up in three places:
>> 
>> First:
>> 
>> environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>> 
>> 
>> 
>> Second:
>> 
>> SingleOutputStreamOperator<Object> add_event_time = hitchSPVLoggerSingleOutputStreamOperator
>>     .uid("add event time")
>>     .assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<Object>(
>>                 org.apache.flink.streaming.api.windowing.time.Time.seconds(0)) {
>>             @Override
>>             public long extractTimestamp(Object o) {
>>                 return o.timestamp;
>>             }
>>         })
>>     .setParallelism(sinkParallelism);
>> 
>> 
>> 
>> 
>> Third:
>> 
>> tableEnv.registerDataStream("hitch_match_result", add_event_time, "rowtime.rowtime as rt");
>> 
>> 
>> Finally, just use rt.
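>> 
>> Putting the three steps together, here is a minimal sketch. It assumes a hypothetical POJO Event with fields (long ts, int num, String vin), where ts is an epoch-millisecond timestamp, and an existing DataStream<Event> named events; it uses Flink 1.11 with the same legacy watermark API as above, and the 1-second out-of-orderness bound is only illustrative:
>> 
>> // Step 1: switch the environment to event time.
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>> // Step 2: extract timestamps and emit watermarks, here tolerating
>> // events that arrive up to 1 second out of order.
>> SingleOutputStreamOperator<Event> withEventTime = events
>>     .assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<Event>(
>>                 org.apache.flink.streaming.api.windowing.time.Time.seconds(1)) {
>>             @Override
>>             public long extractTimestamp(Event e) {
>>                 return e.ts; // epoch milliseconds
>>             }
>>         });
>> 
>> // Step 3: register the stream with the Table API, appending rt as the rowtime attribute.
>> tableEnv.registerDataStream("hitch_match_result", withEventTime, "num, ts, vin, rt.rowtime");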
>> 
>> 
>> If anything here is wrong, please point it out. Thanks.
>> 
>> 
>> hechao
>> 13250818...@163.com
>> 
>> 
>> On Jul 29, 2020, at 09:57, 111 <taochangl...@163.com> wrote:
>> 
>> 
>> 
>> Hello, I'd like to ask a question, thanks:
>> The JSON is very simple:
>> {"num":100,"ts":1595949526874,"vin":"DDDD"}
>> {"num":200,"ts":1595949528874,"vin":"AAAA"}
>> {"num":200,"ts":1595949530880,"vin":"CCCC"}
>> {"num":300,"ts":1595949532883,"vin":"CCCC"}
>> {"num":100,"ts":1595949534888,"vin":"AAAA"}
>> {"num":300,"ts":1595949536892,"vin":"DDDD"}
>> I just want to window on event time. I have tested that it works with processing time, but with event time it simply never works, which is frustrating. Please advise.
>> public class FlinkKafka {
>>     public static void main(String[] args) throws Exception {
>>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
>> 
>>         String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" +
>>                 " ts BIGINT,\n" +
>>                 " num INT,\n" +
>>                 " vin STRING,\n" +
>>                 " pts AS PROCTIME(),\n" +  // processing time
>>                 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
>>                 " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n" +
>>                 ") WITH (\n" +
>>                 " 'connector' = 'kafka',\n" +
>>                 " 'topic' = 'kkb',\n" +
>>                 " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" +
>>                 " 'properties.group.id' = 'mm',\n" +
>>                 " 'format' = 'json',\n" +
>>                 " 'scan.startup.mode' = 'latest-offset'\n" +
>>                 ")";
>>         tableEnv.executeSql(kafkaSourceTable);
>> 
>>         String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)";
>>         final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);
>> 
>>         windowAllTable.printSchema();
>>         tableEnv.toAppendStream(windowAllTable, Row.class).print();
>>         System.out.println("------------------------------------------------------");
>>         env.execute("job");
>>     }
>> }
>> 
>> 
>> ---------------------------
>> Please look at my query: String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)".
>> With TUMBLE(pts, INTERVAL '5' SECOND), i.e. a processing-time window, there is no problem at all: all the content is printed every few seconds.
>> Printed result:
>> root
>> |-- ts: BIGINT
>> |-- num: INT
>> |-- vin: STRING
>> |-- pts: TIMESTAMP(3) NOT NULL *PROCTIME*
>> |-- rowtime: TIMESTAMP(3) *ROWTIME*
>> 
>> 
>> ------------------------------------------------------
>> 11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29
>> 7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27
>> 7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31
>> 12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33
>> 11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37
>> 2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35
>> 11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39
>> 1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43
>> 
>> 
>> But if I use TUMBLE(rowtime, INTERVAL '5' SECOND), i.e. try to window on event time, there is no output at all; it just keeps waiting.
>> The version is Flink 1.11.0.
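>> 
>> For comparison, a typical event-time tumbling window aggregation over the declared rowtime attribute might look roughly like the sketch below. The column choice is only illustrative and this is not the exact query from above:
>> 
>> // Sketch: aggregate per vin over 5-second event-time windows.
>> // Event-time windows only emit results once the watermark passes the window end.
>> String eventTimeWindowSql =
>>         "SELECT vin, " +
>>         "       TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS window_start, " +
>>         "       SUM(num) AS total_num " +
>>         "FROM kafkaSourceTable " +
>>         "GROUP BY vin, TUMBLE(rowtime, INTERVAL '5' SECOND)";
>> Table eventTimeWindowTable = tableEnv.sqlQuery(eventTimeWindowSql);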
>> 
>> 
>> Please advise, thanks!
>> 
> 
