Thanks Kezhu, It worked!!! On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang <kez...@gmail.com> wrote:
> Try `env.setParallelism(1)`. Default parallelism for local environment is > `Runtime.getRuntime.availableProcessors`. > > You test data set are so small that when they are scatter cross multiple > parallel instances, there will be no data with event time assigned to > trigger downstream computation. > > Or you could try `WatermarkStrategy.withIdleness`. > > > Best, > Kezhu Wang > > On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote: > > It is fairly simple requirement, if I changed it to PRocessing time it > works fine , but not working with event time..help appreciated! > > On Wed, Feb 24, 2021 at 10:51 AM sagar <sagarban...@gmail.com> wrote: > >> HI >> >> Corrected with below code, but still getting same issue >> >> Instant instant = >> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant(); >> long timeInMillis = instant.toEpochMilli(); >> System.out.println(timeInMillis); >> return timeInMillis; >> >> >> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang <kez...@gmail.com> wrote: >> >>> I saw one potential issue. Your timestamp assigner returns timestamp in >>> second resolution while Flink requires millisecond resolution. >>> >>> >>> Best, >>> Kezhu Wang >>> >>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote: >>> >>> I have simple flink stream program, where I am using socket as my >>> continuous source >>> I have window size of 2 seconds. >>> >>> Somehow my window process function is not triggering and even if I pass >>> events in any order, flink is not ignoring >>> >>> I can see the output only when I kill my socket , please find the code >>> snippet below >>> >>> final StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING); >>> >>> >>> DataStream<Price> price = env.socketTextStream("localhost", >>> 9998).uid("price source").map(new MapFunction<String, Price>() { >>> @Override >>> public Price map(String s) throws Exception { >>> return new Price(s.split(",")[0], >>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new >>> BigDecimal(s.split(",")[3]), s.split(",")[4], new >>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) ); >>> } >>> } >>> ); >>> >>> DataStream<Price> priceStream = price >>> >>> >>> .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps() >>> .withTimestampAssigner((p,timestamp) -> >>> { >>> ZoneId zoneId = ZoneId.systemDefault(); >>> long epoch = >>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond(); >>> System.out.println(epoch); >>> return epoch; >>> })) >>> .keyBy(new KeySelector<Price, String>() { >>> @Override >>> public String getKey(Price price) throws Exception { >>> return price.getPerformanceId(); >>> } >>> }).window(TumblingEventTimeWindows.of(Time.seconds(2))) >>> .process(new ProcessWindowFunction<Price, Price, String, >>> TimeWindow>() { >>> >>> @Override >>> public void process(String s, Context context, >>> Iterable<Price> iterable, Collector<Price> collector) throws Exception { >>> System.out.println(context.window().getStart()+ >>> "Current watermark: "+context.window().getEnd()); >>> Price p1 = null ; >>> for(Price p : iterable) >>> { >>> System.out.println(p.toString()); >>> p1= p; >>> } >>> collector.collect(p1); >>> } >>> }); >>> >>> >>> priceStream.writeAsText("c:\\ab.txt"); >>> >>> also data I am inputting are >>> >>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00 >>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01 >>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02 >>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03 >>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04 >>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01 >>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01 >>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01 >>> >>> -- >>> ---Regards--- >>> >>> Sagar Bandal >>> >>> This is confidential mail ,All Rights are Reserved.If you are not >>> intended receipiant please ignore this email. >>> >>> >> >> -- >> ---Regards--- >> >> Sagar Bandal >> >> This is confidential mail ,All Rights are Reserved.If you are not >> intended receipiant please ignore this email. >> > > > -- > ---Regards--- > > Sagar Bandal > > This is confidential mail ,All Rights are Reserved.If you are not intended > receipiant please ignore this email. > > -- ---Regards--- Sagar Bandal This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.