An update on one of the workarounds I tried: I added some logic in our
consumer code to wait for KafkaConsumer.partitionsFor() to return the
topic name in the list before actually considering the KafkaConsumer
ready to consume/poll messages, something like this [1]. This does
make the consumer wait until the presence of the topic is confirmed.
However, we still see the assignment of partitions being skipped:
2017-02-26 21:02:10,823 [Thread-46] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor -
Skipping assignment for topic foo-bar since no metadata is available
I looked at the code of KafkaConsumer.partitionsFor(...). It does indeed
fetch the partition info remotely, but it _doesn't_ (force) update the
"metadata" that the KafkaConsumer instance holds on to. This "metadata"
is what the assignment logic uses, so the assignor effectively ends up
considering this topic absent. I am not sure why the "metadata" cannot
be force-updated internally when a remote fetch is triggered via a
partitionsFor() call; I guess there's a valid reason. In short, this
approach of using partitionsFor, although it looked promising, won't
work. So I'm going to go back to metadata.max.age.ms and experiment
with keeping it at a low value (I was trying to avoid this, since the
setting then applies throughout the lifetime of that consumer).
[1] https://gist.github.com/jaikiran/902c1eadbfdd66466c8d8ecbd81416bf
-Jaikiran
On Friday 24 February 2017 12:29 PM, Jaikiran Pai wrote:
James, thank you very much for this explanation; I now understand the
situation much more clearly. I wasn't aware that the consumer's
metadata.max.age.ms could play a role in this. I was under the
impression that the 5-minute timeout was some broker-level config
that was triggering this consumer group re-evaluation.
Changing metadata.max.age.ms might be what we end up doing, but before
that, I will check how the consumer.partitionsFor API (the one you
noted in your reply) behaves in practice. The javadoc on that method
states that it will trigger a refresh of the metadata if none is found
for the topic. There's also a note that it throws a TimeoutException if
the metadata isn't available for the topic after a timeout period. I'll
experiment a bit with this API to see whether I can count on getting an
empty list if the topic metadata isn't available after a fetch. That
way, I can add some logic to our application that calls this API a
fixed number of times before the consumer is considered "ready". If it
throws a TimeoutException, I'll just catch it and repeat the call, up
to that same fixed number of times.
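A minimal sketch of this bounded-retry idea, in Java. To keep it compilable without kafka-clients on the classpath, the lookup is passed as a `Supplier` standing in for `consumer.partitionsFor(topic)`, and the exception type to treat as "not ready yet" is a parameter; with a real consumer you would presumably pass `org.apache.kafka.common.errors.TimeoutException.class`. The helper name `waitForPartitions`, the attempt count, and the backoff are illustrative assumptions. Note that, per the later update at the top of this thread, a loop like this only confirms the topic exists; it does not by itself refresh the metadata the partition assignor uses.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class TopicReadiness {

    /**
     * Hypothetical helper: calls lookup up to maxAttempts times, sleeping backoffMs
     * between attempts, until it returns a non-empty list. Exceptions of timeoutType
     * are treated as "not ready yet"; any other exception propagates unchanged.
     * Returns true once partitions are seen, false if all attempts are exhausted.
     */
    static <T> boolean waitForPartitions(Supplier<List<T>> lookup,
                                         Class<? extends RuntimeException> timeoutType,
                                         int maxAttempts,
                                         long backoffMs) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                List<T> partitions = lookup.get();
                // Depending on the client version, an unknown topic may yield null or an empty list.
                if (partitions != null && !partitions.isEmpty()) {
                    return true;
                }
            } catch (RuntimeException e) {
                if (!timeoutType.isInstance(e)) {
                    throw e; // unexpected failure: don't mask it by retrying
                }
                // Metadata lookup timed out: fall through and retry.
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMs);
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated lookup: first an empty list, then a "timeout", then one partition.
        AtomicInteger calls = new AtomicInteger();
        Supplier<List<String>> fakeLookup = () -> {
            int n = calls.incrementAndGet();
            if (n == 1) {
                return Collections.emptyList();
            }
            if (n == 2) {
                throw new IllegalStateException("simulated metadata timeout");
            }
            return Collections.singletonList("foo-bar-0");
        };
        boolean ready = waitForPartitions(fakeLookup, IllegalStateException.class, 5, 10L);
        System.out.println("ready=" + ready + " after " + calls.get() + " calls"); // ready=true after 3 calls
    }
}
```

Wired to a real consumer, the call would look something like `waitForPartitions(() -> consumer.partitionsFor("foo-bar"), TimeoutException.class, 10, 500L)`. Rethrowing any other exception type keeps genuine failures from being silently retried.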
Thanks again, this was really helpful; I was running out of ideas for a
clean enough workaround to this problem. Your explanation has given me
a couple of ideas that seem clean enough to try. Once I have a
consistently working solution for this, I'll send a note on the
approach I settled on.
-Jaikiran
On Friday 24 February 2017 12:08 PM, James Cheng wrote:
On Feb 23, 2017, at 10:03 PM, Jaikiran Pai
<jai.forums2...@gmail.com> wrote:
(Re)posting this from the user mailing list to the dev mailing list,
hoping for some input from the Kafka dev team:
We are on Kafka 0.10.0.1 (server and client) and use Java
consumer/producer APIs. We have an application where we create Kafka
topics dynamically (using the AdminUtils Java API) and then start
producing/consuming on those topics. The issue we frequently run
into is this:
1. Application process creates a topic "foo-bar" via
AdminUtils.createTopic. This completes successfully.
2. The same application process then creates a consumer (using the new
Java consumer API) on that foo-bar topic as the next step.
3. The consumer created in step #2, however, doesn't seem to get any
partitions of this topic assigned within the consumer group, because of
this (notice the last line in the log):
2017-02-21 00:58:43,359 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer
created
2017-02-21 00:58:43,360 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
topic(s): foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Received group coordinator response
ClientResponse(receivedTimeMs=1487667523542, disconnected=false,
request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
body={group_id=my-app-group}), createdTimeMs=1487667523378,
sendTimeMs=1487667523529),
responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Discovered coordinator localhost:9092 (id: 2147483647 rack: null)
for group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
Revoking previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
(Re-)joining group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Sending JoinGroup
({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647
rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG
org.apache.kafka.common.metrics.Metrics - Added sensor with name
node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG
org.apache.kafka.common.metrics.Metrics - Added sensor with name
node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG
org.apache.kafka.common.metrics.Metrics - Added sensor with name
node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Received successful join group response for group my-app-group:
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
Performing assignment for group my-app-group using strategy range
with subscriptions
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
- Skipping assignment for topic foo-bar since no metadata is available*
4. A few seconds later, a separate process produces some messages on
the foo-bar topic (via the Java producer API).
5. The consumer created in step #2, although it is waiting for messages
on the foo-bar topic, _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance,
which then successfully assigns partition(s) of the foo-bar topic to
the consumer created in step #2, and the consumer starts consuming
these messages.
This 5-minute delay in consuming messages from a dynamically created
topic is what we want to avoid. Is there any way I can deterministically
force the creation of a dynamic topic and be assured that, once that
call completes, I can create a consumer and start consuming from that
topic such that it receives messages as soon as they are produced,
without having to wait for a 5-minute delay (or whatever the rebalance
configuration is)? In essence, is there a way to ensure that the Kafka
consumer immediately gets the metadata for a topic that was created
successfully by the same application?
Summary: I don't think you can, but you can work around this issue by
setting metadata.max.age.ms.
Details: I believe that AdminUtils.createTopic simply creates the
necessary topic/partition info in ZooKeeper and then returns (let's
say this is time=T1). The broker, at some point later, reacts to it
and creates the topic/partitions (let's say this is time=T2). Your
consumer is being instantiated between T1 and T2, so the topic
doesn't really "exist" yet on the broker.
If you wanted to be sure that the topic was created, you would have
to poll the brokers until the metadata request succeeded. You *might*
be able to call consumer.partitionsFor(String topic) in order to
trigger a metadata refresh, but I'm not sure.
The 5-minute delay you are seeing is due to the consumer configuration
value metadata.max.age.ms, which defaults to 300000 (5 minutes).
metadata.max.age.ms is "The period of time in milliseconds after
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new brokers
or partitions."
So after 5 minutes the consumer re-requests the metadata, notices the
partition, and it gets assigned to the consumer.
If you are okay with the current behavior but simply wish to have a
shorter delay, you can configure your consumer with a lower value of
metadata.max.age.ms to get it to "notice" the new topic earlier.
However, if the topic already exists and you leave the value low, you
will end up refreshing more frequently than necessary.
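A minimal configuration sketch of that workaround, using plain `java.util.Properties` so it compiles on its own. The property keys are the standard consumer config names; the 5000 ms value, the helper name `consumerProps`, and the per-hour arithmetic are illustrative assumptions, not a recommendation. The arithmetic shows the trade-off: in a quiet cluster where nothing else triggers a refresh, the 300000 ms default forces roughly 12 metadata refreshes per hour, while 5000 ms forces roughly 720.

```java
import java.util.Properties;

class LowMetadataAgeConsumerConfig {

    /** Default for metadata.max.age.ms (5 minutes), as quoted above. */
    static final long DEFAULT_METADATA_MAX_AGE_MS = 300_000L;

    /** Hypothetical helper building consumer properties with a custom metadata.max.age.ms. */
    static Properties consumerProps(String bootstrapServers, String groupId, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Lower value: newly created topics are noticed sooner, at the cost of
        // more frequent metadata requests for this consumer's whole lifetime.
        props.put("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "my-app-group", 5_000L);
        long lowMaxAgeMs = Long.parseLong(props.getProperty("metadata.max.age.ms"));
        // Forced refreshes per hour, assuming nothing else triggers a refresh in between.
        long perHourDefault = 3_600_000L / DEFAULT_METADATA_MAX_AGE_MS;
        long perHourLow = 3_600_000L / lowMaxAgeMs;
        System.out.println(perHourDefault + " vs " + perHourLow + " forced refreshes per hour"); // 12 vs 720
    }
}
```

The resulting `Properties` would then be passed to `new KafkaConsumer<>(props)`; the setting applies for that consumer's whole lifetime.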
-James
P.S.: We have automatic topic creation disabled, so this isn't really
an auto-topic-creation issue. In our case we are explicitly invoking
the create topic operation via the Java API.
-Jaikiran