Dmytro created FLINK-27962: ------------------------------ Summary: KafkaSourceReader fails to commit consumer offsets for checkpoints Key: FLINK-27962 URL: https://issues.apache.org/jira/browse/FLINK-27962 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.4, 1.15.0 Reporter: Dmytro
The KafkaSourceReader works well for many hours, then fails and re-connects successfully, then continues to work some time. After the first three failures it hangs on "Offset commit failed" and never connected again. Restarting the Flink job does help and it works until the next "3 times fail". *Failed to commit consumer offsets for checkpoint:* {code:java} Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-06-06 14:19:52,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available. 2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523 org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets ..... fails permanently until the job restart {code} *Consumer Config:* {code:java} allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = none bootstrap.servers = [test.host.net:9093] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = test-client-id client.rack = connections.max.idle.ms = 180000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = test-group-id group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 180000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 60000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = [hidden] sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = OAUTHBEARER security.protocol = SASL_SSL security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 30000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)