James, thank you very much for this explanation. I now understand the situation much more clearly. I wasn't aware that the consumer's metadata.max.age.ms could play a role in this. I was under the impression that the 5 minute timeout was some broker-level config which was triggering this consumer group reevaluation.

Changing metadata.max.age.ms might be what we end up doing, but before that I will check how the consumer.partitionsFor API (the one you noted in your reply) behaves in practice. The javadoc on that method states that it will trigger a refresh of the metadata if none is found for the topic. It also notes that a TimeoutException is thrown if the metadata isn't available for the topic after a timeout period. I'll experiment a bit with this API to see whether I can rely on getting an empty list if the topic metadata (after a fetch) isn't available. That way, I can add some logic in our application which calls this API up to a fixed number of times before the consumer is considered "ready". If it does throw a TimeoutException, I think I'll just catch it and repeat the call, up to that same fixed number of times.
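
A minimal sketch of that retry logic, under stated assumptions: the class and method names (TopicReadiness, awaitPartitions) are hypothetical, and the metadata lookup is passed in as a Supplier so the snippet compiles without the Kafka client on the classpath. In the application the supplier would wrap consumer.partitionsFor("foo-bar"). Whether that call returns null, returns an empty list, or throws for a topic with no metadata is exactly what the experiment has to establish, so the sketch treats all three as "not ready yet".

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper: polls a metadata lookup until it reports partitions. */
final class TopicReadiness {

    /**
     * Calls lookup up to maxAttempts times, sleeping backoffMs between attempts.
     * Returns true as soon as the lookup yields a non-empty list, false otherwise.
     */
    static boolean awaitPartitions(Supplier<List<?>> lookup, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                List<?> partitions = lookup.get();
                if (partitions != null && !partitions.isEmpty()) {
                    return true; // metadata for the topic is visible
                }
            } catch (RuntimeException e) {
                // Assumption: a failed lookup means "not ready yet". Kafka's
                // TimeoutException is unchecked, so it lands here. Real code
                // should catch only that type instead of every RuntimeException.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return false;
                }
            }
        }
        return false; // gave up after maxAttempts
    }
}
```

With a real consumer the call might look like TopicReadiness.awaitPartitions(() -> consumer.partitionsFor("foo-bar"), 5, 1000L), where the attempt count and backoff are illustrative values only.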

Thanks again, this was really helpful. I was running out of ideas for a clean enough workaround to this problem, and your explanation has given me a couple of ideas which I consider clean enough to try. Once I have a consistent working solution for this, I'll send a note on what approach I settled on.


-Jaikiran

On Friday 24 February 2017 12:08 PM, James Cheng wrote:
On Feb 23, 2017, at 10:03 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:

(Re)posting this from the user mailing list to dev mailing list, hoping for 
some inputs from the Kafka dev team:

We are on Kafka 0.10.0.1 (server and client) and use Java consumer/producer 
APIs. We have an application where we create Kafka topics dynamically (using 
the AdminUtils Java API) and then start producing/consuming on those topics. 
The issue we frequently run into is this:

1. Application process creates a topic "foo-bar" via AdminUtils.createTopic. 
This is successfully completed.
2. Same application process then creates a consumer (using new Java consumer 
API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2 however, doesn't seem to be 
enrolled in consumer group for this topic because of this (notice the last line 
in the log):

2017-02-21 00:58:43,359 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): 
foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received 
group coordinator response ClientResponse(receivedTimeMs=1487667523542, 
disconnected=false, request=ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f,
 
request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
 body={group_id=my-app-group}), createdTimeMs=1487667523378, 
sendTimeMs=1487667523529), 
responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered 
coordinator localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking 
previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining 
group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending 
JoinGroup 
({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
 lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received 
successful join group response for group my-app-group: 
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
 lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing 
assignment for group my-app-group using strategy range with subscriptions 
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - 
Skipping assignment for topic foo-bar since no metadata is available*


4. A few seconds later, a separate process produces (via the Java producer API) 
some messages on the foo-bar topic.
5. The consumer created in step#2, although it is waiting for messages on the 
foo-bar topic, _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance which then 
successfully assigns partition(s) of this foo-bar topic to the consumer created in 
step#2 and the consumer starts consuming these messages.

This 5 minute delay in consuming messages from this dynamically created topic 
is what we want to avoid. Is there any way I can deterministically do/force 
creation of a dynamic topic and be assured that upon completion of that call, I 
can create a consumer and start consuming from that topic such that it can 
receive messages as soon as the messages are produced on that topic, without 
having to wait for a 5 minute delay (or whatever the rebalance configuration 
is)? In essence, is there a way to ensure that the Kafka consumer does get the 
topic metadata for a topic that was created successfully by the same 
application, immediately?


Summary: I don't think you can, but you can work around this issue by setting 
metadata.max.age.ms.

Details: I believe that AdminUtils.createTopic simply creates the necessary 
topic/partition info in zookeeper and then returns (let's say this is time=T1). The 
broker, at some point later, responds to it and creates topic/partitions (let's say this 
is time=T2). Your consumer is being instantiated between T1 and T2, and so the topic 
doesn't really "exist" yet on the broker.

If you wanted to be sure that the topic was created, you would have to poll the 
brokers until the metadata request succeeded. You *might* be able to call 
consumer.partitionsFor(String topic) in order to trigger a metadata refresh, 
but I'm not sure.

The 5 minute delay you are seeing is due to the consumer configuration value 
metadata.max.age.ms, which defaults to 300000 (5 minutes). metadata.max.age.ms is 
"The period of time in milliseconds after which we force a refresh of metadata even 
if we haven't seen any partition leadership changes to proactively discover any new 
brokers or partitions."

So the consumer is re-requesting the metadata after 5 minutes, is noticing the 
partition, and is assigning it to the consumer.

If you are okay with the current behavior but simply wish to have a shorter delay, you 
can configure your consumer with a lower value of metadata.max.age.ms to get it to 
"notice" the new topic earlier. However, if you leave the value low after the topic 
has been created, the consumer will keep refreshing metadata more often than necessary.
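
As an illustration only, a consumer configuration with a lower metadata.max.age.ms might be built like this. The class and method names (ConsumerSettings, withMetadataMaxAge) are hypothetical, the broker address and group id are taken from the log earlier in the thread, and the String deserializers are an assumption since the thread does not say what the application uses.

```java
import java.util.Properties;

/** Hypothetical helper that builds consumer properties with a custom metadata age. */
final class ConsumerSettings {

    static Properties withMetadataMaxAge(long maxAgeMs) {
        Properties props = new Properties();
        // Broker address and group id as they appear in the log in this thread.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-app-group");
        // Assumption: String keys and values; substitute your own deserializers.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Default is 300000 (5 minutes). A lower value makes the consumer
        // refresh metadata, and so notice new topics, sooner.
        props.put("metadata.max.age.ms", Long.toString(maxAgeMs));
        return props;
    }
}
```

The result could then be passed to new KafkaConsumer<>(ConsumerSettings.withMetadataMaxAge(5000L)). The 5000 ms figure is an arbitrary example, not a recommendation, and the trade-off described above (more frequent refreshes) applies to whatever value is chosen.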

-James


P.S: We have topic auto creation disabled, so this isn't really an auto topic 
creation issue. In our case we are explicitly invoking the create topic 
operation via the Java API.

-Jaikiran
