> On Feb 23, 2017, at 10:03 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
>
> (Re)posting this from the user mailing list to dev mailing list, hoping for
> some inputs from the Kafka dev team:
>
> We are on Kafka 0.10.0.1 (server and client) and use Java consumer/producer
> APIs. We have an application where we create Kafka topics dynamically (using
> the AdminUtils Java API) and then start producing/consuming on those topics.
> The issue we frequently run into is this:
>
> 1. Application process creates a topic "foo-bar" via AdminUtils.createTopic.
> This is successfully completed.
> 2. Same application process then creates a consumer (using new Java consumer
> API) on that foo-bar topic as a next step.
> 3. The consumer that gets created in step#2 however, doesn't seem to be
> enrolled in consumer group for this topic because of this (notice the last
> line in the log):
>
> 2017-02-21 00:58:43,359 [Thread-6] DEBUG
> org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
> 2017-02-21 00:58:43,360 [Thread-6] DEBUG
> org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s):
> foo-bar
> 2017-02-21 00:58:43,543 [Thread-6] DEBUG
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received
> group coordinator response ClientResponse(receivedTimeMs=1487667523542,
> disconnected=false, request=ClientRequest(expectResponse=true,
> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f,
> request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
> body={group_id=my-app-group}), createdTimeMs=1487667523378,
> sendTimeMs=1487667523529),
> responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
> 2017-02-21 00:58:43,543 [Thread-6] INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered
> coordinator localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
> 2017-02-21 00:58:43,545 [Thread-6] INFO
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking
> previously assigned partitions [] for group my-app-group
> 2017-02-21 00:58:43,545 [Thread-6] INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> (Re-)joining group my-app-group
> 2017-02-21 00:58:43,548 [Thread-6] DEBUG
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending
> JoinGroup
> ({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
> lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
> 2017-02-21 00:58:43,548 [Thread-6] DEBUG
> org.apache.kafka.common.metrics.Metrics - Added sensor with name
> node-2147483647.bytes-sent
> 2017-02-21 00:58:43,549 [Thread-6] DEBUG
> org.apache.kafka.common.metrics.Metrics - Added sensor with name
> node-2147483647.bytes-received
> 2017-02-21 00:58:43,549 [Thread-6] DEBUG
> org.apache.kafka.common.metrics.Metrics - Added sensor with name
> node-2147483647.latency
> 2017-02-21 00:58:43,552 [Thread-6] DEBUG
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received
> successful join group response for group my-app-group:
> {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
> lim=59 cap=59]}]}
> 2017-02-21 00:58:43,552 [Thread-6] DEBUG
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing
> assignment for group my-app-group using strategy range with subscriptions
> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
> *2017-02-21 00:58:43,552 [Thread-6] DEBUG
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor -
> Skipping assignment for topic foo-bar since no metadata is available*
>
> 4. A few seconds later, a separate process produces (via Java producer API)
> some messages on the foo-bar topic.
> 5. The consumer created in step#2 (although it is waiting for messages) on the
> foo-bar topic, _doesn't_ consume these messages.
> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which
> then successfully assigns partition(s) of this foo-bar topic to the consumer
> created in step#2 and the consumer starts consuming these messages.
>
> This 5 minute delay in consuming messages from this dynamically created topic
> is what we want to avoid. Is there any way I can deterministically do/force
> creation of a dynamic topic and be assured that upon completion of that call,
> I can create a consumer and start consuming of that topic such that it can
> receive messages as soon as the messages are produced on that topic, without
> having to wait for a 5 minute delay (or whatever the rebalance configuration
> is)? In essence, is there a way to ensure that the Kafka consumer does get
> the topic metadata for a topic that was created successfully by the same
> application, immediately?
>
Summary: I don't think you can, but you can work around this issue by setting metadata.max.age.ms.

Details:

I believe that AdminUtils.createTopic simply creates the necessary topic/partition info in ZooKeeper and then returns (let's say this is time=T1). The brokers pick this up at some point later and actually create the topic/partitions (let's say this is time=T2). Your consumer is being instantiated between T1 and T2, so the topic doesn't really "exist" yet on the broker.

If you wanted to be sure that the topic was created, you would have to poll the brokers until the metadata request succeeded. You *might* be able to call consumer.partitionsFor(String topic) in order to trigger a metadata refresh, but I'm not sure.

The 5-minute delay you are seeing is due to the consumer configuration value metadata.max.age.ms, which defaults to 300000 (5 minutes). metadata.max.age.ms is "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions." So after 5 minutes the consumer re-requests the metadata, notices the topic's partitions, and they get assigned to the consumer.

If you are okay with the current behavior but simply wish to have a shorter delay, you can configure your consumer with a lower value of metadata.max.age.ms so that it notices the new topic earlier. The trade-off is that once the topic exists, a low value keeps the consumer refreshing metadata more often than necessary.

-James

> P.S: We have topic auto creation disabled, so this isn't really an auto topic
> creation issue. In our case we are explicitly invoking the create topic
> operation via the Java API.
>
> -Jaikiran
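For illustration, here is a minimal Java sketch of the two workarounds described in the reply. It deliberately uses only the JDK so it stands alone: `consumerProps` builds consumer settings with a lower `metadata.max.age.ms`, and `waitForPartitions` is a hypothetical helper that polls a metadata lookup until the topic reports partitions or a timeout expires. The class and method names, the String deserializers, and the timeout/backoff values are assumptions for the example, not something from this thread.

```java
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper class; not part of kafka-clients.
final class TopicMetadataWorkarounds {

    // Consumer settings with a shorter forced metadata refresh (Kafka's default is 300000 ms).
    static Properties consumerProps(String bootstrapServers, String groupId, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assumed String keys/values; use whatever deserializers the application needs.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Refresh metadata at least this often, so a newly created topic is noticed sooner.
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }

    // Repeats a metadata lookup until it returns a non-empty partition list.
    // Returns true if partitions were seen, false on timeout or interruption.
    // A null result is treated the same as "no partitions yet".
    static <T> boolean waitForPartitions(Supplier<List<T>> lookup, long timeoutMs, long backoffMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            List<T> partitions = lookup.get();
            if (partitions != null && !partitions.isEmpty()) {
                return true; // the brokers now report partitions for the topic
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // give up; the metadata.max.age.ms refresh remains the fallback
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

With kafka-clients on the classpath, the lookup could wrap a call such as `() -> consumer.partitionsFor("foo-bar")`, made before the consumer calls `subscribe()`. Whether that reliably avoids the "no metadata is available" skip on 0.10.0.1 is untested here (as the reply says, it *might* work), which is why setting a lower `metadata.max.age.ms` is still a reasonable fallback. Depending on the client version, `partitionsFor` may return `null` for a topic the brokers don't know yet; the helper treats that like an empty list.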