Hi David,

It happened due to a lack of patience. I apologise.


On Sat, Jan 8, 2022 at 3:06 PM David Morávek <david.mora...@gmail.com>
wrote:

> I just have to respond to this.
>
> This was again cross-posted on Stack Overflow [1]. I think you seriously
> need to rethink your behavior here. The cross-posting is one thing, but
> creating a second "fake" email address so you can repeat the behavior you've
> been discouraged from [2] makes me feel that you don't really appreciate the
> help of the community and the time its members invest. The community is here
> to help, and you shouldn't fight it in this weird way to get more attention
> than others. The mailing lists are designed for asynchronous communication,
> and you should allow the community at least one workday to address your
> question.
>
> [1]
> https://stackoverflow.com/questions/70630493/how-to-get-elements-of-a-specific-window-for-flink-consumer-with-json-data
> [2] https://lists.apache.org/thread/5p8oognvlwsbmsvnfltqkh9q3htnp3b7
>
> Best,
> D.
>
> On Sat, Jan 8, 2022 at 7:32 AM Flink Lover <flinkbyhe...@gmail.com> wrote:
>
>> Hi,
>>
>> I tried the code below, but it throws an error:
>> val src: DataStream[String] =
>> env.addSource(consumer).windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
>> // reading data and used data distribution strategy
>>     src.process(new JSONParsingProcessFunction()).uid("sink")
>>
>> Error:
>> type mismatch;
>>  found   :
>> org.apache.flink.streaming.api.scala.AllWindowedStream[String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
>>  required: org.apache.flink.streaming.api.scala.DataStream[String]
>>     val src: DataStream[String] =
>> env.addSource(consumer).windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
>> // reading data and used data distribution strategy
>>
>> How do I resolve this?
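>>
>> From the error I gather that windowAll(...) returns an AllWindowedStream
>> and the DataStream[String] only comes back after a window function is
>> applied, so I assume the wiring would have to look roughly like the sketch
>> below (with JSONParsingProcessFunction rewritten as a
>> ProcessAllWindowFunction[String, String, TimeWindow])?
>>
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>>
>> // windowAll(...) yields AllWindowedStream[String, TimeWindow], so this
>> // value cannot be annotated as DataStream[String]
>> val windowed = env
>>   .addSource(consumer)
>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
>>
>> // applying the window function is what produces a DataStream again
>> val src: DataStream[String] = windowed
>>   .process(new JSONParsingProcessFunction()) // a ProcessAllWindowFunction here
>>   .uid("sink")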
>>
>> Also, my question is: why won't watermarks help me? Once the element
>> timestamps pass the window's end timestamp, the window will be triggered
>> and the elements passed on.
>>
>> Thanks,
>> Martin O
>>
>> On Sat, Jan 8, 2022 at 4:07 AM Mariam Walid <mariamwalid2...@gmail.com>
>> wrote:
>>
>>> You can do this using the windowAll function; it works with non-keyed
>>> streams.
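>>>
>>> For example, here is a minimal (untested) sketch. It uses processing-time
>>> windows just to keep watermarks out of the picture, and stream stands for
>>> your Kafka-sourced DataStream[String]; note that windowAll runs with
>>> parallelism 1:
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>>> import org.apache.flink.streaming.api.windowing.time.Time
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>> import org.apache.flink.util.Collector
>>>
>>> // no keyBy needed: every record that arrives within a 2-second interval
>>> // lands in the same window
>>> stream
>>>   .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
>>>   .apply { (window: TimeWindow, records: Iterable[String], out: Collector[String]) =>
>>>     records.foreach(r => out.collect(r)) // no aggregation, just pass them on
>>>   }
>>>   .print()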
>>>
>>> On Friday, January 7, 2022, Flink Lover <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Can somebody help me with this? I tried several examples where the key is
>>>> extracted from the JSON and used for windowing, but as far as I have
>>>> learnt, windowing requires some kind of aggregation, and in my use case
>>>> there is no aggregation to perform. I just have to get the data every 2
>>>> seconds and call the process function.
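>>>>
>>>> To illustrate what I mean, something as simple as a hypothetical
>>>> pass-through window function would be all I need; is something like this
>>>> possible?
>>>>
>>>> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>> import org.apache.flink.util.Collector
>>>>
>>>> // receives every record that fell into one 2-second window and re-emits
>>>> // it unchanged (the JSON parsing could happen per element here instead)
>>>> class ForwardWindowContents
>>>>     extends ProcessAllWindowFunction[String, String, TimeWindow] {
>>>>   override def process(context: Context,
>>>>                        elements: Iterable[String],
>>>>                        out: Collector[String]): Unit =
>>>>     elements.foreach(e => out.collect(e))
>>>> }
>>>>
>>>> Wired as stream.windowAll(...).process(new ForwardWindowContents), where
>>>> stream is the Kafka source, this would hand me the contents of each
>>>> 2-second window without aggregating anything.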
>>>>
>>>> -
>>>> Martin O
>>>>
>>>> On Fri, Jan 7, 2022 at 11:37 PM Flink Lover <flinkbyhe...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have an incoming json data like below:
>>>>> {"custId": 1,"custFirstName":"Martin",
>>>>> "custLastName":"owen","edl_created_at":"2022-03-01 00:00:00"}
>>>>>
>>>>> Now, this record has been pushed successfully from the producer to the
>>>>> consumer. I want to get the records of, say, a 2-second window, but I
>>>>> don't have any key to use in a keyBy() operation. In this case, can I
>>>>> use watermarks? Something like below:
>>>>>
>>>>> val consumer = new FlinkKafkaConsumer[String](
>>>>>   "topic", new SimpleStringSchema(), getProperties())
>>>>>
>>>>> consumer.assignTimestampsAndWatermarks(
>>>>>   WatermarkStrategy
>>>>>     .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>>
>>>>> Will this help me to get what I want?
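>>>>>
>>>>> My (possibly wrong) understanding is that the strategy would also need
>>>>> to know where the event timestamp lives, for example something like the
>>>>> sketch below, which crudely pulls edl_created_at out of the JSON string,
>>>>> and that the windowing itself would still have to be added on top of the
>>>>> watermarks:
>>>>>
>>>>> import java.text.SimpleDateFormat
>>>>> import java.time.Duration
>>>>> import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
>>>>>
>>>>> val watermarks = WatermarkStrategy
>>>>>   .forBoundedOutOfOrderness[String](Duration.ofSeconds(2))
>>>>>   .withTimestampAssigner(new SerializableTimestampAssigner[String] {
>>>>>     override def extractTimestamp(record: String, recordTs: Long): Long = {
>>>>>       // illustration only: real code would use a JSON parser
>>>>>       val raw = record.split("\"edl_created_at\":\"")(1).takeWhile(_ != '"')
>>>>>       new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(raw).getTime
>>>>>     }
>>>>>   })
>>>>>
>>>>> consumer.assignTimestampsAndWatermarks(watermarks)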
>>>>>
>>>>> Thanks,
>>>>> Martin O.
>>>>>
>>>>
