Note that the built-in `BoundedOutOfOrdernessTimestampExtractor` generates
watermarks based only on the timestamps of incoming events. Without new
events, `BoundedOutOfOrdernessTimestampExtractor` will not advance the
event-time clock. That explains why the window doesn't trigger immediately
after 10s: a new event must arrive to advance the watermark.

As Timo said, a different implementation of `AssignerWithPeriodicWatermarks`
could behave differently. Have a look at the implementation of
`TimestampsAndPeriodicWatermarksOperator` within Flink to better understand
how `AssignerWithPeriodicWatermarks` is invoked.
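
As a rough illustration of what "behave differently" could mean, here is a
minimal sketch of watermark logic that keeps advancing with wall-clock time
while no events arrive. It is plain Java with no Flink dependency so that it
runs on its own. The class name `IdleAwareWatermarkClock` and the injected
clock are hypothetical, not part of Flink. In a real job the same logic would
sit in a class implementing `AssignerWithPeriodicWatermarks<T>`, whose
`extractTimestamp` and `getCurrentWatermark` methods it mirrors in simplified
form.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: a watermark that trails the highest event timestamp by
 * a fixed bound, and additionally moves forward by the wall-clock time that
 * has passed since the last event was seen.
 */
class IdleAwareWatermarkClock {
    private final long maxOutOfOrdernessMs;
    private final LongSupplier wallClockMs; // injected so the logic is testable

    private boolean seenEvent = false;
    private long maxEventTs;                     // highest event timestamp so far
    private long wallAtLastEvent;                // wall-clock time of the last event
    private long lastWatermark = Long.MIN_VALUE; // ensures the watermark never goes back

    IdleAwareWatermarkClock(long maxOutOfOrdernessMs, LongSupplier wallClockMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.wallClockMs = wallClockMs;
    }

    /** Called once per event; records the timestamp and when it was seen. */
    long extractTimestamp(long eventTs) {
        maxEventTs = seenEvent ? Math.max(maxEventTs, eventTs) : eventTs;
        wallAtLastEvent = wallClockMs.getAsLong();
        seenEvent = true;
        return eventTs;
    }

    /** Called periodically; returns the current watermark in milliseconds. */
    long getCurrentWatermark() {
        if (!seenEvent) {
            return Long.MIN_VALUE; // nothing seen yet, so no progress to report
        }
        long idleMs = Math.max(0L, wallClockMs.getAsLong() - wallAtLastEvent);
        long candidate = maxEventTs + idleMs - maxOutOfOrdernessMs;
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }
}
```

With this kind of logic, a 10s tumbling window would fire roughly 10s of
wall-clock time after its last event even if the device stays silent. The
trade-off is that an event arriving after an idle period with an older
timestamp may fall behind the watermark and be treated as late.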

On Wed, Aug 2, 2017 at 8:13 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Thanks Timo. Basically, my requirement is that the window has to be created
> based on event time, but the trigger has to fire either when the next
> element with a timestamp >10s is received or when 10s have passed, exactly
> as you described. Let me try the AssignerWithPeriodicWatermarks approach.
>
> Thanks,
> Govind
>
> On Aug 2, 2017, at 7:46 AM, Timo Walther <twal...@apache.org> wrote:
>
> I forgot about the AssignerWithPeriodicWatermarks [1]. I think it could
> solve your problem easily.
>
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#with-periodic-watermarks
>
> On 02.08.17 at 16:30, Timo Walther wrote:
>
> The question is what defines your `10 seconds`. In event-time the incoming
> events determine when 10 seconds have passed. Your description sounds like
> you want to have results after 10 seconds wall-clock/processing-time. So
> either you use a processing-time window or you implement a custom trigger
> that fires either on event-time or on a timer that you set for 10 s of
> processing-time.
>
> Timo
>
>
> On 02.08.17 at 16:20, Govindarajan Srinivasaraghavan wrote:
>
> Thanks Timo. The next message will arrive only after a minute or so. Is
> there a way to evict whatever is in the window buffer after 10 seconds,
> irrespective of whether a new message arrives or not?
>
> Thanks,
> Govind
>
> On Aug 2, 2017, at 6:56 AM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Govind,
>
> if the window is not triggered, this usually indicates that your timestamp
> and watermark assignment is not correct. According to your description, I
> don't think that you need a custom trigger/evictor. How often do events
> arrive from one device? There must be another event from the same device
> that has a timestamp >10s in order to trigger the window evaluation.
>
> Instead of using the Kafka timestamp, maybe you could also convert your
> timestamps to UTC in the TimestampExtractor.
>
> There is no official limitation. However, each window comes with some
> overhead. So you should choose your memory/state backends and parallelism
> accordingly.
>
> Hope that helps.
>
> Timo
>
>
> On 02.08.17 at 06:54, Govindarajan Srinivasaraghavan wrote:
>
> Hi,
>
> I have a few questions regarding event time windowing. In my scenario,
> devices from various timezones send messages with a timestamp, and I need
> to create a 10-second window per device. The messages will mostly
> arrive in order.
>
> Here is my sample code to perform windowing and aggregating the messages
> after the window to further process it.
>
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("STREAM1",
>                     new DeserializationSchema(),
>                     kafkaConsumerProperties);
>
> // TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor
> DataStream<Message> msgStream = streamEnv
>         .addSource(consumer)
>         .assignTimestampsAndWatermarks(
>                 new TimestampExtractor(Time.of(100, TimeUnit.MILLISECONDS)));
>
> KeyedStream<Message, String> keyByStream = msgStream.keyBy(new
> CustomKeySelector());
>
> WindowedStream<Message, String, TimeWindow> windowedStream =
>         keyByStream.window(TumblingEventTimeWindows.of(
> org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
>
> SingleOutputStreamOperator<Message> aggregatedStream =
> windowedStream.apply(new AggregateEntries());
>
> My questions are
>
> - In the above code, data gets passed till the window function but even
> after window time the data is not received in the apply function. Do I have
> to supply a custom evictor or trigger?
>
> - Since the data is being received from multiple timezones and each device
> will have some time difference, would it be ok to assign the timestamp as
> that of received timestamp in the message at source (kafka). Will there be
> any issues with this?
>
> - Are there any limitations on the number of time windows that can be
> created at any given time? In my scenario if there are 1 million devices
> there will be 1 million tumbling windows.
>
> Thanks,
> Govind
>
>
>
>
>
