Thank you! That fixed the problem.

On Thu, Dec 22, 2022, 3:40 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Yes, your understanding is correct. To handle this, you can define a
> watermark strategy that will detect idleness and mark an input as idle.
> Please refer to these two documents[1][2] for more details.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#idleness
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
> Best regards,
> Yuxia
>
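The effect of idleness detection can be illustrated with a small plain-Java model. This is a simplified sketch, not Flink's implementation. The Input and Combiner classes are hypothetical. The idle check uses a processing-time timeout, similar in spirit to what WatermarkStrategy#withIdleness(Duration) configures in Flink. Any input that has seen no records for idleTimeoutMillis is left out of the minimum. As a result, one quiet partition no longer pins the downstream watermark at Long.MIN_VALUE.

```java
import java.util.List;

/** Simplified model (not Flink's code) of excluding idle inputs from the watermark minimum. */
public class IdlenessDemo {

    /** Hypothetical stand-in for one upstream input, e.g. one Kafka partition. */
    static final class Input {
        private final long outOfOrdernessMillis;
        private long watermark = Long.MIN_VALUE;
        private long lastRecordProcessingTime;

        Input(long outOfOrdernessMillis, long nowMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            this.lastRecordProcessingTime = nowMillis;
        }

        /** Bounded out-of-orderness: watermark trails the max event time seen. */
        void onRecord(long eventTimestamp, long nowMillis) {
            watermark = Math.max(watermark, eventTimestamp - outOfOrdernessMillis - 1);
            lastRecordProcessingTime = nowMillis;
        }

        /** Idle if no record arrived within the processing-time timeout. */
        boolean isIdle(long nowMillis, long idleTimeoutMillis) {
            return nowMillis - lastRecordProcessingTime >= idleTimeoutMillis;
        }
    }

    /** Downstream combiner: minimum over non-idle inputs, never moving backwards. */
    static final class Combiner {
        private long current = Long.MIN_VALUE;

        long advance(List<Input> inputs, long nowMillis, long idleTimeoutMillis) {
            long min = Long.MAX_VALUE;
            boolean anyActive = false;
            for (Input input : inputs) {
                if (!input.isIdle(nowMillis, idleTimeoutMillis)) {
                    min = Math.min(min, input.watermark);
                    anyActive = true;
                }
            }
            // If every input is idle, hold the last watermark instead of advancing.
            if (anyActive && min > current) {
                current = min;
            }
            return current;
        }
    }

    public static void main(String[] args) {
        long outOfOrderness = 5 * 60_000L; // five-minute bound
        long idleTimeout = 60_000L;        // one minute of processing time
        long now = 0L;

        Input busy = new Input(outOfOrderness, now);
        Input quiet = new Input(outOfOrderness, now);
        Combiner combiner = new Combiner();

        busy.onRecord(10_000_000L, now);
        // Both inputs still count as active, so the quiet one holds everything back.
        System.out.println(combiner.advance(List.of(busy, quiet), now, idleTimeout) == Long.MIN_VALUE);

        // Two minutes later the quiet input has timed out and is ignored.
        now += 120_000L;
        busy.onRecord(10_500_000L, now);
        System.out.println(combiner.advance(List.of(busy, quiet), now, idleTimeout));
    }
}
```

Running main prints true, then 10199999. That value is 10,500,000 minus the five-minute bound minus one, so windows ending at or before it can fire. In a real job, the equivalent step is adding .withIdleness(...) to the WatermarkStrategy, as described in the documents linked above.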
> ------------------------------
> From: "deepthi s" <deepthi.sridha...@gmail.com>
> To: "User" <user@flink.apache.org>
> Sent: Thursday, December 22, 2022 9:46:00 AM
> Subject: Using TumblingEventTimeWindows on low traffic kafka topic
>
> (Adding subject)
>
> On Wed, Dec 21, 2022 at 5:41 PM deepthi s <deepthi.sridha...@gmail.com>
> wrote:
>
>> Hello, I am new to event-time processing and need some help.
>>
>> We have a Kafka source with very low QPS, and multiple topic partitions
>> have no data for long periods of time. Additionally, data from the source
>> can arrive out of order (within bounds), and the application needs to
>> process the events in order per key. So we wanted to try sorting the
>> events in the application.
>>
>> I am using BoundedOutOfOrdernessWatermarks to generate the watermarks and
>> TumblingEventTimeWindows to collect the keyed events, which I then sort in
>> the ProcessWindowFunction. I am seeing that the windows never fire, and
>> based on my understanding this is because some source partitions have no
>> events. All operators have the same parallelism as the source topic's
>> Kafka partition count.
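The per-key sorting step described above might look roughly like the following plain-Java sketch. The body of EventTimerOrderProcessFunction is not shown in this thread. The MyEvent stand-in and the sortWindowContents helper are therefore hypothetical. In Flink, equivalent logic would run inside ProcessWindowFunction#process over the Iterable of elements buffered for one key and one window.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WindowSortSketch {

    /** Hypothetical stand-in for MyEvent with only the fields the sort needs. */
    static final class MyEvent {
        final String urn;
        final long timestamp;

        MyEvent(String urn, long timestamp) {
            this.urn = urn;
            this.timestamp = timestamp;
        }
    }

    /** Returns one key's window contents in event-time order. */
    static List<MyEvent> sortWindowContents(Iterable<MyEvent> elements) {
        List<MyEvent> sorted = new ArrayList<>();
        elements.forEach(sorted::add);
        // List.sort is stable, so events with equal timestamps keep arrival order.
        sorted.sort(Comparator.comparingLong((MyEvent e) -> e.timestamp));
        return sorted;
    }

    public static void main(String[] args) {
        List<MyEvent> arrived = List.of(
                new MyEvent("urn:a", 300L),
                new MyEvent("urn:a", 100L),
                new MyEvent("urn:a", 200L));
        // Prints 100, 200, 300 on separate lines.
        sortWindowContents(arrived).forEach(e -> System.out.println(e.timestamp));
    }
}
```

A stable sort is a deliberate choice here. If two events for the same key carry the same timestamp, the sketch keeps them in the order in which they reached the window.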
>>
>> Pseudocode for my processing:
>>
>> import java.time.Duration;
>> import java.util.concurrent.TimeUnit;
>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.table.data.RowData;
>>
>> env.fromSource(
>>         setupSource(),
>>         // No watermarks at the source; they are assigned after the map step
>>         WatermarkStrategy.noWatermarks(),
>>         "Kafka Source",
>>         TypeInformation.of(RowData.class))
>>     .map(rowData -> convertToMyEvent(rowData))
>>     .assignTimestampsAndWatermarks(WatermarkStrategy
>>         .<MyEvent>forBoundedOutOfOrderness(
>>             Duration.ofMinutes(misAlignmentThresholdMinutes))
>>         .withTimestampAssigner((event, timestamp) -> event.timestamp))
>>     // Key the events by urn, which is the key used for the output Kafka topic
>>     .keyBy(event -> event.urn.toString())
>>     // Set up a tumbling window of misAlignmentThresholdMinutes
>>     .window(TumblingEventTimeWindows.of(
>>         Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
>>     .process(new EventTimerOrderProcessFunction())
>>     .sinkTo(setupSink());
>>
>> Is my understanding correct that, with the WatermarkStrategy I have, the
>> watermark-assigning subtasks that read partitions with no events will keep
>> emitting a watermark of Long.MIN_VALUE? Because the downstream window
>> operator after keyBy takes the minimum of the watermarks from all of its
>> input subtasks, its watermark would also stay at Long.MIN_VALUE, so the
>> windows never fire. If yes, what is the right strategy to handle this? Is
>> there a way to combine EventTimeTrigger with ProcessingTimeoutTrigger?
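The reasoning in this question can be checked with a small plain-Java model. It is a sketch, not Flink's code, and the generator class is hypothetical. It starts its maximum timestamp so that its lowest possible watermark is exactly Long.MIN_VALUE, mirroring the approach in Flink's BoundedOutOfOrdernessWatermarks. The downstream operator takes the minimum over its inputs. As a result, one partition that never sees an event keeps the combined watermark at Long.MIN_VALUE.

```java
import java.util.List;

/** Simplified model (not Flink's classes) of a watermark stalled by an empty partition. */
public class WatermarkStallDemo {

    /** Hypothetical bounded-out-of-orderness watermark generator for one partition. */
    static final class BoundedOutOfOrdernessGenerator {
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        BoundedOutOfOrdernessGenerator(long outOfOrdernessMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
            // Start so that the lowest watermark this generator emits is Long.MIN_VALUE.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long currentWatermark() {
            return maxTimestamp - outOfOrdernessMillis - 1;
        }
    }

    /** A downstream operator's event time is the minimum over all of its inputs. */
    static long combinedWatermark(List<BoundedOutOfOrdernessGenerator> inputs) {
        long min = Long.MAX_VALUE;
        for (BoundedOutOfOrdernessGenerator input : inputs) {
            min = Math.min(min, input.currentWatermark());
        }
        return min;
    }

    public static void main(String[] args) {
        long threshold = 5 * 60_000L; // e.g. misAlignmentThresholdMinutes = 5
        BoundedOutOfOrdernessGenerator busy = new BoundedOutOfOrdernessGenerator(threshold);
        BoundedOutOfOrdernessGenerator empty = new BoundedOutOfOrdernessGenerator(threshold);

        busy.onEvent(1_000_000L);
        busy.onEvent(2_000_000L);

        // The busy partition's watermark has advanced...
        System.out.println(busy.currentWatermark());
        // ...but the empty partition is still at Long.MIN_VALUE...
        System.out.println(empty.currentWatermark() == Long.MIN_VALUE);
        // ...so the combined watermark never passes any window end.
        System.out.println(combinedWatermark(List.of(busy, empty)) == Long.MIN_VALUE);
    }
}
```

Running main prints 1699999, then true twice. The busy partition has moved forward, but the minimum, and with it every downstream event-time window, has not.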
>>
>> --
>> Regards,
>> Deepthi
>>
>
>
> --
> Regards,
> Deepthi
>
>
