Hi Phil,

Regarding pause and resume, I have not tried this approach, but I think it
may not be feasible. If your consumer no longer has the partition assigned
from which the record being processed was fetched, or even if that partition
is somehow assigned to the consumer again, you may still not be able to
commit and will see UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION_ID errors in the
logs. Let me know if this approach works for you; I will try it as well.
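
For reference, here is a minimal sketch of the pause/poll/resume pattern as I
understand it. The bootstrap servers, group id, topic name and process() step
are placeholders for your setup, and the varargs pause()/resume() signatures
are from the 0.9.x client (newer clients take a Collection instead):

import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausePollResumeSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));    // placeholder topic

        while (true) {
            ConsumerRecords<String, String> batch = consumer.poll(1000);
            if (batch.count() == 0) {
                continue;
            }
            // Pause all currently assigned partitions so that poll() calls
            // made during processing act only as heartbeats and fetch nothing.
            Set<TopicPartition> assigned = consumer.assignment();
            consumer.pause(assigned.toArray(new TopicPartition[0]));
            try {
                for (ConsumerRecord<String, String> record : batch) {
                    process(record);    // potentially long-running work
                    consumer.poll(0);   // keeps the session alive while paused
                }
                consumer.commitSync();  // commit only after the batch is done
            } finally {
                consumer.resume(assigned.toArray(new TopicPartition[0]));
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for the actual record processing
    }
}

But as above, if a rebalance happens anyway (for example the metadata refresh
case below), the pause state is lost with the old assignment and you are back
to the same problem.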

Kafka never sends the same record to two consumers in the same consumer
group. If another consumer got the same record, it means the first consumer
no longer has that particular partition assigned to it. Even if you want to
commit from the first consumer, you will not be able to; the commit will
throw an exception. Even after increasing session.timeout.ms, a rebalance can
still occur. I see that Kafka sends a metadata refresh request every
metadata.max.age.ms (300000 ms by default), and even though nothing has
changed (no new consumer, broker or partition), this refresh generally
triggers a rebalance (at least in my tests).

Calling commitSync renews the session and keeps the consumer alive.
commitSync is a blocking operation, so you may not want to call it for every
record you process. You can try calling commitSync(offsets) just before
starting to process a record, but only if, say, 75% of the configured session
timeout has elapsed since the last commit. This will keep your consumer alive
during the occasional longer processing time without committing after every
record. But as I said earlier, this will not guarantee that a rebalance does
not happen; a metadata refresh or another change may still trigger one. If
you take the above approach, though, at least a rebalance will not occur
because of a session timeout during a longer processing time.
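
Roughly what I have in mind, as a fragment that reuses the consumer and
process() helper from the sketch above. The 30000 ms value is only an example
and must match your session.timeout.ms setting:

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

long sessionTimeoutMs = 30000;     // must match session.timeout.ms
long commitThresholdMs = (long) (sessionTimeoutMs * 0.75);
long lastCommitMs = System.currentTimeMillis();

while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
        if (System.currentTimeMillis() - lastCommitMs > commitThresholdMs) {
            // Commit the offset of the record we are about to process. This
            // marks all earlier records in the partition as processed and,
            // per the reasoning above, also renews the session.
            TopicPartition tp =
                new TopicPartition(record.topic(), record.partition());
            consumer.commitSync(Collections.singletonMap(tp,
                new OffsetAndMetadata(record.offset())));
            lastCommitMs = System.currentTimeMillis();
        }
        process(record);           // the potentially long-running work
    }
    consumer.commitSync();         // normal end-of-batch commit
    lastCommitMs = System.currentTimeMillis();
}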

If you maintain offsets outside of Kafka and configure consumers to
coordinate with each other through these external offsets, then you can skip
processing duplicate records even if Kafka sends the same records twice.
Using these external offsets you will have to devise a way to skip a record
that has already been processed by another consumer, or to wait if the same
record is currently being processed by another consumer.
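
I don't have a ready implementation of this, but the shape would be
something like the sketch below. ExternalOffsetStore is an assumed interface
(backed by, say, a database table shared by all consumers) and is not part of
the Kafka client API; the seek() on assignment and the skip/lock checks are
the important parts. It again reuses the consumer and process() from the
first sketch:

import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Assumed external store shared by all consumers in the group.
interface ExternalOffsetStore {
    long nextOffset(TopicPartition tp);                  // next offset to process
    boolean tryLock(TopicPartition tp, long offset);     // claim a record
    void saveNextOffset(TopicPartition tp, long offset); // done, release claim
}

final ExternalOffsetStore store = null; // your implementation, e.g. JDBC-backed

// On (re)assignment, seek to the externally stored position instead of
// relying on the offsets committed to Kafka.
consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do; progress is persisted externally after each record
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, store.nextOffset(tp));
        }
    }
});

while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
        TopicPartition tp =
            new TopicPartition(record.topic(), record.partition());
        if (record.offset() < store.nextOffset(tp)) {
            continue;  // already processed by another consumer, skip duplicate
        }
        if (!store.tryLock(tp, record.offset())) {
            continue;  // another consumer is processing this record right now
        }
        process(record);
        store.saveNextOffset(tp, record.offset() + 1);
    }
}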


Regards,
Vinay Sharma



On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <phil.luckhu...@encycle.com>
wrote:

> Thanks for all the responses. Unfortunately it seems that currently there
> is no foolproof solution to this. It's not a problem with the stored
> offsets as it will happen even if I do a commitSync after each record is
> processed. It's the unprocessed records in the batch that get processed
> twice.
>
> I'm now taking the approach of trying to limit the possibility of a
> rebalance as much as possible by reducing the data returned by poll. I'm
> also using the pause, poll, resume pattern to ensure the consumer doesn't
> cause a rebalance if the processing loop takes longer than
> session.timeout.ms.
>
> Cheers,
> Phil
>
>
>
> On 21 Apr 2016 16:24, at 16:24, vinay sharma <vinsharma.t...@gmail.com>
> wrote:
> >Hi,
> >
> >By design Kafka does ensure not to send same record to multiple
> >consumers
> >in same consumer group. Issue is because of rebalance while a
> >processing is
> >going on and records are not yet committed. In my view there are only 2
> >possible solutions to it
> >1) As mentioned in documentation, store offsets outside of kafka (
> >
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> ).
> >This is a complete solution but will definitely add extra development
> >and
> >also extra processing to each message. Problem may still exist if at
> >the
> >time of a crash consumer was out of sync from external custom offset
> >storage and offsets stored in kafka both.
> >2)  As mentioned in fix for defect 919 (
> >https://issues.apache.org/jira/browse/KAFKA-919) set autocommit to
> >true.
> >This will make kafka commit fetched records before rebalancing. Only
> >drawback is that some records may never be processed if consumer
> >crashes
> >while processing records which are already marked committed due to
> >rebalance.
> >
> >Regards,
> >Vinay Sharma
>
