I just have to answer to this. This was again cross posted on stackoverflow [1]. I think you seriously need to rethink your behavior here. The cross posting is one thing, but creating a second "fake" email, so you can repeat the behavior you've been discouraged from [2], makes me feel that you don't really appreciate the help of the community and the time investment from their side. The community is here to help and you shouldn't fight it in this weird way to get more attention then others. The mailing lists are designed for an asynchronous communication and you should allow community at least one workday to address your question.
[1] https://stackoverflow.com/questions/70630493/how-to-get-elements-of-a-specific-window-for-flink-consumer-with-json-data [2] https://lists.apache.org/thread/5p8oognvlwsbmsvnfltqkh9q3htnp3b7 Best, D. On Sat, Jan 8, 2022 at 7:32 AM Flink Lover <flinkbyhe...@gmail.com> wrote: > Hi, > > I tried the code below but it throws an error. > val src: DataStream[String] = > env.addSource(consumer).windowAll(TumblingEventTimeWindows.of(Time.seconds(2))) > // reading data and used data distribution strategy > src.process(new JSONParsingProcessFunction()).uid("sink") > > Error: > type mismatch; > found : > org.apache.flink.streaming.api.scala.AllWindowedStream[String,org.apache.flink.streaming.api.windowing.windows.TimeWindow] > required: org.apache.flink.streaming.api.scala.DataStream[String] > val src: DataStream[String] = > env.addSource(consumer).windowAll(TumblingEventTimeWindows.of(Time.seconds(2))) > // reading data and used data distribution strategy > > How do I resolve this? > > Also, my question is why watermarks won't help me? Since, once the element > timestamp passes the window timestamp, the window will be triggered and the > elements passed. > > Thanks, > Martin O > > On Sat, Jan 8, 2022 at 4:07 AM Mariam Walid <mariamwalid2...@gmail.com> > wrote: > >> You can do this using the windowAll function, it works with non-keyed >> streams. >> >> On Friday, January 7, 2022, Flink Lover <flinkbyhe...@gmail.com> wrote: >> >>> Can somebody help me with this? I tried several examples where they have >>> extracted the key from Json and used windowing but as far as I have learnt, >>> with windowing I will have to use some kind of aggregation but in my use >>> case there is no aggregation to be performed. I just have to get data for >>> every 2 secs and call the process function. >>> >>> - >>> Martin O >>> >>> On Fri, Jan 7, 2022 at 11:37 PM Flink Lover <flinkbyhe...@gmail.com> >>> wrote: >>> >>>> I have an incoming json data like below: >>>> {"custId": 1,"custFirstName":"Martin", >>>> "custLastName":"owen","edl_created_at":"2022-03-01 00:00:00"} >>>> >>>> Now, this record has been pushed successfully via producer to the >>>> consumer. But I am willing to get records of say 2 seconds window but I >>>> don't have any key to use in KeyBy() operation.In this case can I use >>>> Watermarks? Something like below: >>>> >>>> val consumer = new FlinkKafkaConsumer[String]("topic", new >>>> SimpleStringSchema(), getProperties()) >>>> >>>> consumer.assignTimestampsAndWatermarks( >>>> WatermarkStrategy >>>> .forBoundedOutOfOrderness[String](Duration.ofSeconds(2))) >>>> >>>> Will this help me to get what I want? >>>> >>>> Thanks, >>>> Martin O. >>>> >>>