Hi,

We have been noticing offset resets happening on the Kafka consumer because of an "offset out of range" error. Following are the INFO logs found on the consumer side:
[2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range for partition prd453-19-event-upsert-32, resetting offset
[2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Resetting offset for partition prd453-19-event-upsert-32 to offset 453223789.

However, I don't see any errors in the broker logs (just INFO logs were enabled in the production environment). There are no logs related to leader election, replica lag, Kafka broker pod restarts or anything similar. Following are the logs from the two brokers for the partition whose offset got reset.

Kafka-6

[2020-07-17T07:40:12,092Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789
[2020-07-17T07:40:12,641Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach
[2020-07-17T07:40:12,641Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion.
[2020-07-17T07:41:12,642Z] [INFO ] [kafka-scheduler-0] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893
[2020-07-17T07:41:12,712Z] [INFO ] [kafka-scheduler-0] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
[2020-07-17T07:41:12,713Z] [INFO ] [kafka-scheduler-0] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
[2020-07-17T07:41:12,713Z] [INFO ] [kafka-scheduler-0] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
[2020-07-17T07:52:31,839Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786
[2020-07-17T07:52:31,840Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 2 ms.
[2020-07-17T09:05:12,085Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 454388428
[2020-07-17T09:05:12,640Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [453223789] due to retention time 86400000ms breach
[2020-07-17T09:05:12,640Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 453223789, size 1073741355] for deletion.
[2020-07-17T09:06:12,641Z] [INFO ] [kafka-scheduler-3] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 453223789
[2020-07-17T09:06:12,697Z] [INFO ] [kafka-scheduler-3] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000453223789.log.deleted.
[2020-07-17T09:06:12,697Z] [INFO ] [kafka-scheduler-3] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.index.deleted.
[2020-07-17T09:06:12,697Z] [INFO ] [kafka-scheduler-3] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.timeindex.deleted.
[2020-07-17T09:06:56,684Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 476681303
[2020-07-17T09:06:56,685Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 476681303 in 1 ms.

Kafka-7

[2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach
[2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion.
[2020-07-17T07:40:12,083Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789
[2020-07-17T07:41:12,083Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893
[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
[2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786
[2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 1 ms.
[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [453223789] due to retention time 86400000ms breach
[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 453223789, size 1073741355] for deletion.
[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 454388428
[2020-07-17T09:06:12,075Z] [INFO ] [kafka-scheduler-6] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 453223789
[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000453223789.log.deleted.
[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.index.deleted.
[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.timeindex.deleted.
[2020-07-17T09:06:56,682Z] [INFO ] [data-plane-kafka-request-handler-1] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 476681303
[2020-07-17T09:06:56,683Z] [INFO ] [data-plane-kafka-request-handler-1] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 476681303 in 1 ms.

As observed in the logs, the fetch offset 476383711 is not lower than the log start offset (453223789 at the time of the error), so the fetched data had not simply been removed by retention. It appears that the out-of-range error was because the fetch offset was larger than the log end offset on the broker. I would like to understand the cause of these errors, because none of the offset reset options are desirable: choosing "earliest" creates a sudden huge lag (we have a retention of 24 hours), and choosing "latest" leads to data loss (the records produced between the out-of-range error and the moment the offset reset happens on the consumer).
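In case it helps to reproduce the comparison, below is a minimal sketch (Kafka 2.2 Java client) of how the partition's current offset range and the group's committed offset can be read side by side. The bootstrap.servers value is a placeholder; the group, topic and partition are the ones from the logs above.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetRangeCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bo-indexer-group-prd453-19");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        TopicPartition tp = new TopicPartition("prd453-19-event-upsert", 32);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Current offset range on the brokers for this partition
            long logStart = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
            long logEnd = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            // Offset last committed by the group (this call does not join the group)
            OffsetAndMetadata committed = consumer.committed(tp);
            System.out.printf("log start=%d, log end=%d, committed=%s%n",
                    logStart, logEnd, committed == null ? "none" : committed.offset());
        }
    }
}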
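Since neither automatic reset is acceptable for us, one possible workaround (just a sketch, assuming we keep track of the timestamp of the last record we processed; bootstrap.servers and lastProcessedTimestamp below are placeholders) would be to set auto.offset.reset=none and handle the out-of-range case explicitly, for example by rewinding with offsetsForTimes() instead of jumping to earliest or latest:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ExplicitResetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bo-indexer-group-prd453-19");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Surface out-of-range fetches as an exception instead of auto-resetting
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        // Placeholder: in reality this would be the timestamp of the last successfully processed record
        long lastProcessedTimestamp = System.currentTimeMillis() - Duration.ofMinutes(5).toMillis();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("prd453-19-event-upsert"));
            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                    // ... process records and remember the timestamp of the last processed one ...
                } catch (OffsetOutOfRangeException e) {
                    for (TopicPartition tp : e.partitions()) {
                        // Log the broker's current range to see which bound the fetch offset crossed
                        long start = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
                        long end = consumer.endOffsets(Collections.singleton(tp)).get(tp);
                        System.err.printf("%s out of range, broker range is [%d, %d)%n", tp, start, end);

                        // Rewind to the first offset at or after the last processed timestamp,
                        // instead of jumping to earliest (huge lag) or latest (data loss)
                        Map<TopicPartition, OffsetAndTimestamp> byTime =
                                consumer.offsetsForTimes(Collections.singletonMap(tp, lastProcessedTimestamp));
                        OffsetAndTimestamp target = byTime.get(tp);
                        consumer.seek(tp, target != null ? target.offset() : start);
                    }
                }
            }
        }
    }
}

With auto.offset.reset=none, a group that has no committed offsets at all fails with NoOffsetForPartitionException instead, so the very first start of a new group would still need an explicit seek.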
Kafka version : 2.2.2
Number of brokers : 9
Log retention of the topic : 24 hours

Kafka configs :
advertised.host.name = 10.xx.xx.xx
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = false
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 6
broker.id.generation.enable = true
broker.rack = null
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 3
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 100
group.initial.rebalance.delay.ms = 3000
group.max.session.timeout.ms = 300000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
host.name = 0.0.0.0
inter.broker.listener.name = null
inter.broker.protocol.version = 2.2-IV1
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 5.24288E7
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /data/kafka
log.flush.interval.messages = 20000
log.flush.interval.ms = 10000
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 2000
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.2-IV1
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 24
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 8
num.partitions = 20
num.recovery.threads.per.data.dir = 4
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 4
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 3
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
port = 6667
principal.builder.class = null
producer.purgatory.purge.interval.requests = 100
queued.max.request.bytes = -1
queued.max.requests = 16
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 25000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 28000
reserved.broker.max.id = 400000000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.server.callback.handler.class = null
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 1048576
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 1048576
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.principal.mapping.rules = [DEFAULT]
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 3
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect = cxx-zookeeper-svc.ana-zookeeper.svc.cluster.local:2181/xxxx/analytics/kafka/c19-v1
zookeeper.connection.timeout.ms = 10000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 15000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000

Thanks,
Varsha