An update on one of the workarounds I tried: I added some logic in our consumption code to wait for KafkaConsumer.partitionsFor() to return the topic name in the list before actually considering the KafkaConsumer ready to consume/poll messages. Something like this[1]. This does successfully make the consumer wait until the presence of the topic is confirmed. However, we still see the assignment of partitions being skipped:

2017-02-26 21:02:10,823 [Thread-46] DEBUG org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - Skipping assignment for topic foo-bar since no metadata is available

I looked at the code of KafkaConsumer.partitionsFor(...). It does indeed fetch the partition info remotely, but it _doesn't_ (force) update the "metadata" that the KafkaConsumer instance holds on to. This "metadata" is what the assignment logic uses, so it effectively ends up considering the topic absent. I am not sure why the "metadata" cannot be force updated (internally) when a remote fetch is triggered via a partitionsFor() call; I guess there's a valid reason. In short, this approach of using partitionsFor, although it looked promising, won't work out. So I'm going to go back to metadata.max.age.ms and fiddle with it a bit to keep it at a low value (I was trying to avoid this, since the setting then applies throughout the lifetime of that consumer).


[1] https://gist.github.com/jaikiran/902c1eadbfdd66466c8d8ecbd81416bf

-Jaikiran
On Friday 24 February 2017 12:29 PM, Jaikiran Pai wrote:
James, thank you very much for this explanation and I now understand the situation much more clearly. I wasn't aware that the consumer's metadata.max.age.ms could play a role in this. I was under the impression that the 5 minute timeout is some broker level config which was triggering this consumer group reevaluation.

Perhaps changing metadata.max.age.ms is what we will end up doing, but before doing that, I will check how the consumer.partitionsFor API (the one you noted in your reply) behaves in practice. The javadoc on that method states that it will trigger a refresh of the metadata if none is found for the topic. There's also a note that it throws a TimeoutException if the metadata isn't available for the topic after a timeout period. I'll experiment a bit with this API to see if I can be assured of an empty list if the topic metadata (after a fetch) isn't available. That way, I can add some logic in our application which calls this API a fixed number of times before the consumer is considered "ready". If it does throw a TimeoutException, I think I'll just catch it and repeat the call up to that fixed number of times.
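
The retry idea described above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the application or the gist: the class name TopicReadiness, the method waitForTopic, and the maxAttempts/backoffMs parameters are made up for the example. The lookup is passed in as a function so the sketch compiles without the Kafka client on the classpath; in real code it would be consumer::partitionsFor, and the catch would be narrowed to Kafka's TimeoutException. Treating a null result the same as an empty list is also an assumption.

```java
import java.util.Collection;
import java.util.function.Function;

// Hypothetical helper (not part of Kafka): retries a partition lookup a fixed
// number of times before the consumer is considered "ready".
final class TopicReadiness {

    private TopicReadiness() {
    }

    // partitionsFor stands in for consumer::partitionsFor; maxAttempts and
    // backoffMs are illustrative knobs, not Kafka settings.
    static boolean waitForTopic(Function<String, ? extends Collection<?>> partitionsFor,
                                String topic, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Collection<?> partitions = partitionsFor.apply(topic);
                // Assumption: null and empty both mean "no metadata yet".
                if (partitions != null && !partitions.isEmpty()) {
                    return true;
                }
            } catch (RuntimeException e) {
                // With the real client, catch
                // org.apache.kafka.common.errors.TimeoutException here instead.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

A call might look like TopicReadiness.waitForTopic(consumer::partitionsFor, "foo-bar", 10, 500L). Note that this only confirms that the topic is visible to a metadata fetch; as the follow-up at the top of this thread reports, that alone did not stop the partition assignment from being skipped.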

Thanks again, this was really helpful - I was running out of ideas to come up with a clean enough workaround to this problem. Your explanation has given me a couple of ideas which I consider clean enough to try. Once I have a consistent working solution for this, I'll send a note on what approach I settled on.


-Jaikiran

On Friday 24 February 2017 12:08 PM, James Cheng wrote:
On Feb 23, 2017, at 10:03 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:

(Re)posting this from the user mailing list to dev mailing list, hoping for some inputs from the Kafka dev team:

We are on Kafka 0.10.0.1 (server and client) and use Java consumer/producer APIs. We have an application where we create Kafka topics dynamically (using the AdminUtils Java API) and then start producing/consuming on those topics. The issue we frequently run into is this:

1. Application process creates a topic "foo-bar" via AdminUtils.createTopic. This is successfully completed.
2. Same application process then creates a consumer (using the new Java consumer API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2, however, doesn't seem to be enrolled in the consumer group for this topic because of this (notice the last line in the log):

2017-02-21 00:58:43,359 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received group coordinator response ClientResponse(receivedTimeMs=1487667523542, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=my-app-group}), createdTimeMs=1487667523378, sendTimeMs=1487667523529), responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful join group response for group my-app-group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing assignment for group my-app-group using strategy range with subscriptions {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - Skipping assignment for topic foo-bar since no metadata is available*


4. A few seconds later, a separate process produces some messages on the foo-bar topic (via the Java producer API).
5. The consumer created in step#2, although it is waiting for messages on the foo-bar topic, _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance, which then successfully assigns partition(s) of the foo-bar topic to the consumer created in step#2, and the consumer starts consuming these messages.

This 5 minute delay in consuming messages from this dynamically created topic is what we want to avoid. Is there any way I can deterministically force creation of a dynamic topic and be assured that, upon completion of that call, I can create a consumer and start consuming from that topic such that it receives messages as soon as they are produced on that topic, without having to wait for a 5 minute delay (or whatever the rebalance configuration is)? In essence, is there a way to ensure that the Kafka consumer immediately gets the topic metadata for a topic that was created successfully by the same application?


Summary: I don't think you can, but you can work around this issue by setting metadata.max.age.ms.

Details: I believe that AdminUtils.createTopic simply creates the necessary topic/partition info in zookeeper and then returns (let's say this is time=T1). The broker, at some point later, responds to it and creates topic/partitions (let's say this is time=T2). Your consumer is being instantiated between T1 and T2, and so the topic doesn't really "exist" yet on the broker.

If you wanted to be sure that the topic was created, you would have to poll the brokers until the metadata request succeeded. You *might* be able to call consumer.partitionsFor(String topic) in order to trigger a metadata refresh, but I'm not sure.

The 5 minute delay you are seeing is due to the consumer configuration value metadata.max.age.ms, which defaults to 300000 (5 minutes). metadata.max.age.ms is "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions."

So the consumer is re-requesting the metadata after 5 minutes, is noticing the partition, and is assigning it to the consumer.

If you are okay with the current behavior but simply wish to have a shorter delay, you can configure your consumer with a lower value of metadata.max.age.ms to get it to "notice" the new topic earlier. However, if the topic is already created and you leave the value low, you will end up refreshing the metadata more frequently than necessary.
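
As a rough sketch of that configuration change, assuming a plain java.util.Properties object is what gets handed to the consumer: the class name ConsumerSettings, the method name, and the String deserializers are only illustrative choices, and the 5000 ms value in the usage note is arbitrary. Only the metadata.max.age.ms key is the point here.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds consumer settings with a
// shorter metadata refresh interval than the 300000 ms default.
final class ConsumerSettings {

    private ConsumerSettings() {
    }

    static Properties withMetadataMaxAge(String bootstrapServers, String groupId,
                                         long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumption: String keys and values; swap in your own deserializers.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Forces a metadata refresh after this many milliseconds, for the
        // whole lifetime of the consumer built from these properties.
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }
}
```

For example, ConsumerSettings.withMetadataMaxAge("localhost:9092", "my-app-group", 5000L) could be passed to new KafkaConsumer<>(props). The trade-off is the one noted above: the lower the value, the more often the consumer refreshes metadata even when nothing has changed.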

-James


P.S: We have topic auto creation disabled, so this isn't really an auto topic creation issue. In our case we are explicitly invoking the create topic operation via the Java API.

-Jaikiran

