It is fairly simple requirement, if I changed it to PRocessing time it works fine , but not working with event time..help appreciated!
On Wed, Feb 24, 2021 at 10:51 AM sagar <sagarban...@gmail.com> wrote: > HI > > Corrected with below code, but still getting same issue > > Instant instant = > p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant(); > long timeInMillis = instant.toEpochMilli(); > System.out.println(timeInMillis); > return timeInMillis; > > > On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang <kez...@gmail.com> wrote: > >> I saw one potential issue. Your timestamp assigner returns timestamp in >> second resolution while Flink requires millisecond resolution. >> >> >> Best, >> Kezhu Wang >> >> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote: >> >> I have simple flink stream program, where I am using socket as my >> continuous source >> I have window size of 2 seconds. >> >> Somehow my window process function is not triggering and even if I pass >> events in any order, flink is not ignoring >> >> I can see the output only when I kill my socket , please find the code >> snippet below >> >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> env.setRuntimeMode(RuntimeExecutionMode.STREAMING); >> >> >> DataStream<Price> price = env.socketTextStream("localhost", >> 9998).uid("price source").map(new MapFunction<String, Price>() { >> @Override >> public Price map(String s) throws Exception { >> return new Price(s.split(",")[0], >> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new >> BigDecimal(s.split(",")[3]), s.split(",")[4], new >> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) ); >> } >> } >> ); >> >> DataStream<Price> priceStream = price >> >> >> .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps() >> .withTimestampAssigner((p,timestamp) -> >> { >> ZoneId zoneId = ZoneId.systemDefault(); >> long epoch = >> p.getAsOfDateTime().atZone(zoneId).toEpochSecond(); >> System.out.println(epoch); >> return epoch; >> })) >> .keyBy(new KeySelector<Price, String>() { >> @Override >> public String getKey(Price price) throws Exception { >> return price.getPerformanceId(); >> } >> }).window(TumblingEventTimeWindows.of(Time.seconds(2))) >> .process(new ProcessWindowFunction<Price, Price, String, >> TimeWindow>() { >> >> @Override >> public void process(String s, Context context, >> Iterable<Price> iterable, Collector<Price> collector) throws Exception { >> System.out.println(context.window().getStart()+ >> "Current watermark: "+context.window().getEnd()); >> Price p1 = null ; >> for(Price p : iterable) >> { >> System.out.println(p.toString()); >> p1= p; >> } >> collector.collect(p1); >> } >> }); >> >> >> priceStream.writeAsText("c:\\ab.txt"); >> >> also data I am inputting are >> >> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00 >> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01 >> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02 >> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03 >> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04 >> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01 >> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01 >> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01 >> >> -- >> ---Regards--- >> >> Sagar Bandal >> >> This is confidential mail ,All Rights are Reserved.If you are not >> intended receipiant please ignore this email. >> >> > > -- > ---Regards--- > > Sagar Bandal > > This is confidential mail ,All Rights are Reserved.If you are not intended > receipiant please ignore this email. > -- ---Regards--- Sagar Bandal This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.