I assumed that the event time and watermarks were already handled by the
Kafka connector.

So, basically, I need to do something like:

stream = stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))
)

Do I also need to set the timestamps myself by calling
with_timestamp_assigner?
Weren't the event timestamps already written by the Kafka source?
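To see what for_bounded_out_of_orderness(Duration.of_seconds(30)) is meant to produce, here is a toy model in plain Python rather than the PyFlink API. The class and method names are hypothetical. The formula (highest timestamp seen so far, minus the allowed out-of-orderness, minus 1 ms) follows my understanding of Flink's Java BoundedOutOfOrdernessWatermarks. Until the first event arrives, the watermark stays at its minimum, so no event-time window can fire.

```python
import sys


class BoundedOutOfOrdernessWatermarks:
    """Toy model (plain Python, not PyFlink) of a bounded-out-of-orderness
    watermark generator. Names are hypothetical."""

    def __init__(self, max_out_of_orderness_ms: int) -> None:
        self._delay = max_out_of_orderness_ms
        # Chosen so that, before any event, the watermark is the lowest value.
        self._max_ts = -sys.maxsize + self._delay + 1

    def on_event(self, event_ts_ms: int) -> None:
        # Track the highest event timestamp seen so far.
        self._max_ts = max(self._max_ts, event_ts_ms)

    def current_watermark(self) -> int:
        # "No more events with a timestamp <= this value are expected."
        return self._max_ts - self._delay - 1


gen = BoundedOutOfOrdernessWatermarks(max_out_of_orderness_ms=30_000)
print(gen.current_watermark() == -sys.maxsize)  # True: nothing seen yet
for ts in (100_000, 145_000, 120_000):  # 120_000 arrives out of order
    gen.on_event(ts)
print(gen.current_watermark())  # 114999
```

In Flink itself the watermark is emitted periodically, so current_watermark() here stands for the value the next periodic emission would carry.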


On Tue, Aug 3, 2021 at 12:31 PM Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote:

> Hi Ignacio,
>
> I have no experience with the Python API, but just to make sure: your
> events do contain a timestamp, and you have defined how to assign a timestamp
> to each element and how to generate watermarks? I can't find this in the Python API
> docs, so [1] is a link to the Java DataStream API docs.
>
> Best,
> Nico
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/
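As a plain-Python illustration of "how to assign a timestamp to an element", the function below picks an event time for each element. The payload format ("<epoch_millis>,<text>") is hypothetical, since the messages in this thread are plain words. The (value, record_timestamp) shape is meant to mirror PyFlink's TimestampAssigner.extract_timestamp, which would be subclassed and passed to with_timestamp_assigner. Treat that mapping as an assumption to check against the docs for your Flink version.

```python
def extract_timestamp(value: str, record_timestamp: int) -> int:
    """Hypothetical assigner for records shaped like "1628000000000,hello world".

    record_timestamp is the timestamp already attached to the record
    (for a Kafka source, the Kafka record timestamp).
    """
    head, sep, _ = value.partition(',')
    if sep and head.isdigit():
        # Use the event time embedded in the payload.
        return int(head)
    # Otherwise keep the timestamp that came with the record.
    return record_timestamp


print(extract_timestamp('1628000000000,hello world', 42))  # 1628000000000
print(extract_timestamp('hello world', 42))                # 42
```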
>
> On Tue, Aug 3, 2021 at 2:34 PM Ignacio Taranto <ignacio.tara...@eclypsium.com> wrote:
>
>> I'm trying to do a simple word count example. Here's the main function:
>>
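>> import argparse
>>
>> from pyflink.common.serialization import SimpleStringSchema
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
>>
>>
>> def main():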
>>     parser = argparse.ArgumentParser()
>>     parser.add_argument('--kafka-clients-jar', required=True)
>>     parser.add_argument('--flink-connector-kafka-jar', required=True)
>>     args = parser.parse_args()
>>
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.add_jars(
>>         f'file://{args.kafka_clients_jar}',
>>         f'file://{args.flink_connector_kafka_jar}',
>>     )
>>
>>     stream = env.add_source(
>>         FlinkKafkaConsumer(
>>             'flink_read_topic',
>>             SimpleStringSchema(),
>>             {'bootstrap.servers': f'{broker_url}'},
>>         ),
>>     )
>>
>>     stream = stream \
>>         .flat_map(lambda x: x.split(' '), Types.STRING()) \
>>         .key_by(lambda x: x) \
>>         .window(TumblingEventWindowAssigner(10000, 0, True)) \
>>         .process(CountWordsFunction(),
>>                  Types.TUPLE([Types.STRING(), Types.INT()])) \
>>         .map(lambda x: f"word: '{x[0]}' count: '{x[1]}'", Types.STRING())
>>
>>     stream.add_sink(
>>         FlinkKafkaProducer(
>>             'flink_write_topic',
>>             SimpleStringSchema(),
>>             {'bootstrap.servers': f'{broker_url}'},
>>         ),
>>     )
>>
>>     env.execute('word_count')
>>
>> And here's my window function:
>>
>> class CountWordsFunction(ProcessWindowFunction[str, Tuple[str, int], str, TimeWindow]):
>>     def process(
>>         self,
>>         key: str,
>>         context: ProcessWindowFunction.Context,
>>         elements: Iterable[str],
>>     ) -> Iterable[Tuple[str, int]]:
>>         yield (key, sum(1 for elem in elements))
>>
>>     def clear(self, context: ProcessWindowFunction.Context) -> None:
>>         pass
>>
>>
>> I added some print statements to debug, and it seems that the assign_windows
>> method is never called.
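For reference, this is roughly what the pipeline above should emit once its windows fire, modelled in plain Python outside Flink. It is a batch sketch over hand-written (timestamp_ms, line) pairs that ignores watermarks and lateness entirely. The function name is hypothetical, and the 10-second, zero-offset window mirrors TumblingEventWindowAssigner(10000, 0, True).

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

WINDOW_MS = 10_000  # same size as TumblingEventWindowAssigner(10000, 0, True)


def count_words(events: Iterable[Tuple[int, str]]) -> Dict[Tuple[int, str], int]:
    """Count words per (window_start, word) for timestamped lines."""
    counts: Counter = Counter()
    for ts, line in events:
        start = ts - ts % WINDOW_MS  # tumbling window, offset 0
        for word in line.split(' '):
            counts[(start, word)] += 1
    return dict(counts)


events = [(1_000, 'a b a'), (9_999, 'b'), (10_000, 'a')]
for (start, word), n in sorted(count_words(events).items()):
    print(f"window [{start}, {start + WINDOW_MS}) word: '{word}' count: '{n}'")
```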
>>
>> On Mon, Aug 2, 2021 at 11:09 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Regarding "Kafka consumer doesn't read any message": I'm wondering about
>>> this. Usually the processing logic should not affect the Kafka consumer.
>>> Did you conclude this because the job produces no output? If so, I'm guessing
>>> that it's because the window is never triggered when using event time.
>>>
>>> Could you share more of your code?
>>>
>>> Regards,
>>> Dian
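Dian's hypothesis can be illustrated with a toy model in plain Python (not Flink code). An event-time window [start, end) only fires once the watermark reaches its max_timestamp(), i.e. end - 1. If the watermark is assumed to stay at its initial minimum because no watermark strategy is set, no window ever fires, and the job looks as if it read nothing. NO_WATERMARK and fired_windows are hypothetical names.

```python
import sys
from typing import List

# Assumption: without a watermark strategy the operator's watermark stays at
# its initial minimum (Java's Long.MIN_VALUE).
NO_WATERMARK = -sys.maxsize - 1


def fired_windows(window_ends: List[int], watermark: int) -> List[int]:
    # A window [start, end) is complete once the watermark reaches its
    # max_timestamp(), which is end - 1.
    return [end for end in window_ends if watermark >= end - 1]


ends = [10_000, 20_000]
print(fired_windows(ends, NO_WATERMARK))  # []
print(fired_windows(ends, 15_000))        # [10000]
print(fired_windows(ends, 19_999))        # [10000, 20000]
```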
>>>
>>> On Aug 2, 2021, at 11:40 PM, Ignacio Taranto <ignacio.tara...@eclypsium.com> wrote:
>>>
>>> I'm trying to use *FlinkKafkaConsumer* and a custom Trigger as
>>> explained here:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/operators/windows/#fire-and-purge
>>>
>>> This is my *window assigner* implementation:
>>>
>>> class TumblingEventWindowAssigner(WindowAssigner[str, TimeWindow]):
>>>     def __init__(self, size: int, offset: int, is_event_time: bool):
>>>         self._size = size
>>>         self._offset = offset
>>>         self._is_event_time = is_event_time
>>>
>>>     def assign_windows(
>>>         self,
>>>         element: str,
>>>         timestamp: int,
>>>         context: WindowAssigner.WindowAssignerContext,
>>>     ) -> Collection[TimeWindow]:
>>>         start = TimeWindow.get_window_start_with_offset(
>>>             timestamp, self._offset, self._size)
>>>         return [TimeWindow(start, start + self._size)]
>>>
>>>     def get_default_trigger(self, env) -> Trigger[str, TimeWindow]:
>>>         return EventTimeTrigger()
>>>
>>>     def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
>>>         return TimeWindowSerializer()
>>>
>>>     def is_event_time(self) -> bool:
>>>         return self._is_event_time
>>>
>>>
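The window-start arithmetic behind TimeWindow.get_window_start_with_offset can be checked in isolation. The plain-Python function below reproduces the formula used by Flink's Java TimeWindow.getWindowStartWithOffset, to the best of my knowledge. assign_window is a hypothetical helper that returns (start, end) with an exclusive end, so max_timestamp() corresponds to end - 1. Python's % differs from Java's for negative operands, so results for timestamps far below the offset may not match Flink.

```python
from typing import Tuple


def get_window_start_with_offset(timestamp: int, offset: int, window_size: int) -> int:
    # Align the timestamp down to the start of its tumbling window,
    # shifted by `offset`.
    return timestamp - (timestamp - offset + window_size) % window_size


def assign_window(timestamp: int, size: int, offset: int = 0) -> Tuple[int, int]:
    start = get_window_start_with_offset(timestamp, offset, size)
    # (start, end) with end exclusive; the window's max timestamp is end - 1.
    return start, start + size


print(assign_window(12_345, 10_000))         # (10000, 20000)
print(assign_window(12_345, 10_000, 3_000))  # (3000, 13000)
```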
>>> And this is my *trigger* implementation:
>>>
>>> class EventTimeTrigger(Trigger[str, TimeWindow]):
>>>     def on_element(
>>>         self,
>>>         element: str,
>>>         timestamp: int,
>>>         window: TimeWindow,
>>>         ctx: Trigger.TriggerContext,
>>>     ) -> TriggerResult:
>>>         return TriggerResult.CONTINUE
>>>
>>>     def on_processing_time(
>>>         self,
>>>         timestamp: int,
>>>         window: TimeWindow,
>>>         ctx: Trigger.TriggerContext,
>>>     ) -> TriggerResult:
>>>         return TriggerResult.CONTINUE
>>>
>>>     def on_event_time(
>>>         self,
>>>         timestamp: int,
>>>         window: TimeWindow,
>>>         ctx: Trigger.TriggerContext,
>>>     ) -> TriggerResult:
>>>         if timestamp >= window.max_timestamp():
>>>             return TriggerResult.FIRE_AND_PURGE
>>>         else:
>>>             return TriggerResult.CONTINUE
>>>
>>>     def on_merge(
>>>         self,
>>>         window: TimeWindow,
>>>         ctx: Trigger.OnMergeContext,
>>>     ) -> None:
>>>         pass
>>>
>>>     def clear(
>>>         self,
>>>         window: TimeWindow,
>>>         ctx: Trigger.TriggerContext,
>>>     ) -> None:
>>>         pass
>>>
>>> But the problem is, the Kafka consumer does not read any message unless
>>> I use processing time instead and change the *on_processing_time*
>>> implementation to be the same as *on_event_time*.
>>> Am I doing anything wrong here? How can I use event time properly?
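Something else worth checking against the trigger above: in Flink, on_event_time is called back only for event-time timers that were registered, and Flink's built-in Java EventTimeTrigger registers one at window.max_timestamp() in on_element. PyFlink's Trigger.TriggerContext appears to expose register_event_time_timer for this, but verify that against your version. The toy model below is plain Python, not the PyFlink API, and its class name is hypothetical. It shows the consequence: without the registration, advancing the watermark fires nothing.

```python
from typing import List, Set


class ToyEventTimeTrigger:
    """Toy model of an event-time trigger plus the timer service driving it."""

    def __init__(self, register_timer: bool) -> None:
        self.register_timer = register_timer
        self.timers: Set[int] = set()

    def on_element(self, window_max_ts: int) -> None:
        # Flink's built-in EventTimeTrigger registers a timer here;
        # the trigger in the message above only returns CONTINUE.
        if self.register_timer:
            self.timers.add(window_max_ts)

    def advance_watermark(self, watermark: int) -> List[int]:
        # on_event_time runs only for registered timers whose time is
        # <= the new watermark; each one here stands for a FIRE_AND_PURGE.
        due = sorted(t for t in self.timers if t <= watermark)
        self.timers.difference_update(due)
        return due


for register in (False, True):
    trigger = ToyEventTimeTrigger(register_timer=register)
    trigger.on_element(window_max_ts=9_999)
    print(register, trigger.advance_watermark(20_000))
# False []
# True [9999]
```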
>>>
>>> This e-mail and any attachments may contain information that is
>>> privileged, confidential,  and/or exempt from disclosure under applicable
>>> law.  If you are not the intended recipient, you are hereby notified
>>> that any disclosure, copying, distribution or use of any information
>>> contained herein is strictly prohibited. If you have received this
>>> transmission in error, please immediately notify the sender and destroy the
>>> original transmission and any attachments, whether in electronic or hard
>>> copy format, without reading or saving.
>>>
>>>
>>>
