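Regarding “Kafka consumer doesn’t read any message”: I’m not sure about this.
Usually the processing logic should not affect the Kafka consumer. Did you
conclude this because the job produces no output? If so, my guess is that the
window was never triggered. With event time, a window only fires once the
watermark passes the end of the window. If no timestamps and watermarks are
assigned to the stream, or the watermark never advances, nothing is emitted.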
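
To illustrate why an event-time window can stay silent, here is a minimal
pure-Python simulation. It is not the PyFlink API. All names
(`EventTimeTumblingWindows`, `BoundedOutOfOrderness`, `window_start`,
`LONG_MIN`) are hypothetical. Elements are buffered per tumbling window, and a
window is only emitted when a watermark at or beyond its max timestamp
(end - 1) arrives. Registering a timer in on_element mirrors what Flink's
built-in Java EventTimeTrigger does. The watermark formula (max timestamp seen
- out-of-orderness - 1) mirrors Flink's bounded-out-of-orderness generator.

```python
from typing import Dict, List, Set, Tuple

Window = Tuple[int, int]  # (start, end), end exclusive, like TimeWindow

LONG_MIN = -(2 ** 63)  # Java Long.MIN_VALUE, i.e. "no watermark yet"


def window_start(timestamp: int, offset: int, size: int) -> int:
    """Start of the tumbling window containing `timestamp`.

    Python's % is non-negative for a positive size, so this also
    works for negative timestamps.
    """
    return timestamp - (timestamp - offset) % size


class BoundedOutOfOrderness:
    """Hypothetical stand-in for a bounded-out-of-orderness watermark generator."""

    def __init__(self, out_of_orderness: int):
        self.out_of_orderness = out_of_orderness
        # Chosen so the initial watermark equals LONG_MIN.
        self.max_ts = LONG_MIN + out_of_orderness + 1

    def on_event(self, timestamp: int) -> None:
        self.max_ts = max(self.max_ts, timestamp)

    def current_watermark(self) -> int:
        return self.max_ts - self.out_of_orderness - 1


class EventTimeTumblingWindows:
    """Buffers elements per window; emits a window once the watermark passes it."""

    def __init__(self, size: int, offset: int = 0):
        self.size = size
        self.offset = offset
        self.panes: Dict[Window, List[str]] = {}
        self.timers: Set[Tuple[int, Window]] = set()

    def on_element(self, element: str, timestamp: int) -> None:
        start = window_start(timestamp, self.offset, self.size)
        window = (start, start + self.size)
        self.panes.setdefault(window, []).append(element)
        # Register an event-time timer at max_timestamp() == end - 1 (CONTINUE).
        self.timers.add((window[1] - 1, window))

    def on_watermark(self, watermark: int) -> List[Tuple[Window, List[str]]]:
        fired = []
        for timer in sorted(self.timers):  # sorted() copies, so discard is safe
            timestamp, window = timer
            if timestamp <= watermark:
                # Timer fires: FIRE_AND_PURGE emits the pane and drops its state.
                self.timers.discard(timer)
                fired.append((window, self.panes.pop(window)))
        return fired


windows = EventTimeTumblingWindows(size=10)
for element, ts in [("a", 1), ("b", 5), ("c", 12)]:
    windows.on_element(element, ts)

# No watermarks generated: the watermark stays at LONG_MIN and nothing fires.
print(windows.on_watermark(LONG_MIN))  # []

# Watermarks tolerating 2 ms of out-of-orderness: watermark = 12 - 2 - 1 = 9.
wm = BoundedOutOfOrderness(out_of_orderness=2)
for ts in (1, 5, 12):
    wm.on_event(ts)
print(windows.on_watermark(wm.current_watermark()))  # [((0, 10), ['a', 'b'])]
```

In a real PyFlink job, the corresponding step would be attaching a
WatermarkStrategy to the Kafka stream via assign_timestamps_and_watermarks, for
example WatermarkStrategy.for_bounded_out_of_orderness(...) combined with
with_timestamp_assigner(...). Keep in mind that an idle Kafka partition can also
hold the watermark back.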

Could you share more of your code?

Regards,
Dian

> On Aug 2, 2021, at 11:40 PM, Ignacio Taranto <ignacio.tara...@eclypsium.com> wrote:
> 
> I'm trying to use FlinkKafkaConsumer and a custom Trigger as explained here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/operators/windows/#fire-and-purge
> 
> This is my window assigner implementation:
> 
> class TumblingEventWindowAssigner(WindowAssigner[str, TimeWindow]):
>     def __init__(self, size: int, offset: int, is_event_time: bool):
>         self._size = size
>         self._offset = offset
>         self._is_event_time = is_event_time
> 
>     def assign_windows(
>         self,
>         element: str,
>         timestamp: int,
>         context: WindowAssigner.WindowAssignerContext,
>     ) -> Collection[TimeWindow]:
>         start = TimeWindow.get_window_start_with_offset(
>             timestamp, self._offset, self._size)
>         return [TimeWindow(start, start + self._size)]
> 
>     def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
>         return EventTimeTrigger()
> 
>     def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
>         return TimeWindowSerializer()
> 
>     def is_event_time(self) -> bool:
>         return self._is_event_time
> 
> 
> And this is my trigger implementation:
> 
> class EventTimeTrigger(Trigger[str, TimeWindow]):
>     def on_element(
>         self,
>         element: str,
>         timestamp: int,
>         window: TimeWindow,
>         ctx: Trigger.TriggerContext,
>     ) -> TriggerResult:
>         return TriggerResult.CONTINUE
> 
>     def on_processing_time(
>         self,
>         timestamp: int,
>         window: TimeWindow,
>         ctx: Trigger.TriggerContext,
>     ) -> TriggerResult:
>         return TriggerResult.CONTINUE
> 
>     def on_event_time(
>         self,
>         timestamp: int,
>         window: TimeWindow,
>         ctx: Trigger.TriggerContext,
>     ) -> TriggerResult:
>         if timestamp >= window.max_timestamp():
>             return TriggerResult.FIRE_AND_PURGE
>         else:
>             return TriggerResult.CONTINUE
> 
>     def on_merge(
>         self,
>         window: TimeWindow,
>         ctx: Trigger.OnMergeContext,
>     ) -> None:
>         pass
> 
>     def clear(
>         self,
>         window: TimeWindow,
>         ctx: Trigger.TriggerContext,
>     ) -> None:
>         pass
> 
> But the problem is that the Kafka consumer does not read any messages unless I
> switch to processing time and change the on_processing_time implementation to
> be the same as on_event_time.
> Am I doing anything wrong here? How can I use event time properly?
