You should set the reset to latest, commit offsets manually using a rebalance listener. In this way upon seek() you should get all data right.
Also when you say “Uncommitted” offset, that means you haven’t really processed them. So you should determine such failure, manually control offset commits, and protect from duplication. Regards, On Fri, 1 Jun 2018, 18:32 pradeep s, <sreekumar.prad...@gmail.com> wrote: > Than you. In my case i am receiving messages , doing a small transformation > and sending to a output topic . > If i am running 4 consumers against 4 partitions and one of the consumer > dies , will there be duplicate messages sent in this case > Since when the new consumer comes up , it will again process from the > uncommitted offset . > So do i need transaction semantics in this scenario. > > > On Fri, Jun 1, 2018 at 4:56 AM, M. Manna <manme...@gmail.com> wrote: > > > This is actually quite nicely explained by Jason Gustafson on this > article > > - > > https://www.confluent.io/blog/tutorial-getting-started-with- > > the-new-apache-kafka-0-9-consumer-client/ > > > > It's technically up to the application on how to determine whether > message > > is fully received. If you have database txn involved, I would say that > > CommitFailedException should revert all changes you have done. Because > you > > couldn't commit the offset successfully, you haven't "Really" consumed > any > > message. > > > > Tailoring your code a little bit: > > > > @Override > > public void run() { > > try { > > do { > > processRecords(kafkaConsumer.poll(kafkaConfig. > > getPollTimeoutMs())); > > kafkaConsumer.commitSync(); > > } while (!isConsumerLoopClosed.get()); > > } catch (WakeupException wakeupException) { > > //do nothing if wakeupException is from shutdown hook > > if (!isConsumerLoopClosed.get()) { > > handleConsumerLoopException(wakeupException); > > } > > } catch (RuntimeException ex) { // RuntimeException could also happen > > for other reasons here > > if (ex instanceof CommitFailedException) { > > // revert db txn etc. to avoid false positives > > } else if (ex instanceof KafkaException) { > > // do something else. > > } else { > > // alternatively, do this > > } > > handleConsumerLoopException(ex); > > } finally { > > kafkaConsumer.close(); > > } > > > > } > > > > One thing to remember is that when you are sending data, as of 1.0.0 API > > you can have a "Txn-like" finer control to determine when you have > > successfully committed a transaction. You can check beginTransaction(), > > commitTransaction(), abortTransaction() methods to see how they can be > > utilised to have even finer control over your message delivery. > > > > Regards, > > > > > > On 1 June 2018 at 05:54, pradeep s <sreekumar.prad...@gmail.com> wrote: > > > > > Hi, > > > I am running a poll loop for kafka consumer and the app is deployed in > > > kubernetes.I am using manual commits.Have couple of questions on > > exception > > > handling in the poll loop > > > > > > 1) Do i need to handle consumer rebalance scenario(when any of the > > consumer > > > pod dies) by adding a listener or will the commits be taken care after > > > rebalance . > > > > > > 2) Do i need to handle CommitFailedException specifically > > > > > > Consume loop code below > > > > > > > > > @Override > > > public void run() { > > > try { > > > do { > > > processRecords(kafkaConsumer.poll(kafkaConfig. > > > getPollTimeoutMs())); > > > kafkaConsumer.commitSync(); > > > } while (!isConsumerLoopClosed.get()); > > > } catch (WakeupException wakeupException) { > > > //do nothing if wakeupException is from shutdown hook > > > if (!isConsumerLoopClosed.get()) { > > > handleConsumerLoopException(wakeupException); > > > } > > > } catch (RuntimeException ex) { > > > handleConsumerLoopException(ex); > > > } finally { > > > kafkaConsumer.close(); > > > } > > > > > > > > > } > > > > > > Thanks > > > Pradeep > > > > > >