Hi Ignacio,

Yes, you are right: for event-time processing you need to define the watermark strategy explicitly.
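To illustrate why this matters, here is a minimal plain-Python sketch (not PyFlink code) of a bounded-out-of-orderness watermark generator together with the check an event-time trigger performs. The class and function names are made up for illustration. The `- 1` adjustments follow how, as far as I know, Flink's built-in generator and `TimeWindow.max_timestamp()` behave, so please treat them as assumptions.

```python
from typing import Optional


class BoundedOutOfOrdernessSketch:
    """Toy model of a bounded-out-of-orderness watermark generator.

    Hypothetical helper for illustration only; this is not a PyFlink class.
    """

    def __init__(self, max_delay_ms: int) -> None:
        self.max_delay_ms = max_delay_ms
        # Highest event timestamp seen so far; None until the first event.
        self.max_timestamp: Optional[int] = None

    def on_event(self, timestamp_ms: int) -> None:
        # Track the largest event timestamp observed.
        if self.max_timestamp is None or timestamp_ms > self.max_timestamp:
            self.max_timestamp = timestamp_ms

    def current_watermark(self) -> Optional[int]:
        # Without any events (or without a strategy) there is no watermark.
        if self.max_timestamp is None:
            return None
        # Assumption: the watermark trails the max timestamp by the bound, minus 1 ms.
        return self.max_timestamp - self.max_delay_ms - 1


def window_fires(watermark: Optional[int], window_end_ms: int) -> bool:
    # Assumption: a window [start, end) fires once the watermark reaches end - 1.
    return watermark is not None and watermark >= window_end_ms - 1


gen = BoundedOutOfOrdernessSketch(max_delay_ms=30_000)
fired_before = window_fires(gen.current_watermark(), 10_000)  # no watermark yet

gen.on_event(5_000)
fired_mid = window_fires(gen.current_watermark(), 10_000)  # watermark still too low

gen.on_event(40_000)
fired_after = window_fires(gen.current_watermark(), 10_000)  # watermark caught up

print(fired_before, fired_mid, fired_after)
```

In this model, with a 30 second bound, the 10 second window ending at 10000 ms only fires once an event with a timestamp of at least 40000 ms has arrived. If no watermark is ever produced, it never fires, which matches a job that reads data but emits nothing.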
Regarding *with_timestamp_assigner*: it is optional. If you don't define it, watermarks are generated from the timestamp carried by the Kafka record (ConsumerRecord). If you want watermarks to be generated from a column in the data, you need to define the timestamp assigner explicitly.

Regards,
Dian

> On Aug 4, 2021, at 2:10 AM, Ignacio Taranto <ignacio.tara...@eclypsium.com> wrote:
>
> I assumed that the event time and watermarks were already handled by the
> Kafka connector.
>
> So, basically, I need to do something like:
>
> stream.assign_timestamps_and_watermarks(
>     WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))
> )
>
> Do I also need to set the timestamps myself by calling
> with_timestamp_assigner?
> Weren't the event timestamps already written by the Kafka source?
>
>
> On Tue, Aug 3, 2021 at 12:31 PM Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote:
> Hi Ignacio,
>
> I have no experience with the Python API, but just to make sure: Your events
> do contain some timestamp, and you defined how to assign a timestamp to an
> element and generate watermarks? I can't find this in the Python API docs, so
> [1] is a link to the Java DataStream API docs.
>
> Best,
> Nico
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/
>
> On Tue, Aug 3, 2021 at 2:34 PM Ignacio Taranto <ignacio.tara...@eclypsium.com> wrote:
> I'm trying to do a simple word count example, here's the main function:
>
> def main():
>     parser = argparse.ArgumentParser()
>     parser.add_argument('--kafka-clients-jar', required=True)
>     parser.add_argument('--flink-connector-kafka-jar', required=True)
>     args = parser.parse_args()
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.add_jars(
>         f'file://{args.kafka_clients_jar}',
>         f'file://{args.flink_connector_kafka_jar}',
>     )
>
>     stream = env.add_source(
>         FlinkKafkaConsumer(
>             'flink_read_topic',
>             SimpleStringSchema(),
>             {'bootstrap.servers': 'f{broker_url}'},
>         ),
>     )
>
>     stream = stream \
>         .flat_map(lambda x: [word for word in x.split(' ')], Types.STRING()) \
>         .key_by(lambda x: x) \
>         .window(TumblingEventWindowAssigner(10000, 0, True)) \
>         .process(CountWordsFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
>         .map(lambda x: f'word: \'{x[0]}\' count: \'{x[1]}\'', Types.STRING())
>
>     stream.add_sink(
>         FlinkKafkaProducer(
>             'flink_write_topic',
>             SimpleStringSchema(),
>             {'bootstrap.servers': 'f{broker_url}'},
>         ),
>     )
>
>     env.execute('word_count')
>
> And here's my window function:
>
> class CountWordsFunction(ProcessWindowFunction[str, Tuple[str, int], str, TimeWindow]):
>     def process(
>         self,
>         key: str,
>         content: ProcessWindowFunction.Context,
>         elements: Iterable[str],
>     ) -> Iterable[Tuple[str, int]]:
>         yield (key, sum(1 for elem in elements))
>
>     def clear(self, context: ProcessWindowFunction.Context) -> None:
>         pass
>
>
> I put some prints to debug and it seems that the assign_windows method is
> never called.
>
> On Mon, Aug 2, 2021 at 11:09 PM Dian Fu <dian0511...@gmail.com> wrote:
> Regarding "Kafka consumer doesn't read any message": I'm wondering about
> this. Usually the processing logic should not affect the Kafka consumer. Did
> you conclude this because the job produces no output? If so, I'm guessing that
> it's because the window was never triggered under event time.
>
> Could you share more of the code?
>
> Regards,
> Dian
>
>> On Aug 2, 2021, at 11:40 PM, Ignacio Taranto <ignacio.tara...@eclypsium.com> wrote:
>>
>> I'm trying to use FlinkKafkaConsumer and a custom Trigger as explained
>> here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/operators/windows/#fire-and-purge
>>
>> This is my window assigner implementation:
>>
>> class TumblingEventWindowAssigner(WindowAssigner[str, TimeWindow]):
>>     def __init__(self, size: int, offset: int, is_event_time: bool):
>>         self._size = size
>>         self._offset = offset
>>         self._is_event_time = is_event_time
>>
>>     def assign_windows(
>>         self,
>>         element: str,
>>         timestamp: int,
>>         context: WindowAssigner.WindowAssignerContext,
>>     ) -> Collection[TimeWindow]:
>>         start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size)
>>         return [TimeWindow(start, start + self._size)]
>>
>>     def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
>>         return EventTimeTrigger()
>>
>>     def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
>>         return TimeWindowSerializer()
>>
>>     def is_event_time(self) -> bool:
>>         return self._is_event_time
>>
>>
>> And this is my trigger implementation:
>>
>> class EventTimeTrigger(Trigger[str, TimeWindow]):
>>     def on_element(
>>         self,
>>         element: str,
>>         timestamp: int,
>>         window: TimeWindow,
>>         ctx: Trigger.TriggerContext,
>>     ) -> TriggerResult:
>>         return TriggerResult.CONTINUE
>>
>>     def on_processing_time(
>>         self,
>>         timestamp: int,
>>         window: TimeWindow,
>>         ctx: Trigger.TriggerContext,
>>     ) -> TriggerResult:
>>         return TriggerResult.CONTINUE
>>
>>     def on_event_time(
>>         self,
>>         timestamp: int,
>>         window: TimeWindow,
>>         ctx: Trigger.TriggerContext,
>>     ) -> TriggerResult:
>>         if timestamp >= window.max_timestamp():
>>             return TriggerResult.FIRE_AND_PURGE
>>         else:
>>             return TriggerResult.CONTINUE
>>
>>     def on_merge(
>>         self,
>>         window: TimeWindow,
>>         ctx: Trigger.OnMergeContext,
>>     ) -> None:
>>         pass
>>
>>     def clear(
>>         self,
>>         window: TimeWindow,
>>         ctx: Trigger.TriggerContext,
>>     ) -> None:
>>         pass
>>
>> But the problem is, the Kafka consumer does not read any messages unless I
>> use processing time instead and change the on_processing_time implementation
>> to be the same as on_event_time.
>> Am I doing anything wrong here? How can I use event time properly?
>>
>> This e-mail and any attachments may contain information that is privileged,
>> confidential, and/or exempt from disclosure under applicable law. If you
>> are not the intended recipient, you are hereby notified that any disclosure,
>> copying, distribution or use of any information contained herein is strictly
>> prohibited. If you have received this transmission in error, please
>> immediately notify the sender and destroy the original transmission and any
>> attachments, whether in electronic or hard copy format, without reading or
>> saving.
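
For reference, the tumbling assignment performed in assign_windows earlier in this thread can be reproduced in plain Python. This is only a sketch, not PyFlink code: both function names are hypothetical, and the formula is an assumption about what TimeWindow.get_window_start_with_offset computes for non-negative timestamps.

```python
from typing import Tuple


def window_start_with_offset(timestamp_ms: int, offset_ms: int, size_ms: int) -> int:
    # Assumed arithmetic: align the timestamp down to the start of its window,
    # where windows begin at offset_ms, offset_ms + size_ms, and so on.
    return timestamp_ms - (timestamp_ms - offset_ms + size_ms) % size_ms


def assign_tumbling_window(
    timestamp_ms: int, size_ms: int, offset_ms: int = 0
) -> Tuple[int, int]:
    # Each element belongs to exactly one half-open window [start, start + size).
    start = window_start_with_offset(timestamp_ms, offset_ms, size_ms)
    return (start, start + size_ms)


# Hypothetical timestamps in milliseconds, matching the 10 second window size
# used in the word count job.
first = assign_tumbling_window(9_999, 10_000)
second = assign_tumbling_window(12_345, 10_000)
shifted = assign_tumbling_window(12_345, 10_000, offset_ms=5_000)
print(first, second, shifted)
```

The result depends entirely on the timestamp attached to each element, so the assignment is only meaningful once the records carry event timestamps.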