Hi, I would like to request help with this issue.
I must not commit the offset when an exception occurs while processing a message. I am using the approach below to commit offsets manually. Can you please help me retrieve the uncommitted offsets so that I can re-process them at a later point in time? import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.AcknowledgingMessageListener; import org.springframework.kafka.support.Acknowledgment; public class ConsumerKafka implements AcknowledgingMessageListener<String, String>{ @Override @KafkaListener(id = "consumer", topics = {"${kafka.topic}"} ) public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) { // TODO Auto-generated method stub try{ System.out.println("Read Record is : " + data.value()); System.out.println("Offset is : " + data.offset()); System.out.println("Topic is : " + data.topic()); System.out.println("Partition is : " + data.partition()); acknowledgment.acknowledge(); }catch (Exception e ){ System.out.println("Push the messaged to Error Stream : " + e); } } } If any exception occurs, the catch block doesn't commit the offset. Kafka Config: 
import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.policy.AlwaysRetryPolicy; import org.springframework.retry.support.RetryTemplate; import com.learnbootkafka.consumer.ConsumerKafka; @Configuration @EnableKafka public class KafkaConfig { @Autowired Environment env; /** * Consumer Config Starts */ @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setPollTimeout(3000); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); return factory; } @Bean public 
ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("kafka.broker")); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit")); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms")); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id")); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("kafka.auto.offset.reset")); return propsMap; } @Bean public ConsumerKafka listener() { return new ConsumerKafka(); } I would be thankful for any help. Thank you, Suraj PR