> On Feb 26, 2017, at 10:36 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
>
> An update on one of the workarounds I tried: I added some logic in our consumption part to wait for KafkaConsumer.partitionsFor() to return the topic name in the list before actually considering the KafkaConsumer ready to consume/poll messages. Something like this [1]. This does successfully make the consumer wait until the presence of the topic is confirmed. However, we still see the assignment of partitions being skipped:
>
> 2017-02-26 21:02:10,823 [Thread-46] DEBUG org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - Skipping assignment for topic foo-bar since no metadata is available
>
> I looked at the code of KafkaConsumer.partitionsFor(...). It does indeed fetch (remotely) the partitions info, but it _doesn't_ (force) update the "metadata" that the KafkaConsumer instance holds on to. This "metadata" is what the assignment logic uses, so the assignor ends up considering this topic absent. I am not sure why the "metadata" cannot be force-updated (internally) when a remote fetch is triggered via a partitionsFor() call; I guess there's a valid reason. In short, this approach of using partitionsFor, although it looked promising, won't work out. So I'm going to go back to metadata.max.age.ms and fiddle with it a bit to keep it at a low value (I was trying to avoid this, since the setting then applies throughout the lifetime of that consumer).
>
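A minimal sketch of the kind of wait loop described above. TopicWait.awaitNonEmpty is a hypothetical helper (it is not the code from the linked gist and not part of the Kafka client API). The lookup is passed in as a java.util.function.Supplier so that the sketch compiles without the Kafka client on the classpath; in the application it would be something like () -> consumer.partitionsFor("foo-bar"). Both null and an empty list are treated as "not visible yet", on the assumption that what partitionsFor returns for an unknown topic may differ between client versions.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper: repeats a lookup until it returns a non-empty list. */
public final class TopicWait {
    private TopicWait() {}

    /**
     * Calls lookup up to maxAttempts times, sleeping backoffMs between attempts,
     * and returns the first result that is neither null nor empty.
     *
     * @throws IllegalStateException if every attempt came back null or empty,
     *         or if the waiting thread is interrupted
     */
    public static <T> List<T> awaitNonEmpty(Supplier<List<T>> lookup, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            List<T> result = lookup.get();
            // An unknown topic may show up as null or as an empty list (assumed to vary by client version).
            if (result != null && !result.isEmpty()) {
                return result;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    throw new IllegalStateException("interrupted while waiting for the topic", e);
                }
            }
        }
        throw new IllegalStateException("lookup still empty after " + maxAttempts + " attempts");
    }
}
```

With a KafkaConsumer named consumer, a call might look like TopicWait.awaitNonEmpty(() -> consumer.partitionsFor("foo-bar"), 20, 250L). As the message above reports, a non-empty result only shows that the broker knows about the topic; it does not refresh the metadata that the partition assignor consults, so a loop like this on its own did not prevent the skipped assignment.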
Jaikiran,

What about:
1) create the topic
2) create consumer1 and call consumer1.partitionsFor() until it succeeds
3) close consumer1
4) create consumer2 and call consumer2.subscribe()

-James

>
> [1] https://gist.github.com/jaikiran/902c1eadbfdd66466c8d8ecbd81416bf
>
> -Jaikiran
> On Friday 24 February 2017 12:29 PM, Jaikiran Pai wrote:
>> James, thank you very much for this explanation; I now understand the situation much more clearly. I wasn't aware that the consumer's metadata.max.age.ms could play a role in this. I was under the impression that the 5 minute timeout was some broker-level config that was triggering this consumer group re-evaluation.
>>
>> Perhaps changing metadata.max.age.ms is what we will end up doing, but before that, I will check how the consumer.partitionsFor API (the one you noted in your reply) behaves in practice. The javadoc on that method states that it will trigger a refresh of the metadata if none is found for the topic. There's also a note that it throws a TimeoutException if the metadata isn't available for the topic after a timeout period. I'll experiment a bit with this API to see whether I can be assured of an empty list if the topic metadata (after a fetch) isn't available. That way, I can add some logic in our application that calls this API up to a fixed number of times before the consumer is considered "ready". If it does throw a TimeoutException, I think I'll just catch it and repeat the call, within that same fixed number of attempts.
>>
>> Thanks again, this was really helpful. I was running out of ideas for a clean enough workaround to this problem. Your explanation has given me a couple of ideas that I consider clean enough to try. Once I have a consistently working solution, I'll send a note on what approach I settled on.
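James's four steps can be sketched as follows, under stated assumptions: ProbeThenSubscribe.probeThenCreate is a hypothetical helper, the consumer factory and the visibility check are supplied by the caller, and any java.io.Closeable stands in for KafkaConsumer (which implements Closeable) so the sketch compiles on its own. Step 1 (creating the topic) is assumed to have happened already; steps 2 and 3 run a throwaway instance until the topic is visible and then close it; step 4 returns a fresh instance for the caller to subscribe with. The sketch relies on the premise of the suggestion, namely that a consumer created after the topic is visible fetches metadata that already includes it. The commented Kafka usage also folds in the plan from the Feb 24 message of treating a TimeoutException as "try again".

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Hypothetical helper for the probe-then-subscribe idea. With the Kafka client,
 * a call might look like this (not compiled here):
 *
 *   KafkaConsumer<String, String> consumer = ProbeThenSubscribe.probeThenCreate(
 *       () -> new KafkaConsumer<String, String>(props),
 *       c -> {
 *           try {
 *               List<PartitionInfo> parts = c.partitionsFor("foo-bar");
 *               return parts != null && !parts.isEmpty();
 *           } catch (org.apache.kafka.common.errors.TimeoutException e) {
 *               return false; // treat a metadata timeout as "not visible yet"
 *           }
 *       },
 *       20, 250L);
 *   consumer.subscribe(Collections.singletonList("foo-bar"));
 */
public final class ProbeThenSubscribe {
    private ProbeThenSubscribe() {}

    public static <C extends Closeable> C probeThenCreate(
            Supplier<C> factory, Predicate<C> topicVisible, int maxAttempts, long backoffMs) {
        // Steps 2 and 3: a throwaway instance probes until the topic is visible;
        // try-with-resources closes it whether or not the topic showed up.
        try (C probe = factory.get()) {
            boolean visible = false;
            for (int attempt = 1; attempt <= maxAttempts && !visible; attempt++) {
                visible = topicVisible.test(probe);
                if (!visible && attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
            if (!visible) {
                throw new IllegalStateException("topic not visible after " + maxAttempts + " attempts");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while probing", e);
        } catch (IOException e) {
            throw new UncheckedIOException("failed to close the probe instance", e);
        }
        // Step 4: a fresh instance that the caller subscribes with.
        return factory.get();
    }
}
```

Closing the probe before creating the real consumer keeps the probe out of the consumer group, since the probe never calls subscribe() and only the second instance joins.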
>>
>> -Jaikiran
>>
>> On Friday 24 February 2017 12:08 PM, James Cheng wrote:
>>>> On Feb 23, 2017, at 10:03 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
>>>>
>>>> (Re)posting this from the user mailing list to the dev mailing list, hoping for some input from the Kafka dev team:
>>>>
>>>> We are on Kafka 0.10.0.1 (server and client) and use the Java consumer/producer APIs. We have an application where we create Kafka topics dynamically (using the AdminUtils Java API) and then start producing/consuming on those topics. The issue we frequently run into is this:
>>>>
>>>> 1. The application process creates a topic "foo-bar" via AdminUtils.createTopic. This completes successfully.
>>>> 2. The same application process then creates a consumer (using the new Java consumer API) on that foo-bar topic as the next step.
>>>> 3. The consumer created in step #2, however, doesn't seem to be enrolled in the consumer group for this topic, because of this (notice the last line in the log):
>>>>
>>>> 2017-02-21 00:58:43,359 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
>>>> 2017-02-21 00:58:43,360 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): foo-bar
>>>> 2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received group coordinator response ClientResponse(receivedTimeMs=1487667523542, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=my-app-group}), createdTimeMs=1487667523378, sendTimeMs=1487667523529), responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
>>>> 2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
>>>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group my-app-group
>>>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group my-app-group
>>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
>>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-sent
>>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-received
>>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.latency
>>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful join group response for group my-app-group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]}
>>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing assignment for group my-app-group using strategy range with subscriptions {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
>>>> *2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - Skipping assignment for topic foo-bar since no metadata is available*
>>>>
>>>> 4. A few seconds later, a separate process produces some messages on the foo-bar topic (via the Java producer API).
>>>> 5. The consumer created in step #2, although it is waiting for messages on the foo-bar topic, _doesn't_ consume these messages.
>>>> 6. *5 minutes later* the Kafka server triggers a consumer rebalance, which then successfully assigns partition(s) of this foo-bar topic to the consumer created in step #2, and the consumer starts consuming these messages.
>>>>
>>>> This 5 minute delay in consuming messages from a dynamically created topic is what we want to avoid. Is there any way I can deterministically create a dynamic topic and be assured that, once that call completes, I can create a consumer and start consuming from that topic such that it receives messages as soon as they are produced, without having to wait for a 5 minute delay (or whatever the rebalance configuration is)? In essence, is there a way to ensure that the Kafka consumer gets the topic metadata immediately for a topic that was created successfully by the same application?
>>>>
>>> Summary: I don't think you can, but you can work around this issue by setting metadata.max.age.ms.
>>>
>>> Details: I believe that AdminUtils.createTopic simply creates the necessary topic/partition info in ZooKeeper and then returns (let's say this is time=T1). The broker, at some point later, responds to it and creates the topic/partitions (let's say this is time=T2). Your consumer is being instantiated between T1 and T2, and so the topic doesn't really "exist" yet on the broker.
>>>
>>> If you wanted to be sure that the topic was created, you would have to poll the brokers until the metadata request succeeded. You *might* be able to call consumer.partitionsFor(String topic) in order to trigger a metadata refresh, but I'm not sure.
>>>
>>> The 5 minute delay you are seeing is due to the consumer configuration value metadata.max.age.ms, which defaults to 300000 (5 minutes). metadata.max.age.ms is "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions."
>>>
>>> So the consumer re-requests the metadata after 5 minutes, notices the partition, and assigns it to the consumer.
>>>
>>> If you are okay with the current behavior but simply wish to have a shorter delay, you can configure your consumer with a lower value of metadata.max.age.ms so that it "notices" the new topic earlier. However, if you leave the value low once the topic already exists, the consumer will keep refreshing metadata more often than necessary.
>>>
>>> -James
>>>
>>>> P.S.: We have topic auto-creation disabled, so this isn't really an auto topic creation issue. In our case we are explicitly invoking the create topic operation via the Java API.
>>>>
>>>> -Jaikiran
>>
>
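The metadata.max.age.ms workaround James describes is a consumer setting. A minimal sketch of building the consumer properties with a lower value, assuming plain String keys (ConsumerConfig.METADATA_MAX_AGE_CONFIG is the matching constant in the Kafka client) and reusing the broker address and group name from the logs above as placeholders. ShortMetadataAgeConfig is a hypothetical name, and the 5000 ms used in the usage note is illustrative, not a recommendation.

```java
import java.util.Properties;

/** Hypothetical helper: consumer settings with a shorter metadata refresh interval. */
public final class ShortMetadataAgeConfig {
    private ShortMetadataAgeConfig() {}

    public static Properties consumerProps(String bootstrapServers, String groupId, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The default is 300000 ms (5 minutes). A lower value makes the consumer re-fetch
        // metadata, and so notice a newly created topic, sooner, at the cost of more
        // frequent refreshes for the consumer's whole lifetime.
        props.put("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }
}
```

For example, ShortMetadataAgeConfig.consumerProps("localhost:9092", "my-app-group", 5000L) would then be passed to new KafkaConsumer<String, String>(props) before calling subscribe(...). The trade-off is the one James points out: the shorter interval keeps applying after the topic exists, which is why Jaikiran was hoping to avoid it.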