I saw one potential issue. Your timestamp assigner returns timestamp in second resolution while Flink requires millisecond resolution.
Best, Kezhu Wang On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote: I have simple flink stream program, where I am using socket as my continuous source I have window size of 2 seconds. Somehow my window process function is not triggering and even if I pass events in any order, flink is not ignoring I can see the output only when I kill my socket , please find the code snippet below final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); DataStream<Price> price = env.socketTextStream("localhost", 9998).uid("price source").map(new MapFunction<String, Price>() { @Override public Price map(String s) throws Exception { return new Price(s.split(",")[0], LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new BigDecimal(s.split(",")[3]), s.split(",")[4], new BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) ); } } ); DataStream<Price> priceStream = price .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps() .withTimestampAssigner((p,timestamp) -> { ZoneId zoneId = ZoneId.systemDefault(); long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond(); System.out.println(epoch); return epoch; })) .keyBy(new KeySelector<Price, String>() { @Override public String getKey(Price price) throws Exception { return price.getPerformanceId(); } }).window(TumblingEventTimeWindows.of(Time.seconds(2))) .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<Price> iterable, Collector<Price> collector) throws Exception { System.out.println(context.window().getStart()+ "Current watermark: "+context.window().getEnd()); Price p1 = null ; for(Price p : iterable) { System.out.println(p.toString()); p1= p; } collector.collect(p1); } }); priceStream.writeAsText("c:\\ab.txt"); also data I am inputting are p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00 p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01 p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02 p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03 p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04 p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01 p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01 p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01 -- ---Regards--- Sagar Bandal This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.