The question is what defines your `10 seconds`. In event time, the
incoming events determine when 10 seconds have passed. Your description
sounds like you want results after 10 seconds of wall-clock
(processing) time. So either you use a processing-time window, or you
implement a custom trigger that fires either on event time or on a
timer that you set for 10 s of processing time, whichever comes first.
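To make the "whichever comes first" idea concrete, here is a minimal, library-free sketch of the decision such a trigger would make. It is not Flink API: the class `EitherTimeTrigger` and its method names are hypothetical. In Flink the same logic would presumably live in a custom `Trigger` that registers both an event-time and a processing-time timer.

```java
/** Hypothetical model: fire on event time or after a processing-time timeout. */
final class EitherTimeTrigger {
    private final long windowEndMs;   // exclusive end of the event-time window
    private final long timeoutMs;     // wall-clock budget, e.g. 10 s
    private long deadlineMs = -1L;    // processing-time deadline; -1 until the first element
    private boolean fired = false;

    EitherTimeTrigger(long windowEndMs, long timeoutMs) {
        this.windowEndMs = windowEndMs;
        this.timeoutMs = timeoutMs;
    }

    /** The first element of the window arms the processing-time timer. */
    void onElement(long processingTimeMs) {
        if (deadlineMs < 0) {
            deadlineMs = processingTimeMs + timeoutMs;
        }
    }

    /** Fires once when the watermark reaches the window's last millisecond. */
    boolean onWatermark(long watermarkMs) {
        return fireIf(watermarkMs >= windowEndMs - 1);
    }

    /** Fires once when the wall clock reaches the deadline. */
    boolean onProcessingTime(long nowMs) {
        return fireIf(deadlineMs >= 0 && nowMs >= deadlineMs);
    }

    private boolean fireIf(boolean condition) {
        if (fired || !condition) {
            return false;
        }
        fired = true;
        return true;
    }
}
```

With a window ending at 10 000 ms and a 10 s timeout, a lone element is emitted by `onProcessingTime` 10 s after it arrived, even if no later event ever advances the watermark.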
Timo
On 02.08.17 at 16:20, Govindarajan Srinivasaraghavan wrote:
Thanks Timo. The next message will arrive only after a minute or so.
Is there a way to evict whatever is in the window buffer after 10
seconds, irrespective of whether a new message arrives?
Thanks,
Govind
On Aug 2, 2017, at 6:56 AM, Timo Walther <twal...@apache.org> wrote:
Hi Govind,
if the window is not triggered, this usually indicates that your
timestamp and watermark assignment is not correct. According to your
description, I don't think that you need a custom trigger/evictor.
How often do events arrive from one device? There must be another
event from the same device with a timestamp past the end of the 10 s
window in order to trigger the window evaluation.
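A small, library-free model may help to see why a single event is not enough. It assumes the watermark is simply the largest timestamp seen minus the 100 ms out-of-orderness bound from the code below, and that a window is evaluated once the watermark reaches its last millisecond. The class `WatermarkModel` is hypothetical and only mimics that behaviour.

```java
/** Hypothetical model of a bounded-out-of-orderness watermark and a 10 s tumbling window. */
final class WatermarkModel {
    static final long WINDOW_SIZE_MS = 10_000L;
    static final long MAX_OUT_OF_ORDERNESS_MS = 100L;

    private long maxTimestampSeen = Long.MIN_VALUE;

    /** Start of the tumbling window containing the timestamp (assumes timestamps >= 0). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    /** Records an event timestamp and returns the resulting watermark. */
    long onEvent(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS;
    }

    /** The window [start, start + size) is evaluated once the watermark reaches its last millisecond. */
    static boolean windowFires(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + WINDOW_SIZE_MS - 1;
    }
}
```

An event at 12 345 ms falls into the window starting at 10 000 ms and yields a watermark of 12 245 ms, so nothing fires. Only an event with a timestamp of at least 20 099 ms pushes the watermark to 19 999 ms and lets the window be evaluated.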
Instead of using the Kafka timestamp, maybe you could also convert
your timestamps to UTC in the TimestampExtractor.
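As a sketch of that conversion, assuming (this is not stated in the thread) that each message carries an ISO-8601 timestamp with a zone offset: parsing it to epoch milliseconds gives a value that is independent of the device's time zone. The helper `UtcTimestamps.toEpochMillis` is a hypothetical name; the parsing itself uses only `java.time`.

```java
import java.time.OffsetDateTime;

/** Hypothetical helper: normalizes device-local timestamps to epoch milliseconds (UTC-based). */
final class UtcTimestamps {
    /** Parses an ISO-8601 timestamp with offset, e.g. "2017-08-02T06:54:00+05:30". */
    static long toEpochMillis(String isoTimestampWithOffset) {
        return OffsetDateTime.parse(isoTimestampWithOffset).toInstant().toEpochMilli();
    }
}
```

Two devices reporting the same instant as `2017-08-02T06:54:00+05:30` and `2017-08-01T18:24:00-07:00` then get the same event timestamp and land in the same 10 s window range.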
There is no official limitation. However, each window comes with
some overhead, so you should choose your memory, state backend, and
parallelism accordingly.
Hope that helps.
Timo
On 02.08.17 at 06:54, Govindarajan Srinivasaraghavan wrote:
Hi,
I have a few questions regarding event-time windowing. In my scenario,
devices in various time zones send messages with timestamps, and I
need to create a 10-second window per device. The messages will
mostly arrive in order.
Here is my sample code, which windows the messages and aggregates
them after the window for further processing.
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    FlinkKafkaConsumer010<Message> consumer = new FlinkKafkaConsumer010<>(
            "STREAM1",
            new DeserializationSchema(),
            kafkaConsumerProperties);

    // TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor
    DataStream<Message> msgStream = streamEnv
            .addSource(consumer)
            .assignTimestampsAndWatermarks(
                    new TimestampExtractor(Time.of(100, TimeUnit.MILLISECONDS)));

    KeyedStream<Message, String> keyByStream =
            msgStream.keyBy(new CustomKeySelector());

    WindowedStream<Message, String, TimeWindow> windowedStream =
            keyByStream.window(TumblingEventTimeWindows.of(
                    org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));

    SingleOutputStreamOperator<Message> aggregatedStream =
            windowedStream.apply(new AggregateEntries());
My questions are:
- In the above code, data reaches the window operator, but even after
the window time has passed it is never delivered to the apply
function. Do I have to supply a custom evictor or trigger?
- Since the data is received from multiple time zones and each device
will have some time difference, would it be OK to use the timestamp
at which the message was received at the source (Kafka) as the event
timestamp? Will there be any issues with this?
- Are there any limitations on the number of time windows that can
be created at any given time? In my scenario, if there are 1 million
devices, there will be 1 million tumbling windows.
Thanks,
Govind