Yes, that was going to be my last-resort attempt :) I'm going to give that
a try today and see how it goes. Although it isn't a great solution, I
don't mind doing it, since the logic resides in a single place in our
application.

-Jaikiran

On Tuesday, February 28, 2017, James Cheng <wushuja...@gmail.com> wrote:
>
>
>> On Feb 26, 2017, at 10:36 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
>>
>> An update on one of the workarounds I tried: I added some logic in our
>> consumption code to wait for KafkaConsumer.partitionsFor() to return the
>> topic name in the list before actually considering the KafkaConsumer
>> ready to consume/poll messages. Something like this [1]. This does
>> successfully make the consumer wait until the presence of the topic is
>> confirmed. However, we still see the assignment of partitions being skipped:
>>
>> 2017-02-26 21:02:10,823 [Thread-46] DEBUG org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - Skipping assignment for topic foo-bar since no metadata is available
>>
>> I looked at the code of KafkaConsumer.partitionsFor(...). It does indeed
>> fetch the partition info remotely, but it _doesn't_ (force) update the
>> "metadata" that the KafkaConsumer instance holds on to. This "metadata" is
>> what the assignment logic uses, so it effectively ends up considering
>> this topic absent. I am not sure why the "metadata" cannot be force-updated
>> internally when a remote fetch is triggered via a partitionsFor() call;
>> I guess there's a valid reason. In short, this approach of using
>> partitionsFor, although it looked promising, won't work out. So I'm going
>> to go back to metadata.max.age.ms and fiddle with it a bit to keep it at a
>> low value (I was trying to avoid this, since the setting then applies
>> throughout the lifetime of that consumer).
>>
>
> Jaikiran,
>
> What about
> 1) create topic
> 2) create consumer1 and do consumer1.partitionsFor() until it succeeds
> 3) close consumer1
> 4) create consumer2 and do consumer2.subscribe()
>
> -James
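A minimal sketch of James's four steps, written against generic Java types so that it compiles without the Kafka client jar. Here `C` stands in for `KafkaConsumer<K, V>` (which is `Closeable`), and the names `ProbeThenSubscribe` and `probeThenCreate` are hypothetical. The idea that a freshly created consumer will see the topic on its first metadata fetch is James's suggestion and is not confirmed anywhere in this thread.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: C stands in for KafkaConsumer<K, V>, which is Closeable.
final class ProbeThenSubscribe {

    private ProbeThenSubscribe() {}

    static <C extends AutoCloseable> C probeThenCreate(Supplier<C> newConsumer,
                                                       Function<C, List<?>> partitionsFor,
                                                       int maxAttempts,
                                                       long backoffMs) {
        boolean visible = false;
        // Steps 2 and 3: poll a throwaway consumer, which try-with-resources
        // closes before the long-lived consumer is created.
        try (C probe = newConsumer.get()) {
            for (int attempt = 1; attempt <= maxAttempts && !visible; attempt++) {
                List<?> partitions = partitionsFor.apply(probe);
                // A null or empty result means the topic is not visible yet.
                visible = partitions != null && !partitions.isEmpty();
                if (!visible && attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while probing for the topic", e);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // Any other checked exception can only come from the probe's close().
            throw new IllegalStateException("failed to close the probe consumer", e);
        }
        if (!visible) {
            throw new IllegalStateException("topic not visible after " + maxAttempts + " attempts");
        }
        // Step 4: a fresh consumer; the caller subscribes to the topic on it.
        return newConsumer.get();
    }
}
```

With the real client, the call might look like `ProbeThenSubscribe.probeThenCreate(() -> new KafkaConsumer<String, String>(props), c -> c.partitionsFor("foo-bar"), 20, 250)`, followed by `subscribe(Collections.singletonList("foo-bar"))` on the returned consumer. The attempt count and backoff are illustrative values only.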
>
>
>>
>> [1] https://gist.github.com/jaikiran/902c1eadbfdd66466c8d8ecbd81416bf
>>
>> -Jaikiran
>> On Friday 24 February 2017 12:29 PM, Jaikiran Pai wrote:
>>> James, thank you very much for this explanation; I now understand the
>>> situation much more clearly. I wasn't aware that the consumer's
>>> metadata.max.age.ms could play a role in this. I was under the impression
>>> that the 5 minute timeout was some broker-level config that was triggering
>>> this consumer group reevaluation.
>>>
>>> Perhaps changing metadata.max.age.ms is what we will end up doing, but
>>> before that, I will check how the consumer.partitionsFor API (the one
>>> you noted in your reply) behaves in practice. The javadoc on that method
>>> states that it will trigger a refresh of the metadata if none is found
>>> for the topic. There's also a note that it throws a TimeoutException if
>>> the metadata isn't available for the topic after a timeout period. I'll
>>> experiment a bit with this API to see if I can be assured of an empty
>>> list if the topic metadata (after a fetch) isn't available. That way, I
>>> can add some logic in our application that calls this API a fixed
>>> number of times before the consumer is considered "ready". If it does
>>> throw a TimeoutException, I think I'll just catch it and repeat the call,
>>> up to that fixed number of times.
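A minimal sketch of the bounded retry described above, again written against generic Java types so it compiles without the Kafka client. The names `TopicReadiness` and `awaitPartitions` are hypothetical; the caller supplies the lookup and the exception type to treat as retriable. A null or empty result is treated as "not ready yet".

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: retries a partitionsFor-style lookup a fixed number of times.
final class TopicReadiness {

    private TopicReadiness() {}

    /**
     * Returns true as soon as lookup yields a non-empty list. A null or empty
     * result, or an exception of type retriable, counts as "not ready yet" and
     * is retried after backoffMs. Any other exception propagates. Returns false
     * if all attempts are used up or the thread is interrupted.
     */
    static boolean awaitPartitions(Supplier<? extends List<?>> lookup,
                                   Class<? extends RuntimeException> retriable,
                                   int maxAttempts,
                                   long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                List<?> partitions = lookup.get();
                if (partitions != null && !partitions.isEmpty()) {
                    return true;
                }
            } catch (RuntimeException e) {
                if (!retriable.isInstance(e)) {
                    throw e;
                }
                // Retriable failure (e.g. a metadata timeout): fall through and retry.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Against the real client this might be called as `TopicReadiness.awaitPartitions(() -> consumer.partitionsFor("foo-bar"), org.apache.kafka.common.errors.TimeoutException.class, 10, 500)`. As reported later in this thread, a successful `partitionsFor()` result does not by itself make the consumer's assignment logic see the topic.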
>>>
>>> Thanks again, this was really helpful - I was running out of ideas for
>>> a clean enough workaround to this problem. Your explanation has given me
>>> a couple of ideas that seem clean enough to try. Once I have a
>>> consistently working solution, I'll send a note on the approach I
>>> settled on.
>>>
>>>
>>> -Jaikiran
>>>
>>> On Friday 24 February 2017 12:08 PM, James Cheng wrote:
>>>>> On Feb 23, 2017, at 10:03 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:
>>>>>
>>>>> (Re)posting this from the user mailing list to the dev mailing list,
>>>>> hoping for some input from the Kafka dev team:
>>>>>
>>>>> We are on Kafka 0.10.0.1 (server and client) and use the Java
>>>>> consumer/producer APIs. We have an application where we create Kafka
>>>>> topics dynamically (using the AdminUtils Java API) and then start
>>>>> producing/consuming on those topics. The issue we frequently run into
>>>>> is this:
>>>>>
>>>>> 1. Application process creates a topic "foo-bar" via
>>>>> AdminUtils.createTopic. This completes successfully.
>>>>> 2. Same application process then creates a consumer (using the new Java
>>>>> consumer API) on that foo-bar topic as the next step.
>>>>> 3. The consumer that gets created in step #2, however, doesn't seem to
>>>>> be enrolled in the consumer group for this topic because of this (notice
>>>>> the last line in the log):
>>>>>
>>>>> 2017-02-21 00:58:43,359 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
>>>>> 2017-02-21 00:58:43,360 [Thread-6] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): foo-bar
>>>>> 2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received group coordinator response ClientResponse(receivedTimeMs=1487667523542, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=my-app-group}), createdTimeMs=1487667523378, sendTimeMs=1487667523529), responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
>>>>> 2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
>>>>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group my-app-group
>>>>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group my-app-group
>>>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ({group_id=my-app-group,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
>>>>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-sent
>>>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-received
>>>>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.latency
>>>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful join group response for group my-app-group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]}
>>>>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing assignment for group my-app-group using strategy range with subscriptions {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
>>>>> *2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - Skipping assignment for topic foo-bar since no metadata is available*
>>>>>
>>>>>
>>>>> 4. A few seconds later, a separate process produces some messages (via
>>>>> the Java producer API) on the foo-bar topic.
>>>>> 5. The consumer created in step #2 (although it is waiting for messages
>>>>> on the foo-bar topic) _doesn't_ consume these messages.
>>>>> 6. *5 minutes later* the Kafka server triggers a consumer rebalance,
>>>>> which then successfully assigns partition(s) of this foo-bar topic to
>>>>> the consumer created in step #2, and the consumer starts consuming
>>>>> these messages.
>>>>>
>>>>> This 5 minute delay in consuming messages from this dynamically
>>>>> created topic is what we want to avoid. Is there any way I can
>>>>> deterministically force the creation of a dynamic topic and be assured
>>>>> that, once that call completes, I can create a consumer and start
>>>>> consuming from that topic so that it receives messages as soon as they
>>>>> are produced, without having to wait 5 minutes (or whatever the
>>>>> rebalance configuration is)? In essence, is there a way to ensure that
>>>>> the Kafka consumer immediately gets the topic metadata for a topic that
>>>>> was created successfully by the same application?
>>>>>
>>>>>
>>>> Summary: I don't think you can, but you can work around this issue by
>>>> setting metadata.max.age.ms.
>>>>
>>>> Details: I believe that AdminUtils.createTopic simply creates the
>>>> necessary topic/partition info in ZooKeeper and then returns (let's say
>>>> this is time=T1). The broker, at some point later, responds to it and
>>>> creates the topic/partitions (let's say this is time=T2). Your consumer
>>>> is being instantiated between T1 and T2, so the topic doesn't really
>>>> "exist" yet on the broker.
>>>>
>>>> If you wanted to be sure that the topic was created, you would have to
>>>> poll the brokers until the metadata request succeeded. You *might* be
>>>> able to call consumer.partitionsFor(String topic) in order to trigger a
>>>> metadata refresh, but I'm not sure.
>>>>
>>>> The 5 minute delay you are seeing is due to the consumer configuration
>>>> value metadata.max.age.ms, which defaults to 300000 (5 minutes).
>>>> metadata.max.age.ms is "The period of time in milliseconds after which we
>>>> force a refresh of metadata even if we haven't seen any partition
>>>> leadership changes to proactively discover any new brokers or partitions."
>>>>
>>>> So the consumer is re-requesting the metadata after 5 minutes, is
>>>> noticing the partition, and is assigning it to the consumer.
>>>>
>>>> If you are okay with the current behavior but simply wish to have a
>>>> shorter delay, you can configure your consumer with a lower value of
>>>> metadata.max.age.ms so that it "notices" the new topic earlier. However,
>>>> once the topic has been created, leaving the value low means the
>>>> consumer keeps refreshing metadata more often than necessary.
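A minimal sketch of the consumer configuration James describes, using plain string config keys so it compiles without the Kafka jar. The class and method names are hypothetical, and the 5000 ms value used below is only an example; the right value trades discovery latency against extra metadata requests.

```java
import java.util.Properties;

// Hypothetical helper that builds Properties for new KafkaConsumer<>(props).
final class LowMetadataAgeConfig {

    private LowMetadataAgeConfig() {}

    static Properties consumerProps(String bootstrapServers, String groupId, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Defaults to 300000 ms (5 minutes); a lower value makes the consumer
        // refresh metadata, and so notice newly created topics, sooner.
        props.put("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }
}
```

For example, `new KafkaConsumer<String, String>(LowMetadataAgeConfig.consumerProps("localhost:9092", "my-app-group", 5000L))`. The lower value stays in effect for the consumer's whole lifetime, which is the trade-off noted above.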
>>>>
>>>> -James
>>>>
>>>>
>>>>> P.S: We have topic auto creation disabled, so this isn't really an
>>>>> auto topic creation issue. In our case we are explicitly invoking the
>>>>> create topic operation via the Java API.
>>>>>
>>>>> -Jaikiran
>>>
>>
>
>
