(Adding subject) On Wed, Dec 21, 2022 at 5:41 PM deepthi s <deepthi.sridha...@gmail.com> wrote:
> Hello, I am new to event-time processing and need some help.
>
> We have a Kafka source with very low QPS, and multiple topic partitions
> have no data for long periods of time. Additionally, data from the source
> can come out of order (within bounds) and the application needs to process
> the events in order per key. So we wanted to try and sort the events in
> the application.
>
> I am using BoundedOutOfOrdernessWatermarks for generating the watermarks
> and using TumblingEventTimeWindows to collect the keyed events and sort
> them in the ProcessWindowFunction. I am seeing that the window doesn’t
> close at all, and based on my understanding it is because there are no
> events for some source partitions. All operators have the same parallelism
> as the source Kafka partition count.
>
> Pseudo code for my processing:
>
> SingleOutputStreamOperator<MyEvent> myStream =
>     env.fromSource(
>             setupSource(),
>             WatermarkStrategy.noWatermarks(),
>             "Kafka Source",
>             TypeInformation.of(RowData.class))
>         .map(rowData -> convertToMyEvent(rowData))
>         .assignTimestampsAndWatermarks(
>             WatermarkStrategy
>                 .<MyEvent>forBoundedOutOfOrderness(
>                     Duration.ofMinutes(misAlignmentThresholdMinutes))
>                 .withTimestampAssigner((event, timestamp) -> event.timestamp))
>         // Key the events by urn, which is the key used for the output
>         // Kafka topic
>         .keyBy((event) -> event.urn.toString())
>         // Set up a tumbling window of misAlignmentThresholdMinutes
>         .window(TumblingEventTimeWindows.of(
>             Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
>         .process(new EventTimerOrderProcessFunction())
>         .sinkTo(setupSink());
>
> Is my understanding correct that, based on the WatermarkStrategy I have,
> multiple operators will keep emitting Long.MIN_VALUE - threshold if no
> events are read for those partitions, causing the downstream keyBy
> operator to also emit a Long.MIN_VALUE - threshold watermark (as the min
> of all watermarks it sees from its input map operators), so that the
> window doesn’t close at all? If yes, what is the right strategy to handle
> this? Is there a way to combine EventTimeTrigger with
> ProcessingTimeoutTrigger?
>
> --
> Regards,
> Deepthi

--
Regards,
Deepthi
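
For reference, the min-of-all-inputs behaviour asked about above can be illustrated with a small stand-alone sketch. This is a toy model in plain Java, not Flink code: the class WatermarkMinSketch and its methods are hypothetical names, an input that has never seen an event is modelled as reporting Long.MIN_VALUE, and "idle" is just a boolean flag. In Flink itself, the option that appears to correspond to the idle flag is WatermarkStrategy.withIdleness(Duration), which may be worth trying on the strategy shown in the pseudo code.

```java
public class WatermarkMinSketch {

    // Assumption of this toy model: an input with no events reports Long.MIN_VALUE.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Combined watermark = minimum over all inputs that are not flagged idle.
    // Simplification: if every input is idle, the model reports NO_WATERMARK.
    static long combine(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    // An event-time window [start, end) may fire once the watermark
    // has reached the window's last timestamp, end - 1.
    static boolean windowCanFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // hypothetical one-minute window starting at 0

        // Three upstream inputs; the second one has never received an event.
        long[] watermarks = {75_000L, NO_WATERMARK, 90_000L};

        boolean[] noneIdle = {false, false, false};
        boolean[] secondIdle = {false, true, false};

        // Without idleness, the empty input pins the minimum at Long.MIN_VALUE.
        System.out.println(windowCanFire(windowEnd, combine(watermarks, noneIdle)));

        // With the empty input flagged idle, the minimum is taken over the rest.
        System.out.println(windowCanFire(windowEnd, combine(watermarks, secondIdle)));
    }
}
```

In this model the first call reports that the window cannot fire and the second reports that it can, which matches the symptom described: one silent partition is enough to hold the window open indefinitely unless it is excluded from the minimum.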