Thank you! That fixed the problem.

On Thu, Dec 22, 2022, 3:40 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Yes, your understanding is correct. To handle this, you can define a
> watermark strategy that will detect idleness and mark an input as idle.
> Please refer to these two documents [1][2] for more details.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#idleness
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
> Best regards,
> Yuxia
>
> ------------------------------
> From: "deepthi s" <deepthi.sridha...@gmail.com>
> To: "User" <user@flink.apache.org>
> Sent: Thursday, December 22, 2022, 9:46:00 AM
> Subject: Using TumblingEventTimeWindows on low traffic kafka topic
>
> (Adding subject)
>
> On Wed, Dec 21, 2022 at 5:41 PM deepthi s <deepthi.sridha...@gmail.com>
> wrote:
>
>> Hello, I am new to event-time processing and need some help.
>>
>> We have a Kafka source with very low QPS, and several topic partitions
>> have no data for long periods of time. Additionally, data from the source
>> can arrive out of order (within bounds), and the application needs to
>> process the events in order per key. So we want to sort the events in
>> the application.
>>
>> I am using BoundedOutOfOrdernessWatermarks to generate the watermarks
>> and TumblingEventTimeWindows to collect the keyed events and sort them
>> in the ProcessWindowFunction. The window never closes, and as I
>> understand it, this is because some source partitions have no events.
>> All operators have the same parallelism as the source Kafka partition
>> count.
>>
>> Pseudo code for my processing:
>>
>>     env.fromSource(
>>             setupSource(),
>>             WatermarkStrategy.noWatermarks(),
>>             "Kafka Source",
>>             TypeInformation.of(RowData.class))
>>         .map(rowData -> convertToMyEvent(rowData))
>>         .assignTimestampsAndWatermarks(WatermarkStrategy
>>             .<MyEvent>forBoundedOutOfOrderness(
>>                 Duration.ofMinutes(misAlignmentThresholdMinutes))
>>             .withTimestampAssigner((event, timestamp) -> event.timestamp))
>>         // Key the events by urn, which is the key used for the output Kafka topic
>>         .keyBy(event -> event.urn.toString())
>>         // Set up a tumbling window of misAlignmentThresholdMinutes
>>         .window(TumblingEventTimeWindows.of(
>>             Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
>>         // Sort each key's events within the window
>>         .process(new EventTimerOrderProcessFunction())
>>         .sinkTo(setupSink());
>>
>> Is my understanding correct that, with the WatermarkStrategy I have, the
>> operators reading partitions that receive no events will keep emitting a
>> Long.MIN_VALUE - threshold watermark, causing the downstream keyBy operator
>> to also emit a Long.MIN_VALUE - threshold watermark (as the minimum of all
>> watermarks it sees from its input map operators), so the window never
>> closes? If yes, what is the right strategy to handle this? Is there a way
>> to combine EventTimeTrigger with ProcessingTimeoutTrigger?
>>
>> --
>> Regards,
>> Deepthi
>>
>
> --
> Regards,
> Deepthi
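
The stall confirmed in this thread can be reproduced with a small model. The sketch below is plain Java, not Flink code. The class `IdleWatermarkSketch`, its methods, and the one-minute bound and window size are hypothetical. It makes three simplifying assumptions. First, each input's watermark is computed the way Flink's `BoundedOutOfOrdernessWatermarks` does it: the highest timestamp seen, minus the bound, minus 1. An input that has seen no events reports `Long.MIN_VALUE`. Second, a downstream operator's watermark is the minimum over its non-idle inputs. Third, a tumbling window fires once the watermark reaches the window's last timestamp, which is the condition `EventTimeTrigger` checks.

```java
// Hypothetical plain-Java model of watermark propagation; not Flink code.
class IdleWatermarkSketch {

    // Per-input watermark, modeled on Flink's BoundedOutOfOrdernessWatermarks:
    // highest timestamp seen minus the bound minus 1. An input that has seen no
    // events reports Long.MIN_VALUE (returned directly to avoid underflow).
    static long inputWatermark(long[] timestampsSeen, long boundMillis) {
        if (timestampsSeen.length == 0) {
            return Long.MIN_VALUE;
        }
        long max = Long.MIN_VALUE;
        for (long ts : timestampsSeen) {
            max = Math.max(max, ts);
        }
        return max - boundMillis - 1;
    }

    // Downstream watermark: the minimum over all inputs NOT marked idle.
    // Simplification: if every input is idle, this sketch reports Long.MIN_VALUE.
    static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    // A tumbling window [start, start + size) fires once the watermark reaches
    // its last timestamp, start + size - 1 (the EventTimeTrigger condition).
    static boolean windowFires(long windowStart, long sizeMillis, long watermark) {
        return watermark >= windowStart + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long bound = 60_000L; // hypothetical misAlignmentThresholdMinutes = 1
        long size = 60_000L;  // window size equal to the bound, as in the question

        // Input 0 has events up to t = 200 s; input 1 (an empty partition) has none.
        long[] watermarks = {
            inputWatermark(new long[] {10_000L, 200_000L}, bound),
            inputWatermark(new long[] {}, bound)
        };

        long stalled = combinedWatermark(watermarks, new boolean[] {false, false});
        long idleAware = combinedWatermark(watermarks, new boolean[] {false, true});

        System.out.println("window [0 s, 60 s) fires, no idleness:   " + windowFires(0L, size, stalled));
        System.out.println("window [0 s, 60 s) fires, input 1 idle: " + windowFires(0L, size, idleAware));
    }
}
```

Running it prints `false` for the stalled case and `true` once the empty input is excluded as idle. In the real job, the change Yuxia points to is calling `withIdleness(Duration)` on the `WatermarkStrategy`. In the pipeline above, that would be the strategy passed to `assignTimestampsAndWatermarks`, after `.withTimestampAssigner(...)`. The idle timeout value is an application-specific choice.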