Ahhh, looks like I had simply misunderstood where that property should go. The docs correctly say:

> To enable it, set a non-negative value for flink.partition-discovery.interval-millis in the __provided properties config__

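For anyone else who trips over this, here is a minimal sketch of what "provided properties config" means in practice. It only builds the java.util.Properties object. The broker address, group id, class name and helper name are placeholders I made up for illustration, and the constructor call is left as a comment because it needs the Flink Kafka connector on the classpath.

```java
import java.util.Properties;

public class PartitionDiscoveryProps {

    // Builds the Properties object that is handed to the FlinkKafkaConsumer constructor.
    // Broker address and group id are placeholder values, not taken from this thread.
    static Properties kafkaConsumerProperties(long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-flink-consumer");       // placeholder
        // This key belongs in the consumer's own Properties, not in flink-conf.yaml.
        props.setProperty("flink.partition-discovery.interval-millis",
                Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        Properties props = kafkaConsumerProperties(60_000L);
        System.out.println(props.getProperty("flink.partition-discovery.interval-millis"));
        // With the connector available, the same object would be passed along these lines:
        // new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props);
    }
}
```

A value of 60000 checks for new partitions once per minute, which is the interval I was trying to configure.
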
So it should be set in the Properties that are passed in the constructor of FlinkKafkaConsumer! I had somehow assumed that this should go to flink-conf.yaml (maybe because it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't read that.

Sorry for the trouble. If anything, I guess a piece of example code might've helped me avoid this mistake. The docs are clear though; I had just become blind to this detail because I thought I had already read it.

On Thu, Apr 5, 2018 at 10:26 AM, Juho Autio <juho.au...@rovio.com> wrote:

> Still not working after I had a fresh build from
> https://github.com/apache/flink/tree/release-1.5.
>
> When the job starts this is logged:
>
> 2018-04-05 09:29:38,157 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: flink.partition-discovery.interval-millis, 60000
>
> So that's 1 minute.
>
> As before, I added one more partition to a topic that is being consumed.
> Secor started consuming it as expected, but Flink didn't – or at least it
> isn't reporting anything about doing so. The new partition is not shown in
> Flink task metrics or consumer offsets committed by Flink.
>
> How could I investigate this further? How about that additional logging
> for partition discovery?
>
> On Thu, Mar 22, 2018 at 3:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi,
>>
>> I think you’ve made a good point: there is currently no logs that tell
>> anything about discovering a new partition. We should probably add this.
>>
>> And yes, it would be great if you can report back on this using either
>> the latest master, release-1.5 or release-1.4 branches.
>>
>> On 22 March 2018 at 10:24:09 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>
>> Thanks, that sounds promising. I don't know how to check if it's
>> consuming all partitions? For example I couldn't find any logs about
>> discovering a new partition. However, did I understand correctly that this
>> is also fixed in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try
>> again.
>>
>> On Thu, Mar 22, 2018 at 4:18 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi Juho,
>>>
>>> Can you confirm that the new partition is consumed, but only that
>>> Flink’s reported metrics do not include them?
>>> If yes, then I think your observations can be explained by this issue:
>>> https://issues.apache.org/jira/browse/FLINK-8419
>>>
>>> This issue should have been fixed in the recently released 1.4.2 version.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 22 March 2018 at 8:02:40 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>>
>>> According to the docs*, flink.partition-discovery.interval-millis can
>>> be set to enable automatic partition discovery.
>>>
>>> I'm testing this, apparently it doesn't work.
>>>
>>> I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
>>> and FlinkKafkaConsumer010.
>>>
>>> I had my flink stream running, consuming an existing topic with 3
>>> partitions, among some other topics.
>>> I modified partitions of an existing topic: 3 -> 4**.
>>> I checked consumer offsets by secor: it's now consuming all 4 partitions.
>>> I checked consumer offset by my flink stream: it's still consuming only
>>> the 3 original partitions.
>>>
>>> I also checked the Task Metrics of this job from Flink UI and it only
>>> offers Kafka related metrics to be added for 3 partitions (0,1 & 2).
>>>
>>> According to Flink UI > Job Manager > Configuration:
>>> flink.partition-discovery.interval-millis=60000
>>> – so that's just 1 minute. It's already more than 20 minutes since I
>>> added the new partition, so Flink should've picked it up.
>>>
>>> How to debug?
>>>
>>> Btw, this job has external checkpoints enabled, done once per minute.
>>> Those are also succeeding.
>>>
>>> *) https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
>>>
>>> **)
>>>
>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe --topic my_topic
>>> Topic:my_topic PartitionCount:3 ReplicationFactor:1 Configs:
>>>
>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic my_topic --partitions 4
>>> Adding partitions succeeded!