Boquan Tang created KAFKA-8876:
----------------------------------

Summary: KafkaBasedLog does not throw exception when some partitions of the topic is offline
Key: KAFKA-8876
URL: https://issues.apache.org/jira/browse/KAFKA-8876
Project: Kafka
Issue Type: Bug
Reporter: Boquan Tang
Currently, KafkaBasedLog does not check whether *all* partitions in the topic are online. As a result, it may ignore partitions that are still recovering and in turn report a null offset to KafkaOffsetBackingStore. Instead, it should either wait or fail the thread to prompt a retry, so that the connector can load its offset correctly.

Specifically, we are using the Debezium MySQL connector to replicate the MySQL binlog to Kafka. When restarting after a cluster outage, we observed the following:

{code}
2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Starting KafkaOffsetBackingStore
2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.util.KafkaBasedLog] [main] Starting KafkaBasedLog with topic bobqueue-binlog-shovel-v1-offsets
...skipped client config logs...
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-12 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-10 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-21 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-5 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-20 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-18 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-2 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-13 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-11 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-8 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-23 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-7 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-22 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-6 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-3 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-9 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-24 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-4 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-19 to offset 0.
2019-08-29T19:27:34Z INFO [org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Finished reading offsets topic and starting KafkaOffsetBackingStore
{code}

Partition bobqueue-binlog-shovel-v1-offsets-14, which contains the offset for one of our Debezium connectors, is not present in this list. This results in:

{code}
2019-08-29T19:32:09Z cmh-mexec1029 INFO [io.debezium.connector.mysql.MySqlConnectorTask] [pool-3-thread-1] Found no existing offset, so preparing to perform a snapshot
{code}

This is undesirable: the connector starts a new snapshot instead of resuming from the offset it had already stored.
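To illustrate the failure mode, the following minimal Java sketch models the offsets topic as an in-memory list of records and replays only the partitions that were assigned at startup. It is not KafkaBasedLog's actual code: the {{OffsetRecord}} type, the {{replay}} helper, the connector keys, and the offset values are all hypothetical. It only shows why a key whose latest record lives on a skipped partition reads back as null, which a connector then treats as "no existing offset".

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MissingPartitionDemo {
    // Hypothetical record in the offsets topic: the partition it lives on, plus key and value.
    record OffsetRecord(int partition, String key, String value) {}

    // Replays only the assigned partitions, as a reader that trusts an
    // incomplete partition list would; later records overwrite earlier ones.
    static Map<String, String> replay(List<OffsetRecord> topic, Set<Integer> assigned) {
        Map<String, String> offsets = new HashMap<>();
        for (OffsetRecord r : topic) {
            if (assigned.contains(r.partition())) {
                offsets.put(r.key(), r.value());
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<OffsetRecord> topic = List.of(
                new OffsetRecord(3, "connector-a", "pos=100"),
                new OffsetRecord(14, "connector-b", "binlog-pos=4242"));
        // Partition 14 is offline, so it is absent from the assigned set.
        Map<String, String> offsets = replay(topic, Set.of(3));
        System.out.println(offsets.get("connector-a")); // pos=100
        System.out.println(offsets.get("connector-b")); // null: looks like "no existing offset"
    }
}
{code}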
Could KafkaBasedLog check that the partition list is complete, and wait or fail when the check does not pass?
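One possible shape for the requested check, sketched under assumptions: the helper names {{missingPartitions}} and {{awaitAllPartitions}} are hypothetical, and the expected partition count and the set of currently available partitions are treated as plain inputs. In a real implementation both would have to come from cluster metadata, and how KafkaBasedLog would obtain them is left open here. The sketch polls with a fixed backoff and throws a {{TimeoutException}} once a deadline passes, so the caller can fail startup and retry instead of proceeding with an incomplete partition list.

{code:java}
import java.time.Duration;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class PartitionCompletenessCheck {
    // Hypothetical helper: partition ids in [0, expectedCount) missing from the available set.
    static Set<Integer> missingPartitions(int expectedCount, Set<Integer> available) {
        Set<Integer> missing = new TreeSet<>();
        for (int p = 0; p < expectedCount; p++) {
            if (!available.contains(p)) {
                missing.add(p);
            }
        }
        return missing;
    }

    // Hypothetical helper: polls until every expected partition is available,
    // or throws once the timeout elapses so the caller can fail and retry.
    static void awaitAllPartitions(int expectedCount, Supplier<Set<Integer>> availableSupplier,
                                   Duration timeout, Duration backoff)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Set<Integer> missing = missingPartitions(expectedCount, availableSupplier.get());
            if (missing.isEmpty()) {
                return;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Partitions still unavailable: " + missing);
            }
            Thread.sleep(backoff.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical metadata source: partition 2 comes back online on the third poll.
        Supplier<Set<Integer>> available = () ->
                calls.incrementAndGet() < 3 ? Set.of(0, 1) : Set.of(0, 1, 2);
        awaitAllPartitions(3, available, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("all partitions available after " + calls.get() + " polls");
    }
}
{code}

Failing after a bounded wait, rather than waiting indefinitely, keeps a partition that never recovers from silently hanging worker startup; the timeout and backoff would likely need to be configurable in practice.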