I think I've found the answer to my second question. According to the code
in ZookeeperConsumerConnector.scala, a rebalance takes its starting offsets
from what is already stored in ZooKeeper. Therefore, messages that were
consumed from a partition but not yet committed will be replayed when the
partition is reassigned.
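
To make the replay behavior concrete, here is a minimal Python sketch. It is a
toy model, not Kafka code: the names ToyPartition and ToyConsumer are made up
for illustration, and a plain attribute stands in for the offset stored in
ZooKeeper. The only behavior it assumes is the one described above, namely that
a newly assigned consumer starts from the committed offset.

```python
# Toy model only: none of these names come from Kafka itself.

class ToyPartition:
    def __init__(self, messages):
        self.messages = list(messages)
        # Next offset to read, standing in for the value kept in ZooKeeper.
        self.committed_offset = 0


class ToyConsumer:
    def __init__(self, name):
        self.name = name
        self.partition = None
        self.position = 0  # in-memory position, not visible to other consumers

    def assign(self, partition):
        # On (re)assignment, start from the committed offset, not from
        # wherever the previous owner happened to be.
        self.partition = partition
        self.position = partition.committed_offset

    def next_message(self):
        msg = self.partition.messages[self.position]
        self.position += 1
        return msg

    def commit(self):
        self.partition.committed_offset = self.position


partition_a = ToyPartition(["m0", "m1", "m2"])

first = ToyConsumer("c1")
first.assign(partition_a)
first.next_message()                         # consume m0
first.commit()                               # m0 is now committed
consumed_uncommitted = first.next_message()  # consume m1, never commit it

second = ToyConsumer("c2")
second.assign(partition_a)                   # rebalance: partition A moves to c2
replayed = second.next_message()

print(consumed_uncommitted, replayed)        # both are "m1"
```

In this model the second consumer sees m1 again, which is the at-least-once
behavior I was hoping for.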

On Fri, Jun 13, 2014 at 3:01 PM, Kyle Banker <kyleban...@gmail.com> wrote:

> I'm using Kafka 0.8.1.1.
>
> I have a simple goal: use the high-level consumer to consume a message
> from Kafka, publish the message to a different system, and then commit the
> message in Kafka. Based on my reading of the docs and the mailing list, it
> seems like this isn't so easy to achieve. Here is my current understanding:
>
> First, I have to disable auto-commit. If the consumer automatically
> commits, then I may lose messages if, for example, my process dies after
> consuming but before publishing my message.
>
> Next, if my app is multi-threaded, I need to either
>
> a) use a separate consumer per thread (memory-intensive, hard on
> Zookeeper) or
> b) use a single consumer and assign a KafkaStream to each thread. Then,
> when I want to commit, first synchronize all threads using a barrier.
>
> First question: is this correct so far?
>
>
> Still, it appears that rebalancing may be a problem. In particular, this
> sequence of events:
>
> 1. I'm consuming from a stream tied to two partitions, A and B.
> 2. I consume a message, M, from partition A.
> 3. Partition A gets assigned to a different consumer.
> 4. I choose not to commit M or my process fails.
>
> Second question: When the partition is reassigned, will the message that I
> consumed be automatically committed? If so, then there's no way to get the
> reliability I want.
>
>
> Third question: How do the folks at LinkedIn handle this overall use case?
> What about other users?
>
> It seems to me that a lot of the complexity here could be easily addressed
> by changing the way in which a partition's message pointer is advanced.
> That is, when I consume message M, advance the pointer to message (M - 1)
> rather than to M. In other words, calling iterator.next() would imply that
> the previously consumed message may be safely committed. If this were the
> case, I could simply enable auto-commit and be happy.
>
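
For option (b) in the quoted message, the barrier-then-commit idea could look
roughly like the following Python sketch. It does not use Kafka at all: plain
lists stand in for the KafkaStreams, commit_offsets is a hypothetical
placeholder for the consumer-wide commit, and every stream is assumed to
deliver the same number of messages so that no thread waits forever at the
barrier.

```python
import threading

NUM_WORKERS = 3
BATCH = 2  # commit after every BATCH messages per thread

# Stand-ins for one stream per thread, all of equal length (an assumption).
streams = [[f"s{i}-m{j}" for j in range(4)] for i in range(NUM_WORKERS)]

published = []
published_lock = threading.Lock()
commits = []  # number of published messages seen at each commit


def commit_offsets():
    # Runs in exactly one thread, after all workers have reached the barrier
    # and before any of them is released. Placeholder for the real commit.
    commits.append(len(published))


barrier = threading.Barrier(NUM_WORKERS, action=commit_offsets)


def worker(stream):
    for count, msg in enumerate(stream, start=1):
        with published_lock:
            published.append(msg)  # stand-in for publishing downstream
        if count % BATCH == 0:
            barrier.wait()         # block until every thread has published


threads = [threading.Thread(target=worker, args=(s,)) for s in streams]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(commits)  # [6, 12]
```

Because the commit only runs once every thread is parked at the barrier, it
never covers a message that has been consumed but not yet published. With
streams of uneven length or rate, a real version would need a timeout or some
other way to avoid stalling at the barrier.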
