It is fairly simple requirement, if I changed it to PRocessing time it
works fine , but not working with event time..help appreciated!

On Wed, Feb 24, 2021 at 10:51 AM sagar <sagarban...@gmail.com> wrote:

> HI
>
> Corrected with below code, but still getting same issue
>
> Instant instant = 
> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
> long timeInMillis = instant.toEpochMilli();
> System.out.println(timeInMillis);
> return timeInMillis;
>
>
> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang <kez...@gmail.com> wrote:
>
>> I saw one potential issue. Your timestamp assigner returns timestamp in
>> second resolution while Flink requires millisecond resolution.
>>
>>
>> Best,
>> Kezhu Wang
>>
>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>
>> I have simple flink stream program, where I am using socket as my
>> continuous source
>> I have window size of 2 seconds.
>>
>> Somehow my window process function is not triggering and even if I pass
>> events in any order, flink is not ignoring
>>
>> I can see the output only when I kill my socket , please find the code
>> snippet below
>>
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>>
>>         DataStream<Price> price = env.socketTextStream("localhost",
>> 9998).uid("price source").map(new MapFunction<String, Price>() {
>>             @Override
>>             public Price map(String s) throws Exception {
>>                 return new Price(s.split(",")[0],
>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>>             }
>>         }
>>         );
>>
>>         DataStream<Price> priceStream = price
>>
>>  
>> .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps()
>>         .withTimestampAssigner((p,timestamp) ->
>>         {
>>             ZoneId zoneId = ZoneId.systemDefault();
>>             long epoch =
>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>>             System.out.println(epoch);
>>              return epoch;
>>         }))
>>         .keyBy(new KeySelector<Price, String>() {
>>                     @Override
>>                     public String getKey(Price price) throws Exception {
>>                         return price.getPerformanceId();
>>                     }
>>                 }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>>                 .process(new ProcessWindowFunction<Price, Price, String,
>> TimeWindow>() {
>>
>>                     @Override
>>                     public void process(String s, Context context,
>> Iterable<Price> iterable, Collector<Price> collector) throws Exception {
>>                         System.out.println(context.window().getStart()+
>> "Current watermark: "+context.window().getEnd());
>>                         Price p1 = null ;
>>                         for(Price p : iterable)
>>                         {
>>                             System.out.println(p.toString());
>>                             p1= p;
>>                         }
>>                         collector.collect(p1);
>>                     }
>>                 });
>>
>>
>>         priceStream.writeAsText("c:\\ab.txt");
>>
>> also data I am inputting are
>>
>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant please ignore this email.
>>
>>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>


-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.

Reply via email to