I was reflecting on this more, and I think there are one or two changes that
should be considered for Kafka.

First off, auto-commit is a very resilient mode.  Drops in ZooKeeper
sessions due to garbage collection, network issues, rebalances, or other
interference are handled gracefully within the Kafka client.
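
For reference, here is a minimal sketch of the kind of 0.8.2 high-level
consumer setup I'm comparing against, with auto-commit on.  The ZooKeeper
address, group id, and timeout values below are placeholders, not our real
settings:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class AutoCommitConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");        // placeholder ZK quorum
            props.put("group.id", "example-group");            // placeholder group id
            props.put("auto.commit.enable", "true");           // periodic commits, plus commit on rebalance/close
            props.put("auto.commit.interval.ms", "60000");
            props.put("zookeeper.session.timeout.ms", "6000"); // short timeouts make GC/network pauses more likely to trigger a rebalance
            ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // ... create streams, consume, and eventually call consumer.shutdown()
        }
    }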

Sessions can still drop even if the garbage collector is well tuned and
we're hosted on a high-quality network (the difference between 99.9% and
99.9999% reliability).  My proposal is to handle these drops better when
auto-commit is turned off:

- If a rebalance or similar event occurs (which causes the client's in-memory
offset to get reverted), check whether the client was assigned back to the
same partition or a different one.  If it's the same partition, resume from
the position last consumed (the client doesn't do this for us today).  This
allows a graceful recovery.
- If the partition assignment changes (which can mean duplicate data is
getting processed), throw an exception back to the application code.  This
lets the application handle the exceptional case with respect to the work
it's doing (which might be transactional); a sketch of what that could look
like follows after this list.  Failing "silently" (yes, it's still getting
logged) is very dangerous in our situation.
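
To make the second point concrete, here is a rough sketch of what the
application-side handling could look like if such an exception existed.  The
exception name (PartitionReassignedException) and the process() /
rollbackCurrentBatch() calls are hypothetical placeholders for illustration,
not existing Kafka APIs:

    // Hypothetical sketch only: PartitionReassignedException does not exist in
    // the 0.8.2.1 client; process() and rollbackCurrentBatch() stand in for our
    // own (possibly transactional) application code.
    void consumeBatch(KafkaStream<byte[], byte[]> stream, ConsumerConnector consumer) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        try {
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                process(msg);
            }
        } catch (ConsumerTimeoutException e) {
            // existing behavior: no more messages within the timeout, fall
            // through and commit what we processed
        } catch (PartitionReassignedException e) {
            // proposed behavior: ownership moved to another consumer, so roll
            // back any uncommitted work instead of failing "silently"
            rollbackCurrentBatch();
            throw e;
        }
        consumer.commitOffsets();
    }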

One or both of these might need to be behind a consumer config parameter;
I'm not familiar enough with the Kafka project's conventions to know when a
new config gets added versus when the behavior is simply changed for the
better.

What are your thoughts?

Thanks,
Cliff

On Fri, Oct 23, 2015 at 10:57 AM, Cliff Rhyne <crh...@signal.co> wrote:

> Thanks, Jiangjie.  Understanding more about the auto-commit behavior and
> why it's resilient to these is a big help.
>
> We're going to do some deeper investigation and testing.  I'll report back
> when I have more information.
>
> Thanks,
> Cliff
>
>
> On Thu, Oct 22, 2015 at 11:48 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
>> Hi Cliff,
>>
>> If auto.commit.enable is set to true, the offset will be committed in the
>> following cases in addition to the periodic offset commits:
>>
>> 1. During a consumer rebalance, before releasing partition ownership.
>> If consumer A owns partition P before the rebalance, it will commit the
>> offset for partition P during the rebalance. If consumer B becomes the new
>> owner of partition P after the rebalance, it will start from the committed
>> offset, so there will be no duplicate messages.
>> 2. When the consumer closes.
>>
>> Rebalance will be triggered in the following cases:
>> 1. A consumer joins/leaves the group.
>> 2. A topic/partition change occurred to the subscribed topics (e.g.
>> partition expansion for a topic, or a new topic created while the consumer
>> is using a wildcard that matches the new topic name).
>>
>> To answer your question:
>> A simple consumer should not interfere with a high-level consumer because
>> it does not have any group management embedded.
>>
>> Typically a single high-level consumer group will not rebalance unless
>> there is a topic/partition change. However, it is possible the consumer
>> itself dropped out of the group and rejoined. This typically happens when
>> you have a ZK session timeout. In that case, you should see "ZK expired" in
>> your log. You can search for that and see if that is the problem.
>>
>> Jiangjie (Becket) Qin
>>
>>
>> On Thu, Oct 22, 2015 at 1:14 PM, Cliff Rhyne <crh...@signal.co> wrote:
>>
>> > We did some more testing with logging turned on (I figured out why it
>> > wasn't working).  We tried increasing the JVM memory capacity on our test
>> > server (it's lower than in production) and increasing the zookeeper
>> > timeouts.  Neither changed the results.  With trace logging enabled, we saw
>> > that we were getting rebalances even though there is only one high level
>> > consumer running (there previously was a simple consumer that was told to
>> > disconnect, but that consumer only checked the offsets and never consumed
>> > data).
>> >
>> > - Is there possibly a race condition where the simple consumer still holds
>> > a partition because its shutdown, which is called before starting the
>> > high-level consumer, completes asynchronously?
>> > - What are the various things that can cause a consumer rebalance other
>> > than adding / removing high level consumers?
>> >
>> > Thanks,
>> > Cliff
>> >
>> > On Wed, Oct 21, 2015 at 4:20 PM, Cliff Rhyne <crh...@signal.co> wrote:
>> >
>> > > Hi Kris,
>> > >
>> > > Thanks for the tip.  I'm going to investigate this further.  I checked and
>> > > we have fairly short zk timeouts and run with a smaller memory allocation
>> > > on the two environments where we encounter this issue.  I'll let you all
>> > > know what I find.
>> > >
>> > > I saw this ticket https://issues.apache.org/jira/browse/KAFKA-2049 that
>> > > seems to be related to the problem (but would only inform that an issue
>> > > occurred).  Are there any other open issues that could be worked on to
>> > > improve Kafka's handling of this situation?
>> > >
>> > > Thanks,
>> > > Cliff
>> > >
>> > > On Wed, Oct 21, 2015 at 2:53 PM, Kris K <squareksc...@gmail.com> wrote:
>> > >
>> > >> Hi Cliff,
>> > >>
>> > >> One other case I observed in my environment is when there were GC pauses
>> > >> on one of our high-level consumers in the group.
>> > >>
>> > >> Thanks,
>> > >> Kris
>> > >>
>> > >> On Wed, Oct 21, 2015 at 10:12 AM, Cliff Rhyne <crh...@signal.co> wrote:
>> > >>
>> > >> > Hi James,
>> > >> >
>> > >> > There are two scenarios we run:
>> > >> >
>> > >> > 1. Multiple partitions with one consumer per partition.  This rarely has
>> > >> > starting/stopping of consumers, so the pool is very static.  There is a
>> > >> > configured consumer timeout, which is causing the ConsumerTimeoutException
>> > >> > to get thrown prior to the test starting.  We handle this exception and
>> > >> > then resume consuming.
>> > >> > 2. Single partition with one consumer.  This consumer is started by a
>> > >> > triggered condition (number of messages pending to be processed in the
>> > >> > kafka topic, or a schedule).  The consumer is stopped after processing is
>> > >> > completed.
>> > >> >
>> > >> > In both cases, based on my understanding there shouldn't be a rebalance,
>> > >> > as either a) all consumers are running or b) there's only one consumer per
>> > >> > partition.  Also, the same consumer group is used by all consumers in
>> > >> > scenarios 1 and 2.  Is there a good way to investigate whether rebalances
>> > >> > are occurring?
>> > >> >
>> > >> > Thanks,
>> > >> > Cliff
>> > >> >
>> > >> > On Wed, Oct 21, 2015 at 11:37 AM, James Cheng <jch...@tivo.com> wrote:
>> > >> >
>> > >> > > Do you have multiple consumers in a consumer group?
>> > >> > >
>> > >> > > I think that when a new consumer joins the consumer group, the
>> > >> > > existing consumers will stop consuming during the group rebalance, and
>> > >> > > then when they start consuming again, they will consume from the last
>> > >> > > committed offset.
>> > >> > >
>> > >> > > You should get more verification on this, though. I might be
>> > >> > > remembering wrong.
>> > >> > >
>> > >> > > -James
>> > >> > >
>> > >> > > > On Oct 21, 2015, at 8:40 AM, Cliff Rhyne <crh...@signal.co> wrote:
>> > >> > > >
>> > >> > > > Hi,
>> > >> > > >
>> > >> > > > My team and I are looking into a problem where the Java high-level
>> > >> > > > consumer provides duplicate messages if we turn auto-commit off (using
>> > >> > > > version 0.8.2.1 of the server and Java client).  The expected sequence
>> > >> > > > of events is:
>> > >> > > >
>> > >> > > > 1. Start high-level consumer and initialize a KafkaStream to get a
>> > >> > > > ConsumerIterator
>> > >> > > > 2. Consume n items (could be 10,000, could be 1,000,000) from the
>> > >> > > > iterator
>> > >> > > > 3. Commit the new offsets
>> > >> > > >
>> > >> > > > What we are seeing is that during step 2, some number of the n messages
>> > >> > > > are getting returned by the iterator in duplicate (in some cases, we've
>> > >> > > > seen n*5 messages consumed).  The problem appears to go away if we turn
>> > >> > > > on auto commit (and committing offsets to kafka helped too), but auto
>> > >> > > > commit causes conflicts with our offset rollback logic.  The issue seems
>> > >> > > > to happen more when we are in our test environment on a lower-cost cloud
>> > >> > > > provider.
>> > >> > > >
>> > >> > > > Diving into the Java and Scala classes including the ConsumerIterator,
>> > >> > > > it's not obvious what event causes a duplicate offset to be requested or
>> > >> > > > returned (there's even a loop that is supposed to exclude duplicate
>> > >> > > > messages in this class).  I tried turning on trace logging but my log4j
>> > >> > > > config isn't getting the Kafka client logs to write out.
>> > >> > > >
>> > >> > > > Does anyone have suggestions of where to look or how to enable logging?
>> > >> > > >
>> > >> > > > Thanks,
>> > >> > > > Cliff
>> > >> > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> >
>>
>
