(Adding subject)

On Wed, Dec 21, 2022 at 5:41 PM deepthi s <deepthi.sridha...@gmail.com>
wrote:

> Hello, I am new to event-time processing and need some help.
>
>
>
> We have a Kafka source with very low QPS, and multiple topic partitions
> have no data for long periods of time. Additionally, data from the source
> can arrive out of order (within bounds), and the application needs to
> process the events in order per key, so we want to sort the events in
> the application.
>
>
> I am using BoundedOutOfOrdernessWatermarks to generate the watermarks
> and TumblingEventTimeWindows to collect the keyed events, which I then
> sort in the ProcessWindowFunction. I am seeing that the window doesn’t
> close at all, and based on my understanding this is because there are no
> events for some source partitions. All operators have the same parallelism
> as the source Kafka partition count.
>
>
>
> Pseudo code for my processing:
>
>
>
> SingleOutputStreamOperator<MyEvent> myStream =
>     env.fromSource(
>             setupSource(),
>             WatermarkStrategy.noWatermarks(),
>             "Kafka Source",
>             TypeInformation.of(RowData.class))
>         .map(rowData -> convertToMyEvent(rowData))
>         .assignTimestampsAndWatermarks(WatermarkStrategy
>             .<MyEvent>forBoundedOutOfOrderness(
>                 Duration.ofMinutes(misAlignmentThresholdMinutes))
>             .withTimestampAssigner((event, timestamp) -> event.timestamp))
>         // Key the events by urn, which is the key used for the output Kafka topic
>         .keyBy(event -> event.urn.toString())
>         // Set up a tumbling window of misAlignmentThresholdMinutes
>         .window(TumblingEventTimeWindows.of(
>             Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
>         // Sort each window's events in the ProcessWindowFunction
>         .process(new EventTimerOrderProcessFunction());
>
> // sinkTo returns a DataStreamSink, so the sink is attached in a separate statement
> myStream.sinkTo(setupSink());
>
>
>
> Is my understanding correct that, based on the WatermarkStrategy I have,
> multiple operators will keep emitting Long.MIN_VALUE - threshold if
> no events are read for those partitions, causing the downstream keyBy
> operator to also emit a Long.MIN_VALUE - threshold watermark (the min
> of all watermarks it sees from its input map operators), so the window
> doesn’t close at all? If yes, what is the right strategy to handle this? Is
> there a way to combine EventTimeTrigger with ProcessingTimeoutTrigger?
>
>
>
>
>
> --
> Regards,
> Deepthi
>
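
The min-of-inputs reasoning in the question can be sketched in plain Java,
without any Flink dependency. This is only an illustrative model, not Flink
code: the class WatermarkMinDemo and its methods are hypothetical names, an
input that has read no events is assumed to report Long.MIN_VALUE, and
"idle" is modeled simply as excluding that input from the minimum. The
window check mirrors the rule that an event-time window [start, end) fires
once the watermark reaches end - 1.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of watermark combination; not Flink code.
final class WatermarkMinDemo {

    /**
     * Returns the minimum watermark across all inputs that are not idle,
     * or empty if every input is idle (assumption: no progress in that case).
     */
    static OptionalLong combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    /** A window with exclusive end fires once the watermark reaches end - 1. */
    static boolean windowFires(OptionalLong watermark, long windowEndExclusive) {
        return watermark.isPresent()
                && watermark.getAsLong() >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 600_000L; // a 10-minute window starting at 0 ms
        // The second input has read no events, so it still reports Long.MIN_VALUE.
        long[] watermarks = {650_000L, Long.MIN_VALUE, 700_000L};

        boolean[] noneIdle = {false, false, false};
        boolean[] secondIdle = {false, true, false};

        // The empty input pins the minimum at Long.MIN_VALUE: prints false.
        System.out.println(windowFires(combinedWatermark(watermarks, noneIdle), windowEnd));
        // With the empty input treated as idle, the minimum is 650000: prints true.
        System.out.println(windowFires(combinedWatermark(watermarks, secondIdle), windowEnd));
    }
}
```

In this model a single input with no events is enough to keep the window
from ever firing, which matches the behavior described above. Flink's
WatermarkStrategy exposes withIdleness(Duration) for marking such inputs
idle; whether that fits this pipeline is not something the original message
establishes.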


-- 
Regards,
Deepthi
