我们这都是大部分都是以使用eventtime进行处理居多,需要使用eventtime,则要在3个地方进行设置

第一:

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);




第二:

SingleOutputStreamOperator<Object> add_event_time = 
hitchSPVLoggerSingleOutputStreamOperator.uid("add event 
time").assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<Object>(org.apache.flink.streaming.api.windowing.time.Time.seconds(0))
 {

    @Override

    public long extractTimestamp(Object o) {

        return o.timestamp;

    }

}).setParallelism(sinkParallelism);




第三:

tableEnv.registerDataStream("hitch_match_result", add_event_time, 
rowtime.rowtime as rt);


最后使用rt即可。


有什么不对的地方,请帮忙指出,谢谢。


| |
hechao
|
|
13250818...@163.com
|
签名由网易邮箱大师定制


在2020年07月29日 09:57,111<taochangl...@163.com> 写道:



您好,请教一个问题,谢谢:
很简单的json,
{"num":100,"ts":1595949526874,"vin":"DDDD"}
{"num":200,"ts":1595949528874,"vin":"AAAA"}
{"num":200,"ts":1595949530880,"vin":"CCCC"}
{"num":300,"ts":1595949532883,"vin":"CCCC"}
{"num":100,"ts":1595949534888,"vin":"AAAA"}
{"num":300,"ts":1595949536892,"vin":"DDDD"}
我就想使用eventtime开窗,但是亲测使用procetime可以,但是eventtime死活都不行,郁闷,望指教。
public class FlinkKafka {
public static void main(String[] args) throws Exception{
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);

String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" +
" ts BIGINT,\n" +
" num INT ,\n" +
" vin STRING ,\n" +
" pts AS PROCTIME() ,  \n" +  //处理时间
" rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), \n 
" +
"  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND \n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'kkb',\n" +
" 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" +
" 'properties.group.id' = 'mm',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'latest-offset' \n" +
")";
tableEnv.executeSql(kafkaSourceTable);

String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group by 
ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)";
final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);

windowAllTable.printSchema();
tableEnv.toAppendStream(windowAllTable, Row.class).print();
System.out.println("------------------------------------------------------");
env.execute("job");

}

}


---------------------------
请看,我这里String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group by 
ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)"
如果使用TUMBLE(pts, INTERVAL '5' SECOND)",即使用processtime就没有任何问题,可以每格几秒输出所有的内容。
打印结果:
root
|-- ts: BIGINT
|-- num: INT
|-- vin: STRING
|-- pts: TIMESTAMP(3) NOT NULL *PROCTIME*
|-- rowtime: TIMESTAMP(3) *ROWTIME*


------------------------------------------------------
11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29
7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27
7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31
12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33
11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37
2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35
11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39
1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43


但是如果我使用TUMBLE(rowtime, INTERVAL '5' SECOND),也就是想使用eventtime开窗,就没有任何的结果输出,一直在等待。
版本是flink1.11.0


望指教,谢谢!








Reply via email to