Hi David,

It happened due to a lack of patience. I apologise.

On Sat, Jan 8, 2022 at 3:06 PM David Morávek <david.mora...@gmail.com> wrote:

> I just have to answer to this.
>
> This was again cross posted on stackoverflow [1]. I think you seriously
> need to rethink your behavior here. The cross posting is one thing, but
> creating a second "fake" email, so you can repeat the behavior you've been
> discouraged from [2], makes me feel that you don't really appreciate the
> help of the community and the time investment from their side. The
> community is here to help and you shouldn't fight it in this weird way to
> get more attention than others. The mailing lists are designed for
> asynchronous communication and you should allow the community at least one
> workday to address your question.
>
> [1]
> https://stackoverflow.com/questions/70630493/how-to-get-elements-of-a-specific-window-for-flink-consumer-with-json-data
> [2] https://lists.apache.org/thread/5p8oognvlwsbmsvnfltqkh9q3htnp3b7
>
> Best,
> D.
>
> On Sat, Jan 8, 2022 at 7:32 AM Flink Lover <flinkbyhe...@gmail.com> wrote:
>
>> Hi,
>>
>> I tried the code below but it throws an error.
>> val src: DataStream[String] =
>> env.addSource(consumer).windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
>> // reading data and used data distribution strategy
>> src.process(new JSONParsingProcessFunction()).uid("sink")
>>
>> Error:
>> type mismatch;
>> found :
>> org.apache.flink.streaming.api.scala.AllWindowedStream[String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
>> required: org.apache.flink.streaming.api.scala.DataStream[String]
>> val src: DataStream[String] =
>> env.addSource(consumer).windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
>> // reading data and used data distribution strategy
>>
>> How do I resolve this?
>>
>> Also, my question is why watermarks won't help me? Since, once the
>> element timestamp passes the window timestamp, the window will be triggered
>> and the elements passed.
>>
>> Thanks,
>> Martin O
>>
>> On Sat, Jan 8, 2022 at 4:07 AM Mariam Walid <mariamwalid2...@gmail.com>
>> wrote:
>>
>>> You can do this using the windowAll function, it works with non-keyed
>>> streams.
>>>
>>> On Friday, January 7, 2022, Flink Lover <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Can somebody help me with this? I tried several examples where they
>>>> have extracted the key from Json and used windowing but as far as I have
>>>> learnt, with windowing I will have to use some kind of aggregation but in
>>>> my use case there is no aggregation to be performed. I just have to get
>>>> data for every 2 secs and call the process function.
>>>>
>>>> -
>>>> Martin O
>>>>
>>>> On Fri, Jan 7, 2022 at 11:37 PM Flink Lover <flinkbyhe...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have an incoming json data like below:
>>>>> {"custId": 1,"custFirstName":"Martin",
>>>>> "custLastName":"owen","edl_created_at":"2022-03-01 00:00:00"}
>>>>>
>>>>> Now, this record has been pushed successfully via producer to the
>>>>> consumer. But I am willing to get records of say 2 seconds window but I
>>>>> don't have any key to use in KeyBy() operation. In this case can I use
>>>>> Watermarks? Something like below:
>>>>>
>>>>> val consumer = new FlinkKafkaConsumer[String]("topic", new
>>>>> SimpleStringSchema(), getProperties())
>>>>>
>>>>> consumer.assignTimestampsAndWatermarks(
>>>>> WatermarkStrategy
>>>>> .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>>
>>>>> Will this help me to get what I want?
>>>>>
>>>>> Thanks,
>>>>> Martin O.
>>>>>