Hi everyone, I wanted to ask what the best and most secure way of handling errors in a KafkaConsumer is. I am using Confluent's recommended consumer implementation, and my delivery semantics are at-least-once. I have switched off auto commit, but I am not sure whether I should just switch it back on. The thing is that I process all the messages asynchronously, so I only want to commit offsets once a batch has come back from processing.
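To make "commit only after processing comes back" concrete, here is a minimal sketch of the bookkeeping it implies for a single partition. `OffsetTracker` is a hypothetical helper, not part of kafka-clients and not what my runner currently does. It assumes each record's offset is reported once its asynchronous processing finishes, and it only advances the committable position across a gap-free run of finished offsets.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper (not a Kafka API): tracks which offsets of ONE partition
 * have finished asynchronous processing and reports the position that is safe
 * to commit under at-least-once semantics.
 */
class OffsetTracker {
    // Offsets that finished processing but are not yet covered by the commit position.
    private final TreeSet<Long> done = new TreeSet<>();
    // Kafka convention: the committed value is the offset of the NEXT record to read.
    private long nextToCommit;

    OffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Called from the async callback when the record at this offset is fully processed. */
    synchronized void markDone(long offset) {
        if (offset >= nextToCommit) {
            done.add(offset);
        }
    }

    /** Advances over the contiguous run of finished offsets and returns the position to commit. */
    synchronized long committableOffset() {
        while (!done.isEmpty() && done.first() == nextToCommit) {
            done.pollFirst();
            nextToCommit++;
        }
        return nextToCommit;
    }
}
```

With one tracker per TopicPartition, the returned value could be wrapped in an OffsetAndMetadata and passed to consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>) from the polling thread. A record that fails or is still in flight leaves a gap, so nothing after it is committed and it will be redelivered after a restart or rebalance.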
Here is the KafkaConsumerRunner:

    private void doCommitSync() {
        try {
            consumer.commitSync();
        } catch (WakeupException e) {
            // we're shutting down, but finish the commit first and then
            // rethrow the exception so that the main loop can exit
            doCommitSync();
            throw e;
        } catch (CommitFailedException e) {
            // the commit failed with an unrecoverable error. if there is any
            // internal state which depended on the commit, you can clean it
            // up here. otherwise it's reasonable to ignore the error and go on
            LOGGER.error("Commit failed", e);
        }
    }

    /**
     * run the thread in here
     */
    @Override
    public void run() {
        application.getCurrentSession();
        CountDownLatch batchCompleteLatch;
        try {
            consumer.subscribe(topics, new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    doCommitSync();
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
            });

            while (true) {
                // adding a count down latch to block the loop until the current
                batchCompleteLatch = new CountDownLatch(1);
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                LOGGER.info("Processing - " + records.count() + " records");

                Observable.<String>create(subscriber ->
                        records.forEach(consumerRecord -> subscriber.onNext(consumerRecord.value())))
                    .map(json -> JsonSerializer.deserialize(json, A2FKafkaMessage.class))
                    .groupBy(A2FKafkaMessage::getOrgId)
                    .flatMap(grp -> {
                        return messageProcessor.process(grp.map(a2FKafkaMessage -> {
                            AssetDataPoint assetDataPoint = (AssetDataPoint) a2FKafkaMessage.getMessage();
                            assetDataPoint.setOrgId(a2FKafkaMessage.getOrgId());
                            return assetDataPoint;
                        }));
                    })
                    .subscribe(
                        assetDataPoint -> {},
                        throwable -> {
                            LOGGER.error(throwable.toString(), throwable);
                            batchCompleteLatch.countDown();
                            throw new
                        }, // do nothing here, just don't commit
                        () -> {
                            consumer.commitAsync();
                            batchCompleteLatch.countDown();
                        }
                    );

                batchCompleteLatch.await();
            }
        } catch (WakeupException e) {
            // ignore we're closing
        } catch (Exception e) {
            LOGGER.error(e.toString(), e);
        } finally {
            consumer.close();
            shutdownLatch.countDown();
        }
    }

Do let me know if you have any thoughts or improvements on this.

Thanks,
Abhinav