I see one potential issue: your timestamp assigner returns timestamps in
seconds (it calls toEpochSecond()), while Flink expects event timestamps
in milliseconds since the epoch.
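
A minimal sketch of the difference, in plain Java with no Flink dependency.
The class name EpochResolutionDemo and both helper methods are made up for
illustration, and UTC is assumed so the numbers are reproducible (your code
uses ZoneId.systemDefault()). The windowStart helper only approximates how a
tumbling window with zero offset aligns non-negative timestamps; it is not
Flink's own code.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class EpochResolutionDemo {

    // Event time in milliseconds since the epoch, the unit Flink expects.
    public static long toEpochMillis(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toInstant().toEpochMilli();
    }

    // Start of the tumbling window that contains the timestamp.
    // Assumes a zero window offset and a non-negative timestamp.
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneOffset.UTC;   // assumption: UTC instead of the system default
        long windowSizeMillis = 2000L;  // the 2-second window from the question
        LocalDateTime first = LocalDateTime.parse("2019-12-31T00:00:00");

        for (int i = 0; i < 5; i++) {
            LocalDateTime t = first.plusSeconds(i);
            long seconds = t.atZone(zone).toEpochSecond(); // what the assigner returns today
            long millis = toEpochMillis(t, zone);          // what Flink expects
            System.out.println(t
                    + " seconds-as-millis window start=" + windowStart(seconds, windowSizeMillis)
                    + ", millis window start=" + windowStart(millis, windowSizeMillis));
        }
    }
}
```

With second-resolution values, the first five sample events are only a few
"milliseconds" apart from Flink's point of view, so they all land in the same
2-second window. In your assigner the change would be to return
p.getAsOfDateTime().atZone(zoneId).toInstant().toEpochMilli() instead of
toEpochSecond().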


Best,
Kezhu Wang

On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:

I have a simple Flink streaming program that uses a socket as its
continuous source, with a window size of 2 seconds.

Somehow my window process function is not triggering, and even if I pass
events in any order, Flink is not ignoring any of them.

I can see the output only when I kill my socket. Please find the code
snippet below:

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);


        DataStream<Price> price = env.socketTextStream("localhost", 9998)
                .uid("price source")
                .map(new MapFunction<String, Price>() {
                    @Override
                    public Price map(String s) throws Exception {
                        String[] fields = s.split(",");
                        return new Price(
                                fields[0],
                                LocalDate.parse(fields[1]),
                                new BigDecimal(fields[2]),
                                new BigDecimal(fields[3]),
                                fields[4],
                                new BigDecimal(fields[5]),
                                LocalDateTime.parse(fields[6]));
                    }
                });

        DataStream<Price> priceStream = price
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Price>forMonotonousTimestamps()
                                .withTimestampAssigner((p, timestamp) -> {
                                    ZoneId zoneId = ZoneId.systemDefault();
                                    long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
                                    System.out.println(epoch);
                                    return epoch;
                                }))
                .keyBy(new KeySelector<Price, String>() {
                    @Override
                    public String getKey(Price price) throws Exception {
                        return price.getPerformanceId();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Price> iterable,
                            Collector<Price> collector) throws Exception {
                        System.out.println(context.window().getStart()
                                + "Current watermark: " + context.window().getEnd());
                        Price p1 = null;
                        for (Price p : iterable) {
                            System.out.println(p.toString());
                            p1 = p;
                        }
                        collector.collect(p1);
                    }
                });


        priceStream.writeAsText("c:\\ab.txt");

The data I am sending is:

p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01

-- 
---Regards---

  Sagar Bandal
