Ahhh looks like I had simply misunderstood where that property should go.

The docs correctly say:
> To enable it, set a non-negative value for
flink.partition-discovery.interval-millis in the __provided properties
config__

So it should be set in the Properties that are passed in the constructor of
FlinkKafkaConsumer!

I had somehow assumed that this should go to flink-conf.yaml (maybe because
it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't
read that.

Sorry for the trouble. If anything, I guess a piece of example code
might've helped me avoid this mistake. The docs are clear though, I just
had become blind to this detail as I thought I had already read it.

On Thu, Apr 5, 2018 at 10:26 AM, Juho Autio <juho.au...@rovio.com> wrote:

> Still not working after I had a fresh build from https://github.com/
> apache/flink/tree/release-1.5.
>
> When the job starts this is logged:
>
> 2018-04-05 09:29:38,157 INFO  
> org.apache.flink.configuration.GlobalConfiguration
>           - Loading configuration property: 
> flink.partition-discovery.interval-millis,
> 60000
>
> So that's 1 minute.
>
> As before, I added one more partition to a topic that is being consumed.
> Secor started consuming it as expected, but Flink didn't – or at least it
> isn't reporting anything about doing so. The new partition is not shown in
> Flink task metrics or consumer offsets committed by Flink.
>
> How could I investigate this further? How about that additional logging
> for partition discovery?
>
> On Thu, Mar 22, 2018 at 3:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi,
>>
>> I think you’ve made a good point: there is currently no logs that tell
>> anything about discovering a new partition. We should probably add this.
>>
>> And yes, it would be great if you can report back on this using either
>> the latest master, release-1.5 or release-1.4 branches.
>>
>> On 22 March 2018 at 10:24:09 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>
>> Thanks, that sounds promising. I don't know how to check if it's
>> consuming all partitions? For example I couldn't find any logs about
>> discovering a new partition. However, did I understand correctly that this
>> is also fixed in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try
>> again.
>>
>> On Thu, Mar 22, 2018 at 4:18 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org
>> > wrote:
>>
>>> Hi Juho,
>>>
>>> Can you confirm that the new partition is consumed, but only that
>>> Flink’s reported metrics do not include them?
>>> If yes, then I think your observations can be explained by this issue:
>>> https://issues.apache.org/jira/browse/FLINK-8419
>>>
>>> <https://issues.apache.org/jira/browse/FLINK-8419>
>>> This issue should have been fixed in the recently released 1.4.2 version.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 22 March 2018 at 8:02:40 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>>
>>> According to the docs*, flink.partition-discovery.interval-millis can
>>> be set to enable automatic partition discovery.
>>>
>>> I'm testing this, apparently it doesn't work.
>>>
>>> I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
>>> and FlinkKafkaConsumer010.
>>>
>>> I had my flink stream running, consuming an existing topic with 3
>>> partitions, among some other topics.
>>> I modified partitions of an existing topic: 3 -> 4**.
>>> I checked consumer offsets by secor: it's now consuming all 4 partitions.
>>> I checked consumer offset by my flink stream: it's still consuming only
>>> the 3 original partitions.
>>>
>>> I also checked the Task Metrics of this job from Flink UI and it only
>>> offers Kafka related metrics to be added for 3 partitions (0,1 & 2).
>>>
>>> According to Flink UI > Job Manager > Configuration:
>>> flink.partition-discovery.interval-millis=60000
>>> – so that's just 1 minute. It's already more than 20 minutes since I
>>> added the new partition, so Flink should've picked it up.
>>>
>>> How to debug?
>>>
>>>
>>> Btw, this job has external checkpoints enabled, done once per minute.
>>> Those are also succeeding.
>>>
>>> *) https://ci.apache.org/projects/flink/flink-docs-master/dev/c
>>> onnectors/kafka.html#kafka-consumers-topic-and-partition-discovery
>>>
>>> **)
>>>
>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe
>>> --topic my_topic
>>> Topic:my_topic PartitionCount:3 ReplicationFactor:1 Configs:
>>>
>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic
>>>  my_topic --partitions 4
>>> Adding partitions succeeded!
>>>
>>>
>>>
>>
>

Reply via email to