HI Corrected with below code, but still getting same issue
Instant instant = p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant(); long timeInMillis = instant.toEpochMilli(); System.out.println(timeInMillis); return timeInMillis; On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang <kez...@gmail.com> wrote: > I saw one potential issue. Your timestamp assigner returns timestamp in > second resolution while Flink requires millisecond resolution. > > > Best, > Kezhu Wang > > On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote: > > I have simple flink stream program, where I am using socket as my > continuous source > I have window size of 2 seconds. > > Somehow my window process function is not triggering and even if I pass > events in any order, flink is not ignoring > > I can see the output only when I kill my socket , please find the code > snippet below > > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > env.setRuntimeMode(RuntimeExecutionMode.STREAMING); > > > DataStream<Price> price = env.socketTextStream("localhost", > 9998).uid("price source").map(new MapFunction<String, Price>() { > @Override > public Price map(String s) throws Exception { > return new Price(s.split(",")[0], > LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new > BigDecimal(s.split(",")[3]), s.split(",")[4], new > BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) ); > } > } > ); > > DataStream<Price> priceStream = price > > > .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps() > .withTimestampAssigner((p,timestamp) -> > { > ZoneId zoneId = ZoneId.systemDefault(); > long epoch = > p.getAsOfDateTime().atZone(zoneId).toEpochSecond(); > System.out.println(epoch); > return epoch; > })) > .keyBy(new KeySelector<Price, String>() { > @Override > public String getKey(Price price) throws Exception { > return price.getPerformanceId(); > } > }).window(TumblingEventTimeWindows.of(Time.seconds(2))) > .process(new ProcessWindowFunction<Price, Price, String, > TimeWindow>() { > > @Override > public void process(String s, Context context, > Iterable<Price> iterable, Collector<Price> collector) throws Exception { > System.out.println(context.window().getStart()+ > "Current watermark: "+context.window().getEnd()); > Price p1 = null ; > for(Price p : iterable) > { > System.out.println(p.toString()); > p1= p; > } > collector.collect(p1); > } > }); > > > priceStream.writeAsText("c:\\ab.txt"); > > also data I am inputting are > > p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00 > p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01 > p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02 > p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03 > p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04 > p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01 > p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01 > p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01 > > -- > ---Regards--- > > Sagar Bandal > > This is confidential mail ,All Rights are Reserved.If you are not intended > receipiant please ignore this email. > > -- ---Regards--- Sagar Bandal This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.