I'm trying to use *FlinkKafkaConsumer* and a custom Trigger like explained here: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/operators/windows/#fire-and-purge
This my *window assigner* implementation: class TumblingEventWindowAssigner(WindowAssigner[str, TimeWindow]): def __init__(self, size: int, offset: int, is_event_time: bool): self._size = size self._offset = offset self._is_event_time = is_event_time def assign_windows( self, element: str, timestamp: int, context: WindowAssigner.WindowAssignerContext, ) -> Collection[TimeWindow]: start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size) return [TimeWindow(start, start + self._size)] def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]: return EventTimeTrigger() def get_window_serializer(self) -> TypeSerializer[TimeWindow]: return TimeWindowSerializer() def is_event_time(self) -> bool: return self._is_event_time And this is my *trigger* implementation: class EventTimeTrigger(Trigger[str, TimeWindow]): def on_element( self, element: str, timestamp: int, window: TimeWindow, ctx: Trigger.TriggerContext, ) -> TriggerResult: return TriggerResult.CONTINUE def on_processing_time( self, timestamp: int, window: TimeWindow, ctx: Trigger.TriggerContext, ) -> TriggerResult: return TriggerResult.CONTINUE def on_event_time( self, timestamp: int, window: TimeWindow, ctx: Trigger.TriggerContext, ) -> TriggerResult: if timestamp >= window.max_timestamp(): return TriggerResult.FIRE_AND_PURGE else: return TriggerResult.CONTINUE def on_merge( self, window: TimeWindow, ctx: Trigger.OnMergeContext, ) -> None: pass def clear( self, window: TimeWindow, ctx: Trigger.TriggerContext, ) -> None: pass But the problem is, the Kafka consumer does not read any message unless I use process time instead and change the *on_processing_time* implementation to be the same as *on_event_time*. I'm I doing anything wrong here? How can I use event time properly? -- This e-mail and any attachments may contain information that is privileged, confidential, and/or exempt from disclosure under applicable law. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution or use of any information contained herein is strictly prohibited. If you have received this transmission in error, please immediately notify the sender and destroy the original transmission and any attachments, whether in electronic or hard copy format, without reading or saving.