[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559650#comment-14559650 ]

Ewen Cheslack-Postava commented on KAFKA-2168:
----------------------------------------------

Some reasons you might want to use the consumer from multiple threads:

1. I don't think it's necessarily addressed by this JIRA, but if processing 
messages is expensive, or the processing code is easier to write as synchronous 
calls even if it requires accessing some network resource, you might want 
multiple threads to be able to call poll(). This should already behave 
correctly.
2. Manage offset commits in a separate thread from polling. If you need to 
coordinate some other action with offset commit, your current choices are 
either to compute poll() timeouts carefully so the commit can happen in the 
polling thread, or to try committing from another thread. The code is much 
simpler to write if you can just fire up a thread that does a synchronous 
commit plus whatever other operation you need, then sleeps until the next commit 
interval. If you do this right, you can continue processing messages during the 
offset commit, even if it ends up delayed for some reason.
3. close() is probably the most obvious case given the feedback we've had on 
the producer's close() method blocking indefinitely -- you want to be able to 
close() from a separate thread if you keep a thread dedicated to poll()ing. For 
example, using a shutdown hook requires this. The feedback on the producer made 
it clear this is important and should also have a timeout.
4. Metrics. MetricsReporter is the "right" way to get metrics, but that only 
works if what you care about is already covered. I don't think per-topic-partition 
position() and committed() values are currently reported. I'm not sure what the 
plan is there, since reporting them in something like mirrormaker might be too 
much, but some applications will want to be able to track that info in metrics. 
This is another case where just firing up a thread to periodically check the 
state of the consumer and report it via whatever metrics package they use is 
probably the easiest implementation.
5. Any time you may need to make dynamic changes to the consumer in response to 
external events. For example, consider a mirrormaker-like service. If you want 
to be able to dynamically reconfigure the consumer to add new topics to the 
job, subscribe() will block indefinitely if poll() has a long timeout and new 
data isn't flowing into the topics you're already subscribed to. A wakeup() 
method isn't good enough here since you need to manage the subsequent race 
between the thread trying to subscribe() and the poll()ing thread.
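
The blocking behind these use cases can be reproduced in a few lines. What follows is a minimal sketch under stated assumptions, not the KafkaConsumer code. CoarseLockedConsumer is a hypothetical class whose poll() and close() share one monitor, mirroring the coarse-grained synchronization described in the issue quoted below. Thread.sleep() stands in for a long blocking select and keeps the monitor while it sleeps. The sketch reproduces the shutdown-hook case from item 3: a close() issued from a second thread has to wait for the in-progress poll() to finish.

```java
import java.util.concurrent.TimeUnit;

public class CoarseLockDemo {
    // Hypothetical stand-in for a consumer whose public methods all synchronize
    // on the same monitor (coarse-grained locking).
    static class CoarseLockedConsumer {
        private boolean closed = false;

        // Holds the monitor for the whole timeout; Thread.sleep() stands in for a
        // long select() and does not release the monitor while sleeping.
        public synchronized void poll(long timeoutMs) throws InterruptedException {
            if (closed) return;
            Thread.sleep(timeoutMs);
        }

        // Called from another thread (e.g. a shutdown hook); must wait for the monitor.
        public synchronized void close() {
            closed = true;
        }
    }

    // Measures how long close() blocks while a second thread is inside poll().
    public static long closeLatencyMs(long pollTimeoutMs) throws InterruptedException {
        CoarseLockedConsumer consumer = new CoarseLockedConsumer();
        Thread poller = new Thread(() -> {
            try {
                consumer.poll(pollTimeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        Thread.sleep(100); // give the poll thread time to take the monitor
        long start = System.nanoTime();
        consumer.close();
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        poller.join();
        return waitedMs;
    }

    public static void main(String[] args) throws InterruptedException {
        // With a 1 s poll, close() typically waits for most of that second.
        System.out.println("close() waited " + closeLatencyMs(1000) + " ms");
    }
}
```

With a realistic poll timeout of many seconds, the same close() would wait just as long, which is why the shutdown-hook and background-commit cases break down.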

At a minimum, the current state where thread safety is guaranteed in the 
javadoc but we have indefinite blocking is a problem. If we want it to be a 
single-threaded API, then we should just leave locking up to the user (although 
we'd probably still at least want some sort of wakeup() method so they could 
interrupt long poll() calls).
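
If the consumer were made single-threaded, with locking left to the user, the wakeup() escape hatch might look roughly like the sketch below. All class and method names here are hypothetical: WakeableConsumer, WakeupException, and the AtomicBoolean flag. Only java.nio.channels.Selector and its select(long) and wakeup() methods are real JDK API. The poll thread owns the consumer, and another thread (e.g. a shutdown hook) only calls wakeup(). The poll thread then closes the consumer itself, so no lock is held around select().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupDemo {
    // Hypothetical signal thrown out of poll() after another thread calls wakeup().
    static class WakeupException extends RuntimeException {}

    // Hypothetical single-threaded consumer: every method except wakeup() must be
    // called from the polling thread, so no lock is held around select().
    static class WakeableConsumer implements AutoCloseable {
        private final Selector selector;
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);

        WakeableConsumer() throws IOException {
            selector = Selector.open();
        }

        // Blocks in select() for up to timeoutMs (must be > 0; select(0) blocks forever).
        // No channels are registered here, so only the timeout or wakeup() ends the wait.
        public void poll(long timeoutMs) throws IOException {
            selector.select(timeoutMs);
            if (wakeupRequested.getAndSet(false)) {
                throw new WakeupException();
            }
        }

        // The one thread-safe method. If no select() is in progress, Selector.wakeup()
        // makes the next select() return immediately instead.
        public void wakeup() {
            wakeupRequested.set(true);
            selector.wakeup();
        }

        @Override
        public void close() throws IOException {
            selector.close();
        }
    }

    // Runs a poll loop on its own thread, asks it to stop from this thread, and
    // returns how long the loop took to exit.
    public static long shutdownLatencyMs(long pollTimeoutMs) throws Exception {
        WakeableConsumer consumer = new WakeableConsumer();
        Thread poller = new Thread(() -> {
            try {
                while (true) {
                    consumer.poll(pollTimeoutMs);
                    // ... process records here ...
                }
            } catch (WakeupException e) {
                // Expected on shutdown; fall through to close() on this thread.
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                try { consumer.close(); } catch (IOException ignored) { /* shutting down */ }
            }
        });
        poller.start();
        Thread.sleep(100); // let the poll thread block in select()
        long start = System.nanoTime();
        consumer.wakeup(); // e.g. from a shutdown hook
        poller.join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("poll loop exited after " + shutdownLatencyMs(60_000) + " ms");
    }
}
```

This only covers shutdown. Item 5 points out that using wakeup() to make room for a call like subscribe() from another thread still leaves a race with the poll thread. In this model, that call would have to be handed to the poll thread instead.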

> New consumer poll() can block other calls like position(), commit(), and 
> close() indefinitely
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-2168
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2168
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Jason Gustafson
>
> The new consumer is currently using very coarse-grained synchronization. For 
> most methods this isn't a problem since they finish quickly once the lock is 
> acquired, but poll() might run for a long time (and commonly will since 
> polling with long timeouts is a normal use case). This means any operations 
> invoked from another thread may block until the poll() call completes.
> Some example use cases where this can be a problem:
> * A shutdown hook is registered to trigger shutdown and invokes close(). It 
> gets invoked from another thread and blocks indefinitely.
> * User wants to manage offset commits themselves in a background thread. If 
> the commit policy is not purely time based, it's not currently possible to 
> make sure the call to commit() will be processed promptly.
> Two possible solutions to this:
> 1. Make sure a lock is not held during the actual select call. Since we have 
> multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector) 
> this is probably hard to make work cleanly since locking is currently only 
> performed at the KafkaConsumer level and we'd want it unlocked around a 
> single line of code in Selector.
> 2. Wake up the selector before synchronizing for certain operations. This 
> would require some additional coordination to make sure the caller of 
> wakeup() is able to acquire the lock promptly (instead of, e.g., the poll() 
> thread being woken up and then promptly reacquiring the lock with a 
> subsequent long poll() call).
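
Option 2 can be sketched as follows. This is again a hypothetical, self-contained illustration rather than the actual client code. CoordinatedConsumer, runFromOtherThread(), and the pendingCallers counter are invented for this sketch, and the lock stays held during select() as in the current design. A caller on another thread follows three steps:
- it registers itself in the counter;
- it wakes the selector;
- it takes the lock.

When the poll thread sees a pending caller, it releases the lock and does not try to reacquire it until the caller holds it. The counter plays the role of the "additional coordination" the issue mentions. Without it, the woken poll thread could reacquire the lock straight away and start another long select().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class WakeupBeforeLockDemo {
    // Hypothetical consumer: wake the selector first, then take the lock.
    static class CoordinatedConsumer {
        private final ReentrantLock lock = new ReentrantLock();
        private final AtomicInteger pendingCallers = new AtomicInteger();
        private final Selector selector;
        private volatile boolean closed = false;

        CoordinatedConsumer() throws IOException {
            selector = Selector.open();
        }

        public void poll(long timeoutMs) throws IOException {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (true) {
                lock.lock();
                try {
                    if (closed) return;
                    if (pendingCallers.get() == 0) {
                        long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                        if (remainingMs <= 0) return; // never call select(0): it blocks forever
                        selector.select(remainingMs); // the lock is still held here
                        // Timeout, I/O readiness, or a leftover wakeup: hand control back.
                        if (pendingCallers.get() == 0) return;
                    }
                } finally {
                    lock.unlock();
                }
                // Let the waiting caller take the lock first (it decrements once it holds it).
                while (pendingCallers.get() > 0) {
                    Thread.yield();
                }
            }
        }

        // Used by every operation that may run on a thread other than the poll thread.
        private void runFromOtherThread(Runnable op) {
            pendingCallers.incrementAndGet();
            selector.wakeup(); // ends a select() in progress, or makes the next one return at once
            lock.lock();
            pendingCallers.decrementAndGet(); // we hold the lock, so poll() may queue behind us
            try {
                op.run();
            } finally {
                lock.unlock();
            }
        }

        // Hypothetical stand-in for a synchronous offset commit.
        public void commit() { runFromOtherThread(() -> { /* send commit, await response */ }); }

        public void close() {
            runFromOtherThread(() -> {
                closed = true;
                try { selector.close(); } catch (IOException ignored) { /* shutting down */ }
            });
        }

        public boolean isClosed() {
            return closed;
        }
    }

    // Measures how long commit() takes while another thread sits in a 60 s poll().
    public static long commitLatencyMs() throws Exception {
        CoordinatedConsumer consumer = new CoordinatedConsumer();
        Thread poller = new Thread(() -> {
            try {
                while (!consumer.isClosed()) {
                    consumer.poll(60_000);
                }
            } catch (IOException e) { throw new UncheckedIOException(e); }
        });
        poller.start();
        Thread.sleep(100); // let the poll thread block in select()
        long start = System.nanoTime();
        consumer.commit();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        consumer.close();
        poller.join();
        return elapsedMs;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("commit() finished after " + commitLatencyMs() + " ms");
    }
}
```

Spinning with Thread.yield() keeps the sketch short; a real implementation would more likely block on a Condition. A leftover wakeup can also make one later poll() return early, so callers should not assume poll() always waits out its full timeout.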



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
