Hi,

I corrected the timestamp assigner with the code below, but I am still getting the same issue:

Instant instant = p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
long timeInMillis = instant.toEpochMilli();
System.out.println(timeInMillis);
return timeInMillis;
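
For reference, here is a minimal sketch of why the second-versus-millisecond point in the quoted reply below matters for a 2-second window. It is not Flink code: windowStart is a hypothetical helper that mirrors the usual tumbling-window arithmetic for non-negative timestamps with no offset, and the zone is fixed to UTC as an assumption (the code in this thread uses ZoneId.systemDefault()). It only illustrates the resolution problem and does not by itself explain why output still appears only when the socket is closed.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class WindowResolutionDemo {

    // Hypothetical helper: start of the tumbling window that contains a
    // non-negative timestamp, assuming a window offset of zero.
    public static long windowStart(long timestamp, long windowSizeMillis) {
        return timestamp - (timestamp % windowSizeMillis);
    }

    // What the original assigner returned: seconds since the epoch.
    public static long epochSeconds(LocalDateTime t, ZoneId zone) {
        return t.atZone(zone).toEpochSecond();
    }

    // What the corrected assigner returns: milliseconds since the epoch.
    public static long epochMillis(LocalDateTime t, ZoneId zone) {
        return t.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneOffset.UTC;   // assumption, chosen so the output is reproducible
        long windowSize = 2000L;        // a 2-second window expressed in milliseconds

        LocalDateTime first = LocalDateTime.parse("2019-12-31T00:00:00");
        for (int i = 0; i < 5; i++) {
            LocalDateTime t = first.plusSeconds(i);
            long fromSeconds = windowStart(epochSeconds(t, zone), windowSize);
            long fromMillis = windowStart(epochMillis(t, zone), windowSize);
            System.out.println(t + "  seconds as ms -> window " + fromSeconds
                    + "  |  millis -> window " + fromMillis);
        }
    }
}
```

With second-resolution values treated as milliseconds, the five sample events one second apart are only 1 ms apart in event time, so they all land in the same 2000 ms window. With millisecond values they spread over three consecutive windows.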


On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang <kez...@gmail.com> wrote:

> I saw one potential issue. Your timestamp assigner returns timestamps in
> second resolution, while Flink requires millisecond resolution.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>
> I have a simple Flink streaming program that uses a socket as its
> continuous source, with a window size of 2 seconds.
>
> Somehow my window process function is not triggering, and even if I pass
> events in any order, Flink is not ignoring the out-of-order ones.
>
> I can see the output only when I kill my socket. Please find the code
> snippet below.
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
>
>         DataStream<Price> price = env.socketTextStream("localhost", 9998)
>                 .uid("price source")
>                 .map(new MapFunction<String, Price>() {
>             @Override
>             public Price map(String s) throws Exception {
>                 // Split the CSV line once instead of once per field.
>                 String[] f = s.split(",");
>                 return new Price(f[0], LocalDate.parse(f[1]), new BigDecimal(f[2]),
>                         new BigDecimal(f[3]), f[4], new BigDecimal(f[5]),
>                         LocalDateTime.parse(f[6]));
>             }
>         });
>
>         DataStream<Price> priceStream = price
>         .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps()
>         .withTimestampAssigner((p, timestamp) ->
>         {
>             ZoneId zoneId = ZoneId.systemDefault();
>             long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>             System.out.println(epoch);
>             return epoch;
>         }))
>         .keyBy(new KeySelector<Price, String>() {
>                     @Override
>                     public String getKey(Price price) throws Exception {
>                         return price.getPerformanceId();
>                     }
>                 }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>                 .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
>
>                     @Override
>                     public void process(String s, Context context,
>                             Iterable<Price> iterable, Collector<Price> collector) throws Exception {
>                         System.out.println(context.window().getStart()
>                                 + "Current watermark: " + context.window().getEnd());
>                         Price p1 = null ;
>                         for(Price p : iterable)
>                         {
>                             System.out.println(p.toString());
>                             p1= p;
>                         }
>                         collector.collect(p1);
>                     }
>                 });
>
>
>         priceStream.writeAsText("c:\\ab.txt");
>
> Also, the data I am inputting is:
>
> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is a confidential mail. All rights are reserved. If you are not the intended
> recipient, please ignore this email.
>
>

-- 
---Regards---

  Sagar Bandal

