> On Feb 26, 2017, at 10:36 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
> 
> An update on one of the workarounds I tried - I added some logic in our 
> consumption part to wait for the KafkaConsumer.partitionsFor() to return the 
> topic name in the list, before actually considering the KafkaConsumer ready 
> to consume/poll the messages. Something like this[1]. This does successfully 
> make the consumer wait till the presence of the topic is confirmed. However, 
> we still see the assignment of partitions being skipped:
> 
> 2017-02-26 21:02:10,823 [Thread-46] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - 
> Skipping assignment for topic foo-bar since no metadata is available
> 
> I looked at the code of KafkaConsumer.partitionsFor(...). It does indeed 
> fetch (remotely) the partitions info but it _doesn't_ (force) update the 
> "metadata" that the KafkaConsumer instance holds on to. This "metadata" is 
> what is used by the assignment logic and effectively it ends up considering 
> this topic absent. I am not sure why the "metadata" cannot be force updated 
> (internally) when a remote fetch is triggered via a partitionsFor() call; I 
> guess there's a valid reason. In short, this approach of using partitionsFor, 
> although it looked promising, won't work out. So, I'm going to go back to the 
> metadata.max.age.ms and fiddle with it a bit to keep it at a low value (was 
> trying to avoid this, since this then is applicable throughout the lifetime 
> of that consumer).
> 

Jaikiran,

What about 
1) create topic
2) create consumer1 and do consumer1.partitionsFor() until it succeeds
3) close consumer1
4) create consumer2 and do consumer2.subscribe()
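
A minimal sketch of the "until it succeeds" part of step 2, assuming that a 
thrown exception (for example a TimeoutException), a null result, and an 
empty list should all count as "topic not visible yet". The class and method 
names below are hypothetical and not part of the Kafka client API; the 
metadata lookup is passed in as a Callable so the retry logic stands on its 
own.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical helper for illustration; not part of the Kafka client API.
final class TopicReadiness {

    /**
     * Runs the lookup up to maxAttempts times, sleeping backoffMs between
     * attempts. Returns true as soon as the lookup yields a non-empty list.
     * A null result, an empty list, or any exception is treated as
     * "not ready yet". Returns false if all attempts fail or if the thread
     * is interrupted while waiting.
     */
    static boolean awaitPartitions(Callable<List<?>> lookup,
                                   int maxAttempts,
                                   long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                List<?> partitions = lookup.call();
                if (partitions != null && !partitions.isEmpty()) {
                    return true;
                }
            } catch (Exception e) {
                // Assumption: a failed or timed-out lookup just means "retry".
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

In the application, the lookup would be something like 
() -> consumer1.partitionsFor("foo-bar"). Once it returns true, consumer1 is 
closed and consumer2 is created and subscribed (steps 3 and 4). Whether 
consumer2 then gets the partitions assigned right away is exactly what 
remains to be tested.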

-James


> 
> [1] https://gist.github.com/jaikiran/902c1eadbfdd66466c8d8ecbd81416bf
> 
> -Jaikiran
> On Friday 24 February 2017 12:29 PM, Jaikiran Pai wrote:
>> James, thank you very much for this explanation and I now understand the 
>> situation much more clearly. I wasn't aware that the consumer's 
>> metadata.max.age.ms could play a role in this. I was under the impression 
>> that the 5 minute timeout is some broker level config which was triggering 
>> this consumer group reevaluation.
>> 
>> Perhaps changing the metadata.max.age.ms might be what we will end up doing, 
>> but before doing that, I will check how the consumer.partitionsFor API (the 
>> one which you noted in your reply) behaves in practice. The javadoc on that 
>> method states that it will trigger a refresh of the metadata if none is 
>> found for the topic. There's also a note that it throws a TimeoutException 
>> if the metadata isn't available for the topic after a timeout period. I'll 
>> experiment a bit with this API to see if I can be assured of an empty list if 
>> the topic metadata (after a fetch) isn't available. That way, I can add some 
>> logic in our application which calls this API for a certain number of fixed 
>> times, before the consumer is considered "ready". If it does throw a 
>> TimeoutException, I think I'll just catch it and repeat the call for the 
>> fixed number of times.
>> 
>> Thanks again, this was really helpful - I was running out of ideas to come 
>> up with a clean enough workaround to this problem. Your explanation has 
>> given me a couple of ideas which I consider clean enough to try a few 
>> things. Once I have a consistent working solution for this, I'll send a note 
>> on what approach I settled on.
>> 
>> 
>> -Jaikiran
>> 
>> On Friday 24 February 2017 12:08 PM, James Cheng wrote:
>>>> On Feb 23, 2017, at 10:03 PM, Jaikiran Pai <jai.forums2...@gmail.com> 
>>>> wrote:
>>>> 
>>>> (Re)posting this from the user mailing list to dev mailing list, hoping 
>>>> for some inputs from the Kafka dev team:
>>>> 
>>>> We are on Kafka 0.10.0.1 (server and client) and use Java 
>>>> consumer/producer APIs. We have an application where we create Kafka 
>>>> topics dynamically (using the AdminUtils Java API) and then start 
>>>> producing/consuming on those topics. The issue we frequently run into is 
>>>> this:
>>>> 
>>>> 1. Application process creates a topic "foo-bar" via 
>>>> AdminUtils.createTopic. This is successfully completed.
>>>> 2. Same application process then creates a consumer (using new Java 
>>>> consumer API) on that foo-bar topic as a next step.
>>>> 3. The consumer that gets created in step#2 however, doesn't seem to be 
>>>> enrolled in consumer group for this topic because of this (notice the last 
>>>> line in the log):
>>>> 
>>>> 2017-02-21 00:58:43,359 [Thread-6] DEBUG 
>>>> org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
>>>> 2017-02-21 00:58:43,360 [Thread-6] DEBUG 
>>>> org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): 
>>>> foo-bar
>>>> 2017-02-21 00:58:43,543 [Thread-6] DEBUG 
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received 
>>>> group coordinator response ClientResponse(receivedTimeMs=1487667523542, 
>>>> disconnected=false, request=ClientRequest(expectResponse=true, 
>>>> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f,
>>>>  
>>>> request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
>>>>  body={group_id=my-app-group}), createdTimeMs=1487667523378, 
>>>> sendTimeMs=1487667523529), 
>>>> responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
>>>> 2017-02-21 00:58:43,543 [Thread-6] INFO 
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
>>>> Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for 
>>>> group my-app-group.
>>>> 2017-02-21 00:58:43,545 [Thread-6] INFO 
>>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking 
>>>> previously assigned partitions [] for group my-app-group
>>>> 2017-02-21 00:58:43,545 [Thread-6] INFO 
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
>>>> (Re-)joining group my-app-group
>>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG 
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending 
>>>> JoinGroup 
>>>> ({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
>>>>  lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: 
>>>> null)
>>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG 
>>>> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
>>>> node-2147483647.bytes-sent
>>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG 
>>>> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
>>>> node-2147483647.bytes-received
>>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG 
>>>> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
>>>> node-2147483647.latency
>>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG 
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received 
>>>> successful join group response for group my-app-group: 
>>>> {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
>>>>  lim=59 cap=59]}]}
>>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG 
>>>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
>>>> Performing assignment for group my-app-group using strategy range with 
>>>> subscriptions 
>>>> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
>>>> *2017-02-21 00:58:43,552 [Thread-6] DEBUG 
>>>> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - 
>>>> Skipping assignment for topic foo-bar since no metadata is available*
>>>> 
>>>> 
>>>> 4. A few seconds later, a separate process produces some messages on the 
>>>> foo-bar topic (via the Java producer API).
>>>> 5. The consumer created in step#2, although it is waiting for messages on 
>>>> the foo-bar topic, _doesn't_ consume these messages.
>>>> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which 
>>>> then successfully assigns partition(s) of this foo-bar topic to the 
>>>> consumer created in step#2, and the consumer starts consuming these messages.
>>>> 
>>>> This 5 minute delay in consuming messages from this dynamically created 
>>>> topic is what we want to avoid. Is there any way I can deterministically 
>>>> do/force creation of a dynamic topic and be assured that upon completion 
>>>> of that call, I can create a consumer and start consuming of that topic 
>>>> such that it can receive messages as soon as the messages are produced on 
>>>> that topic, without having to wait for a 5 minute delay (or whatever the 
>>>> rebalance configuration is)? In essence, is there a way to ensure that the 
>>>> Kafka consumer does get the topic metadata for a topic that was created 
>>>> successfully by the same application, immediately?
>>>> 
>>>> 
>>> Summary: I don't think you can, but you can work around this issue by 
>>> setting metadata.max.age.ms.
>>> 
>>> Details: I believe that AdminUtils.createTopic simply creates the necessary 
>>> topic/partition info in zookeeper and then returns (let's say this is 
>>> time=T1). The broker, at some point later, responds to it and creates 
>>> topic/partitions (let's say this is time=T2). Your consumer is being 
>>> instantiated between T1 and T2, and so the topic doesn't really "exist" yet 
>>> on the broker.
>>> 
>>> If you wanted to be sure that the topic was created, you would have to poll 
>>> the brokers until the metadata request succeeded. You *might* be able to 
>>> call consumer.partitionsFor(String topic) in order to trigger a metadata 
>>> refresh, but I'm not sure.
>>> 
>>> The 5 minute delay you are seeing is due to the consumer configuration value 
>>> metadata.max.age.ms, which defaults to 300000 (5 minutes). 
>>> metadata.max.age.ms is "The period of time in milliseconds after which we 
>>> force a refresh of metadata even if we haven't seen any partition 
>>> leadership changes to proactively discover any new brokers or partitions."
>>> 
>>> So the consumer is re-requesting the metadata after 5 minutes, is noticing 
>>> the partition, and is assigning it to the consumer.
>>> 
>>> If you are okay with the current behavior but simply wish to have a shorter 
>>> delay, you can configure your consumer with a lower value of 
>>> metadata.max.age.ms to get it to "notice" the new topic earlier. However, if 
>>> the topic is already created and you leave the value low, you will end up 
>>> refreshing the metadata more frequently than necessary.
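
As a rough illustration of the setting described above, consumer properties 
with a lowered metadata.max.age.ms could be built as follows. The class and 
method names are hypothetical, the String deserializers are an assumption 
about the message format, and the right refresh interval depends on how much 
extra metadata traffic is acceptable.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only the property keys are Kafka's.
final class ConsumerConfigSketch {

    /** Builds consumer properties with a custom metadata refresh interval. */
    static Properties lowMetadataAge(String bootstrapServers,
                                     String groupId,
                                     long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumption: keys and values are plain strings.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The default is 300000 (5 minutes); a lower value makes the consumer
        // refresh metadata, and so notice new topics, sooner. It applies for
        // the whole lifetime of the consumer.
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }
}
```

For example, lowMetadataAge("localhost:9092", "my-app-group", 5000L) yields 
properties that can be passed to the KafkaConsumer constructor; the 5000 ms 
value is only an example.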
>>> 
>>> -James
>>> 
>>> 
>>>> P.S: We have topic auto creation disabled, so this isn't really an auto 
>>>> topic creation issue. In our case we are explicitly invoking the create 
>>>> topic operation via the Java API.
>>>> 
>>>> -Jaikiran
>> 
> 
