I'm trying to use *FlinkKafkaConsumer* and a custom Trigger like explained
here:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/operators/windows/#fire-and-purge

This my *window assigner* implementation:

class TumblingEventWindowAssigner(WindowAssigner[str, TimeWindow]):
    def __init__(self, size: int, offset: int, is_event_time: bool):
        self._size = size
        self._offset = offset
        self._is_event_time = is_event_time

    def assign_windows(
        self,
        element: str,
        timestamp: int,
        context: WindowAssigner.WindowAssignerContext,
    ) -> Collection[TimeWindow]:
        start = TimeWindow.get_window_start_with_offset(timestamp,
self._offset, self._size)
        return [TimeWindow(start, start + self._size)]

    def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
        return EventTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return self._is_event_time


And this is my *trigger* implementation:

class EventTimeTrigger(Trigger[str, TimeWindow]):
    def on_element(
        self,
        element: str,
        timestamp: int,
        window: TimeWindow,
        ctx: Trigger.TriggerContext,
    ) -> TriggerResult:
        return TriggerResult.CONTINUE

    def on_processing_time(
        self,
        timestamp: int,
        window: TimeWindow,
        ctx: Trigger.TriggerContext,
    ) -> TriggerResult:
        return TriggerResult.CONTINUE

    def on_event_time(
        self,
        timestamp: int,
        window: TimeWindow,
        ctx: Trigger.TriggerContext,
    ) -> TriggerResult:
        if timestamp >= window.max_timestamp():
            return TriggerResult.FIRE_AND_PURGE
        else:
            return TriggerResult.CONTINUE

    def on_merge(
        self,
        window: TimeWindow,
        ctx: Trigger.OnMergeContext,
    ) -> None:
        pass

    def clear(
        self,
        window: TimeWindow,
        ctx: Trigger.TriggerContext,
    ) -> None:
        pass

But the problem is, the Kafka consumer does not read any message unless I
use process time instead and change the *on_processing_time* implementation
to be the same as *on_event_time*.
I'm I doing anything wrong here? How can I use event time properly?

-- 


This e-mail and any attachments may contain information that is 
privileged, confidential,  and/or exempt from disclosure under applicable 
law.  If you are not the intended recipient, you are hereby notified that 
any disclosure, copying, distribution or use of any information contained 
herein is strictly prohibited. If you have received this transmission in 
error, please immediately notify the sender and destroy the original 
transmission and any attachments, whether in electronic or hard copy 
format, without reading or saving.












Reply via email to