This issue is being addressed in KAFKA-4631. See https://issues.apache.org/jira/browse/KAFKA-4631 and the discussion in the PR https://github.com/apache/kafka/pull/2622 for details.
Regards, Rajini On Thu, Mar 2, 2017 at 4:35 AM, Jaikiran Pai <jai.forums2...@gmail.com> wrote: > For future reference - I asked this question on dev mailing list and based > on the discussion there was able to come up with a workaround to get this > working. Details here https://www.mail-archive.com/d > e...@kafka.apache.org/msg67613.html > > -Jaikiran > > > On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote: > >> We are on Kafka 0.10.0.1 (server and client) and use Java >> consumer/producer APIs. We have an application where we create Kafka topics >> dynamically (using the AdminUtils Java API) and then start >> producing/consuming on those topics. The issue we frequently run into is >> this: >> >> 1. Application process creates a topic "foo-bar" via >> AdminUtils.createTopic. This is sucessfully completed. >> 2. Same application process then creates a consumer (using new Java >> consumer API) on that foo-bar topic as a next step. >> 3. The consumer that gets created in step#2 however, doesn't seem to be >> enrolled in consumer group for this topic because of this (notice the last >> line in the log): >> >> 2017-02-21 00:58:43,359 [Thread-6] DEBUG >> org.apache.kafka.clients.consumer.KafkaConsumer >> - Kafka consumer created >> 2017-02-21 00:58:43,360 [Thread-6] DEBUG >> org.apache.kafka.clients.consumer.KafkaConsumer >> - Subscribed to topic(s): foo-bar >> 2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consu >> mer.internals.AbstractCoordinator - Received group coordinator response >> ClientResponse(receivedTimeMs=1487667523542, disconnected=false, >> request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clie >> nts.consumer.internals.ConsumerNetworkClient$RequestFutureCo >> mpletionHandler@50aad50f, request=RequestSend(header={ap >> i_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, >> body={group_id=my-app-group}), createdTimeMs=1487667523378, >> sendTimeMs=1487667523529), responseBody={error_code=0,coo >> rdinator={node_id=0,host=localhost,port=9092}}) >> 2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consu >> mer.internals.AbstractCoordinator - Discovered coordinator >> localhost:9092 (id: 2147483647 rack: null) for group my-app-group. >> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu >> mer.internals.ConsumerCoordinator - Revoking previously assigned >> partitions [] for group my-app-group >> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu >> mer.internals.AbstractCoordinator - (Re-)joining group my-app-group >> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consu >> mer.internals.AbstractCoordinator - Sending JoinGroup >> ({group_id=my-app-group,session_timeout=30000,member_id=, >> protocol_type=consumer,group_protocols=[{protocol_name= >> range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]}) >> to coordinator localhost:9092 (id: 2147483647 <(214)%20748-3647> rack: >> null) >> 2017-02-21 00:58:43,548 [Thread-6] DEBUG >> org.apache.kafka.common.metrics.Metrics >> - Added sensor with name node-2147483647.bytes-sent >> 2017-02-21 00:58:43,549 [Thread-6] DEBUG >> org.apache.kafka.common.metrics.Metrics >> - Added sensor with name node-2147483647.bytes-received >> 2017-02-21 00:58:43,549 [Thread-6] DEBUG >> org.apache.kafka.common.metrics.Metrics >> - Added sensor with name node-2147483647.latency >> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu >> mer.internals.AbstractCoordinator - Received successful join group >> response for group my-app-group: {error_code=0,generation_id=1, >> group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe >> -87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe- >> 87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523- >> 402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 >> lim=59 cap=59]}]} >> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu >> mer.internals.ConsumerCoordinator - Performing assignment for group >> my-app-group using strategy range with subscriptions >> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscriptio >> n(topics=[foo-bar])} >> *2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu >> mer.internals.AbstractPartitionAssignor - Skipping assignment for topic >> foo-bar since no metadata is available* >> >> >> 4. A few seconds later, a separate process, produces (via Java producer >> API) on the foo-bar topic, some messages. >> 5. The consumer created in step#2 (although is waiting for messages) on >> the foo-bar topic, _doesn't_ consume these messages. >> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which >> then successfully assigns partition(s) of this foo-bar topic to consumer >> created in step#2 and the consumer start consuming these messages. >> >> This 5 minute delay in consuming messages from this dynamically created >> topic is what we want to avoid. Is there anyway I can deterministically >> do/force creation of a dynamic topic and be assured that upon completion of >> that call, I can create a consumer and start consuming of that topic such >> that it can receive messages as soon as the messages are produced on that >> topic, without having to wait for a 5 minute delay (or whatever the >> rebalance configuration is)? In essence, is there a way to ensure that the >> Kafka consumer does get the topic metadata for a topic that was created >> successfully by the same application, immediately? >> >> >> P.S: We have topic auto creation disabled, so this isn't really a auto >> topic creation issue. In our case we are explicitly invoking the create >> topic operation via the Java API. >> >> -Jaikiran >> > >