Varsha Abhinandan created KAFKA-10313: -----------------------------------------
Summary: Out of range offset errors leading to offset reset Key: KAFKA-10313 URL: https://issues.apache.org/jira/browse/KAFKA-10313 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 2.2.2 Reporter: Varsha Abhinandan Hi, We have been occasionally noticing offset resets happening on the Kafka consumer because of offset out of range error. However, I don't see any errors in the broker logs. No logs related to leader-election, replica lag, Kafka broker pod restarts or anything. (just info logs were enabled in the prod environment). It appeared from the logs that the out of range error was because of the fetch offset being larger than the offset range on the broker. Noticed this happening multiple times on different consumers, stream apps in the prod environment. So, it doesn't seem like an application bug and more like a bug in the KafkaConsumer. Would like to understand the cause for such errors. Also, none of the offset reset options are desirable. Choosing "earliest" creates a sudden huge lag (we have a retention of 24hours) and choosing "latest" leads to data loss (the records produced between the out of range error and when offset reset happens on the consumer). So, wondering if it is better for the Kafka client to separate out 'auto.offset.reset' config for just offset not found. For, out of range error maybe the Kafka client can automatically reset the offset to latest if the fetch offset is higher to prevent data loss. Also, automatically reset it to earliest if the fetch offset is lesser than the start offset. Following are the logs on the consumer side : [2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range for partition prd453-19-event-upsert-32, resetting offset[2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Resetting offset for partition prd453-19-event-upsert-32 to offset 453223789. Broker logs for the partition : _[2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach_ _[2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion._ _[2020-07-17T07:40:12,083Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789_ _[2020-07-17T07:41:12,083Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893_ _[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted._ _[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted._ _[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted._ _[2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786_ _[2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 1 ms._ _[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [453223789] due to retention time 86400000ms breach_ _[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 453223789, size 1073741355] for deletion._ _[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 454388428_ _[2020-07-17T09:06:12,075Z] [INFO ] [kafka-scheduler-6] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 453223789_ _[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000453223789.log.deleted._ _[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.index.deleted._ _[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.timeindex.deleted._ _[2020-07-17T09:06:56,682Z] [INFO ] [data-plane-kafka-request-handler-1] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 476681303_ _[2020-07-17T09:06:56,683Z] [INFO ] [data-plane-kafka-request-handler-1] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 476681303 in 1 ms._ Kafka version : 2.2.2 Number of brokers : 9 Log retention of the topic : 24 hours Kafka configs : unclean.leader.election.enable = false auto.leader.rebalance.enable = true background.threads = 10 broker.id.generation.enable = true [connection.failed.authentication.delay.ms|http://connection.failed.authentication.delay.ms/] = 100 [connections.max.idle.ms|http://connections.max.idle.ms/] = 600000 default.replication.factor = 3 delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true [group.initial.rebalance.delay.ms|http://group.initial.rebalance.delay.ms/] = 3000 [group.max.session.timeout.ms|http://group.max.session.timeout.ms/] = 300000 [group.min.session.timeout.ms|http://group.min.session.timeout.ms/] = 6000 inter.broker.protocol.version = 2.2-IV1 kafka.metrics.polling.interval.secs = 10 leader.imbalance.check.interval.seconds = 300 leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL listeners = null [log.cleaner.backoff.ms|http://log.cleaner.backoff.ms/] = 15000 log.cleaner.dedupe.buffer.size = 134217728 [log.cleaner.delete.retention.ms|http://log.cleaner.delete.retention.ms/] = 86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 5.24288E7 log.cleaner.min.cleanable.ratio = 0.5 [log.cleaner.min.compaction.lag.ms|http://log.cleaner.min.compaction.lag.ms/] = 0 log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs log.dirs = /data/kafka log.flush.interval.messages = 20000 [log.flush.interval.ms|http://log.flush.interval.ms/] = 10000 [log.flush.offset.checkpoint.interval.ms|http://log.flush.offset.checkpoint.interval.ms/] = 60000 [log.flush.scheduler.interval.ms|http://log.flush.scheduler.interval.ms/] = 2000 [log.flush.start.offset.checkpoint.interval.ms|http://log.flush.start.offset.checkpoint.interval.ms/] = 60000 log.index.interval.bytes = 4096 log.index.size.max.bytes = 10485760 log.message.downconversion.enable = true log.message.format.version = 2.2-IV1 [log.message.timestamp.difference.max.ms|http://log.message.timestamp.difference.max.ms/] = 9223372036854775807 log.message.timestamp.type = CreateTime log.preallocate = false log.retention.bytes = -1 [log.retention.check.interval.ms|http://log.retention.check.interval.ms/] = 300000 log.retention.hours = 24 log.roll.hours = 168 log.segment.bytes = 1073741824 [log.segment.delete.delay.ms|http://log.segment.delete.delay.ms/] = 60000 max.incremental.fetch.session.cache.slots = 1000 message.max.bytes = 1000012 min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 8 num.partitions = 20 num.recovery.threads.per.data.dir = 4 num.replica.alter.log.dirs.threads = null num.replica.fetchers = 4 offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 [offsets.commit.timeout.ms|http://offsets.commit.timeout.ms/] = 5000 offsets.load.buffer.size = 5242880 [offsets.retention.check.interval.ms|http://offsets.retention.check.interval.ms/] = 600000 offsets.retention.minutes = 10080 offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 offsets.topic.replication.factor = 3 offsets.topic.segment.bytes = 104857600 producer.purgatory.purge.interval.requests = 100 queued.max.request.bytes = -1 queued.max.requests = 16 [replica.fetch.backoff.ms|http://replica.fetch.backoff.ms/] = 1000 replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 replica.fetch.response.max.bytes = 10485760 [replica.fetch.wait.max.ms|http://replica.fetch.wait.max.ms/] = 500 [replica.high.watermark.checkpoint.interval.ms|http://replica.high.watermark.checkpoint.interval.ms/] = 5000 [replica.lag.time.max.ms|http://replica.lag.time.max.ms/] = 25000 replica.socket.receive.buffer.bytes = 65536 [replica.socket.timeout.ms|http://replica.socket.timeout.ms/] = 30000 [request.timeout.ms|http://request.timeout.ms/] = 28000 -- This message was sent by Atlassian Jira (v8.3.4#803005)