On Feb 23, 2017, at 10:03 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
(Re)posting this from the user mailing list to dev mailing list, hoping for
some inputs from the Kafka dev team:
We are on Kafka 0.10.0.1 (server and client) and use Java consumer/producer
APIs. We have an application where we create Kafka topics dynamically (using
the AdminUtils Java API) and then start producing/consuming on those topics.
The issue we frequently run into is this:
1. Application process creates a topic "foo-bar" via AdminUtils.createTopic.
This is sucessfully completed.
2. Same application process then creates a consumer (using new Java consumer
API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2 however, doesn't seem to be
enrolled in consumer group for this topic because of this (notice the last line
in the log):
2017-02-21 00:58:43,359 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s):
foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received
group coordinator response ClientResponse(receivedTimeMs=1487667523542,
disconnected=false, request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
body={group_id=my-app-group}), createdTimeMs=1487667523378,
sendTimeMs=1487667523529),
responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered
coordinator localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking
previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining
group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending
JoinGroup
({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG
org.apache.kafka.common.metrics.Metrics - Added sensor with name
node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG
org.apache.kafka.common.metrics.Metrics - Added sensor with name
node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG
org.apache.kafka.common.metrics.Metrics - Added sensor with name
node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received
successful join group response for group my-app-group:
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing
assignment for group my-app-group using strategy range with subscriptions
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor -
Skipping assignment for topic foo-bar since no metadata is available*
4. A few seconds later, a separate process, produces (via Java producer API) on
the foo-bar topic, some messages.
5. The consumer created in step#2 (although is waiting for messages) on the
foo-bar topic, _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance which then
successfully assigns partition(s) of this foo-bar topic to consumer created in
step#2 and the consumer start consuming these messages.
This 5 minute delay in consuming messages from this dynamically created topic
is what we want to avoid. Is there anyway I can deterministically do/force
creation of a dynamic topic and be assured that upon completion of that call, I
can create a consumer and start consuming of that topic such that it can
receive messages as soon as the messages are produced on that topic, without
having to wait for a 5 minute delay (or whatever the rebalance configuration
is)? In essence, is there a way to ensure that the Kafka consumer does get the
topic metadata for a topic that was created successfully by the same
application, immediately?