Hi Ruby,
Which Flink version are you using? If you look at the code of
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase, you
can see that the behavior with and without partition discovery depends
on the Flink version.
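For reference, here is a minimal sketch of how the discovery interval is usually handed to the consumer. It only builds the java.util.Properties object, so it runs without Flink on the classpath. The key is the one from your mail. The broker address, group id and 10-second interval are placeholder assumptions. As far as I know, partition discovery was added in Flink 1.4, where the key is also exposed as FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS. In older versions the property is not used.

```java
import java.util.Properties;

public class PartitionDiscoveryConfig {

    // Key quoted in the original mail; in Flink 1.4+ it should match
    // FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.
    static final String DISCOVERY_INTERVAL_KEY = "flink.partition-discovery.interval-millis";

    // Builds consumer properties with periodic partition discovery enabled.
    // Broker address and group id are hypothetical placeholders.
    static Properties consumerProperties(long discoveryIntervalMillis) {
        if (discoveryIntervalMillis <= 0) {
            throw new IllegalArgumentException("discovery interval must be positive");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "my-flink-app");            // hypothetical group id
        // Discovery is off by default; it is enabled by giving this key
        // a positive interval in milliseconds.
        props.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties(10_000L);
        System.out.println(props.getProperty(DISCOVERY_INTERVAL_KEY));
        // With Flink on the classpath, the properties would then be passed as e.g.
        // new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props)
    }
}
```

Setting the key without a value is not enough. The properties go to the FlinkKafkaConsumer010 constructor together with the topic name and a deserialization schema.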
Regards,
Timo
On 15.05.18 at 02:01, Ruby Andrews wrote:
Hello,
My team ran into some behavior we did not expect when we tried to get
an existing Flink app to read from a re-sized Kafka topic. Here are the
highlights:
- We are using the FlinkKafkaConsumer010.
- We re-partitioned (added partitions to) an existing topic that our
Flink app reads, so that the topic now has 8 partitions. Following that,
we re-deployed our task managers. We thought that the task managers
would start reading the new partitions.
- 8 task managers read from the topic, but they did NOT read all of
the partitions. 3 of the partitions had 2 task managers reading from
them and 3 of the partitions had 0 task managers reading from them. My
team had expected that Flink would automatically read from all
partitions, 1 task manager per partition.
- To force the app to read from all partitions, we added the property
*flink.partition-discovery.interval-millis* to our Kafka consumer
properties and re-deployed the task managers. We expected this property
to make Flink discover (and start reading) all partitions.
- We did not see a change in the Kafka readers: there were still 3
partitions not being read.
- Finally, we changed the ID of the Flink operator that reads the
Kafka topic and re-deployed the task managers again.
- After changing the ID, the app started reading from all partitions.
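For comparison, this is roughly how a fresh start (no restored consumer state) spreads partitions over the parallel source subtasks. This is what the "1 task manager per partition" expectation assumes. The sketch follows my recollection of the scheme in Flink's internal KafkaTopicPartitionAssigner (1.4+): a start index derived from the topic name's hash, then round-robin by partition number. Treat the exact formula as an assumption. The topic name is hypothetical. "Subtask" here corresponds to what the mail calls a task manager, assuming one source subtask per task manager.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Assumed scheme: a topic-dependent start index, then partitions are
    // spread round-robin from it across the parallel subtasks.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Returns, for each subtask index, the partitions it would read.
    static List<List<Integer>> assignAll(String topic, int numPartitions, int numParallelSubtasks) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < numParallelSubtasks; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(assign(topic, p, numParallelSubtasks)).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // Hypothetical topic with 8 partitions, read with parallelism 8.
        List<List<Integer>> result = assignAll("my-topic", 8, 8);
        for (int s = 0; s < result.size(); s++) {
            System.out.println("subtask " + s + " -> partitions " + result.get(s));
        }
    }
}
```

With 8 partitions and parallelism 8, the partition indices map to 8 distinct subtasks, so each subtask reads exactly one partition. The 2/0 split described above differs from this fresh-start picture.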
What is the correct way to pick up partitions after re-partitioning a
Kafka topic?
Thanks,
Ruby