I'm trying to do a simple word count example, here's the main function:

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--kafka-clients-jar', required=True)
    parser.add_argument('--flink-connector-kafka-jar', required=True)
    args = parser.parse_args()
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(
        f'file://{args.kafka_clients_jar}',
        f'file://{args.flink_connector_kafka_jar}',
    )
    stream = env.add_source(
        FlinkKafkaConsumer(
            'flink_read_topic',
            SimpleStringSchema(),
            {'bootstrap.servers': f'{broker_url}'},
        ),
    )
    stream = stream \
        .flat_map(lambda x: [word for word in x.split(' ')], Types.STRING()) \
        .key_by(lambda x: x) \
        .window(TumblingEventWindowAssigner(10000, 0, True)) \
        .process(CountWordsFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
        .map(lambda x: f'word: \'{x[0]}\' count: \'{x[1]}\'', Types.STRING())
    stream.add_sink(
        FlinkKafkaProducer(
            'flink_write_topic',
            SimpleStringSchema(),
            {'bootstrap.servers': f'{broker_url}'},
        ),
    )
    env.execute('word_count')

And here's my window function:

class CountWordsFunction(ProcessWindowFunction[str, Tuple[str, int], str, TimeWindow]):
    def process(
        self,
        key: str,
        context: ProcessWindowFunction.Context,
        elements: Iterable[str],
    ) -> Iterable[Tuple[str, int]]:
        yield (key, sum(1 for elem in elements))

    def clear(self, context: ProcessWindowFunction.Context) -> None:
        pass

I put in some prints to debug, and it seems that the assign_windows method is never called.

On Mon, Aug 2, 2021 at 11:09 PM Dian Fu <dian0511...@gmail.com> wrote:

> Regarding "Kafka consumer doesn’t read any message": I’m wondering about
> this. Usually the processing logic should not affect the Kafka consumer.
> Did you conclude this because the job produces no output? If so, I’m guessing
> that it’s because the window wasn’t triggered in the event-time case.
>
> Could you share more of the code?
>
> Regards,
> Dian
>
> On Aug 2, 2021, at 11:40 PM, Ignacio Taranto <ignacio.tara...@eclypsium.com> wrote:
>
> I'm trying to use *FlinkKafkaConsumer* and a custom Trigger as
> explained here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/operators/windows/#fire-and-purge
>
> This is my *window assigner* implementation:
>
> class TumblingEventWindowAssigner(WindowAssigner[str, TimeWindow]):
>     def __init__(self, size: int, offset: int, is_event_time: bool):
>         self._size = size
>         self._offset = offset
>         self._is_event_time = is_event_time
>
>     def assign_windows(
>         self,
>         element: str,
>         timestamp: int,
>         context: WindowAssigner.WindowAssignerContext,
>     ) -> Collection[TimeWindow]:
>         start = TimeWindow.get_window_start_with_offset(
>             timestamp, self._offset, self._size)
>         return [TimeWindow(start, start + self._size)]
>
>     def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
>         return EventTimeTrigger()
>
>     def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
>         return TimeWindowSerializer()
>
>     def is_event_time(self) -> bool:
>         return self._is_event_time
>
>
> And this is my *trigger* implementation:
>
> class EventTimeTrigger(Trigger[str, TimeWindow]):
>     def on_element(
>         self,
>         element: str,
>         timestamp: int,
>         window: TimeWindow,
>         ctx: Trigger.TriggerContext,
>     ) -> TriggerResult:
>         return TriggerResult.CONTINUE
>
>     def on_processing_time(
>         self,
>         timestamp: int,
>         window: TimeWindow,
>         ctx: Trigger.TriggerContext,
>     ) -> TriggerResult:
>         return TriggerResult.CONTINUE
>
>     def on_event_time(
>         self,
>         timestamp: int,
>         window: TimeWindow,
>         ctx: Trigger.TriggerContext,
>     ) -> TriggerResult:
>         if timestamp >= window.max_timestamp():
>             return TriggerResult.FIRE_AND_PURGE
>         else:
>             return TriggerResult.CONTINUE
>
>     def on_merge(
>         self,
>         window: TimeWindow,
>         ctx: Trigger.OnMergeContext,
>     ) -> None:
>         pass
>
>     def clear(
>         self,
>         window: TimeWindow,
>         ctx: Trigger.TriggerContext,
>     ) -> None:
>         pass
>
> But the problem is, the Kafka consumer does not read any message unless I
> use processing time instead and change the *on_processing_time*
> implementation to be the same as *on_event_time*.
> Am I doing anything wrong here? How can I use event time properly?
>
> This e-mail and any attachments may contain information that is
> privileged, confidential, and/or exempt from disclosure under applicable
> law. If you are not the intended recipient, you are hereby notified
> that any disclosure, copying, distribution or use of any information
> contained herein is strictly prohibited. If you have received this
> transmission in error, please immediately notify the sender and destroy the
> original transmission and any attachments, whether in electronic or hard
> copy format, without reading or saving.
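
To make the guess about event time concrete, here is a minimal pure-Python sketch of how a tumbling event-time window is assigned and when a trigger like the on_event_time above would fire. It is not PyFlink code: Window, window_start_with_offset, assign_window and run are hypothetical names made up for illustration, and the window-start arithmetic is assumed to match what TimeWindow.get_window_start_with_offset does for non-negative timestamps. The point it tries to show is that a window only fires once a watermark reaches window.max_timestamp(); with no watermarks at all, nothing is ever emitted.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Window:
    """Hypothetical stand-in for a time window; times are in milliseconds."""
    start: int  # inclusive
    end: int    # exclusive

    def max_timestamp(self) -> int:
        # Last timestamp that still belongs to this window.
        return self.end - 1


def window_start_with_offset(timestamp: int, offset: int, size: int) -> int:
    # Assumed to mirror the arithmetic behind get_window_start_with_offset
    # for non-negative timestamps.
    return timestamp - (timestamp - offset + size) % size


def assign_window(timestamp: int, size: int, offset: int = 0) -> Window:
    start = window_start_with_offset(timestamp, offset, size)
    return Window(start, start + size)


def run(
    events: List[Tuple[str, int]],
    watermarks: List[int],
    size: int,
) -> List[Tuple[str, int, int]]:
    """Count words per (window, word), then fire windows as watermarks pass them.

    Returns (word, window_start, count) tuples in firing order.
    """
    pending: Dict[Tuple[Window, str], int] = {}
    for word, ts in events:
        key = (assign_window(ts, size), word)
        pending[key] = pending.get(key, 0) + 1

    fired: List[Tuple[str, int, int]] = []
    for wm in watermarks:
        # Same condition as on_event_time: fire once time reaches max_timestamp.
        ready = sorted(
            (k for k in pending if wm >= k[0].max_timestamp()),
            key=lambda k: (k[0].start, k[1]),
        )
        for k in ready:
            # Emit the count and drop the state, like FIRE_AND_PURGE.
            fired.append((k[1], k[0].start, pending.pop(k)))
    return fired


events = [("a", 1000), ("b", 2500), ("a", 9999), ("a", 10000)]
no_watermarks = run(events, [], 10000)
early_watermark = run(events, [9998], 10000)
passing_watermark = run(events, [9999], 10000)
```

In this sketch no_watermarks and early_watermark are both empty, while passing_watermark contains the counts for the window starting at 0. In a real job the watermark would have to come from a watermark strategy assigned to the stream, and the main function above does not appear to set one, which would be consistent with the window never being triggered.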