Hi,
When you join multiple streams with different watermarks, the resulting stream's watermark will be the smallest of the input watermarks, as long as you don't explicitly assign a new watermark generator.
In your example, suppose small_topic has its watermark at time t1 and big_topic has its watermark at time t2, with t1 > t2 because small_topic is consumed faster. If you join the two streams into a single big_and_small_stream, then big_and_small_stream will have its watermark at time t2, so no message from big_topic will be lost due to lateness.
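To make the "smallest input watermark" rule concrete, here is a minimal plain-Java sketch. It is a simplified model, not Flink's implementation: the class MinWatermarkDemo and its method names are hypothetical, and t1 = 10000 ms and t2 = 4000 ms are made-up values standing in for the t1 > t2 of the example.

```java
import java.util.Arrays;

/**
 * Simplified model (hypothetical, not Flink's actual code) of an operator with
 * several inputs, e.g. the result of joining a small_topic stream and a
 * big_topic stream that each had their own watermark assigner.
 */
public class MinWatermarkDemo {
    // Latest watermark received on each input; Long.MIN_VALUE means "none yet".
    private final long[] inputWatermarks;

    MinWatermarkDemo(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark for one input and returns the operator's combined watermark. */
    long onWatermark(int input, long watermark) {
        // A single input's watermark never moves backwards.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        // The combined watermark is the smallest of the input watermarks.
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkDemo joined = new MinWatermarkDemo(2); // input 0: small_topic, input 1: big_topic
        long t1 = 10_000L; // small_topic watermark (ahead)
        long t2 = 4_000L;  // big_topic watermark (behind)
        joined.onWatermark(0, t1);
        long combined = joined.onWatermark(1, t2);
        System.out.println(combined); // prints 4000, i.e. t2
    }
}
```

Because the combined watermark only advances when the slowest input advances, records from the lagging big_topic input do not fall behind it.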
Regards,
Kien
On 11/22/2017 4:36 PM, Juho Autio wrote:
I would like to understand how FlinkKafkaConsumer treats "unbalanced" topics.
We're using FlinkKafkaConsumer010 with two topics, say "small_topic" and "big_topic".
After restoring from a 4-hour-old savepoint, I checked the consumer offsets in Kafka (Flink commits offsets to Kafka for reference only; checkpointing is enabled, so in reality Flink manages the offsets internally). The offsets for "small_topic" seem to catch up quickly and get committed up to the latest offset, whereas catching up on "big_topic" takes much longer.
Is it possible that watermarks are determined by "small_topic", so that messages read from "big_topic" get discarded (i.e. excluded from the triggered windows) if their event-time timestamps are too old?
Or how does FlinkKafkaConsumer handle this? Does it somehow synchronize reading based on the extracted timestamp:
- across partitions of a single topic?
- across topics?
Our code is basically:
    env
        // A single consumer reads both topics.
        .addSource(new FlinkKafkaConsumer010<>(
            Arrays.asList("big_topic", "small_topic"),
            new EventMapSchema(),
            props))
        // Watermarks are assigned after the source, over records from both topics.
        .assignTimestampsAndWatermarks(
            new OutOfOrderTimestampExtractor("timestamp_field",
                Time.seconds(cfg.getMaxOutOfOrdernessInSeconds())));
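The concern in the question (one watermark assigner that sees records from both topics mixed together, as in the code above) can be modeled in plain Java. This is only a sketch under stated assumptions: MixedTopicLatenessDemo is a hypothetical class, it mimics bounded-out-of-orderness extraction (watermark = highest timestamp seen minus the allowed delay), it updates the watermark after every record rather than periodically as Flink's periodic assigners do, and the timestamps and the 60-second bound are invented.

```java
/**
 * Simplified model (hypothetical class, not Flink's API) of one
 * bounded-out-of-orderness watermark assigner that sees records from
 * small_topic and big_topic interleaved in a single stream.
 * Simplification: the watermark is recomputed after every record.
 */
public class MixedTopicLatenessDemo {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    MixedTopicLatenessDemo(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Watermark = highest timestamp seen so far minus the allowed out-of-orderness. */
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** Returns true if the record is already at or behind the current watermark, i.e. late. */
    boolean onRecord(long timestampMs) {
        boolean late = timestampMs <= currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
        return late;
    }

    public static void main(String[] args) {
        MixedTopicLatenessDemo assigner = new MixedTopicLatenessDemo(60_000L); // 60 s bound
        long now = 4 * 60 * 60 * 1000L;             // 14,400,000 ms
        boolean smallLate = assigner.onRecord(now); // small_topic record, already caught up
        boolean bigLate = assigner.onRecord(0L);    // big_topic record, 4 hours behind
        System.out.println(smallLate + " " + bigLate); // prints "false true"
    }
}
```

Once a caught-up small_topic record has pushed the maximum timestamp forward, a 4-hour-old big_topic record falls behind the watermark, which is the kind of lateness the question asks about.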
Finally, I don't think this problem is only related to restoring state and having to catch up; that just makes it more prominent. Couldn't this also happen during normal streaming, if consuming from bigger topics is slow enough?
Thanks!