Boquan Tang created KAFKA-8876:
----------------------------------

             Summary: KafkaBasedLog does not throw exception when some partitions of the topic is offline
                 Key: KAFKA-8876
                 URL: https://issues.apache.org/jira/browse/KAFKA-8876
             Project: Kafka
          Issue Type: Bug
            Reporter: Boquan Tang


Currently KafkaBasedLog does not check whether *all* partitions in the topic are
online. As a result, it may ignore partitions that are still recovering and, in
turn, report a null offset to KafkaOffsetBackingStore, when it should instead
either wait or fail the thread to prompt a retry, so that the connector can load
its offset correctly.
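A minimal sketch of the missing completeness check, assuming the partition metadata has already been fetched. In the Java client that metadata would typically come from `KafkaConsumer.partitionsFor(topic)`, whose `PartitionInfo.leader()` is null when a partition has no leader. The class `PartitionCompletenessCheck`, its method, and the `expectedPartitions` input (an assumed, already known partition count for the topic) are hypothetical and not part of Kafka; metadata is modelled here as a map from partition id to leader node id so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical completeness check (not a Kafka API). */
final class PartitionCompletenessCheck {
    private PartitionCompletenessCheck() {}

    /**
     * Returns, in ascending order, every partition id in [0, expectedPartitions)
     * that is absent from leaderByPartition or whose leader is null (offline).
     */
    static List<Integer> missingOrOffline(int expectedPartitions,
                                          Map<Integer, Integer> leaderByPartition) {
        List<Integer> problems = new ArrayList<>();
        for (int p = 0; p < expectedPartitions; p++) {
            // Map.get returns null both for an absent key and for a null leader.
            if (leaderByPartition.get(p) == null) {
                problems.add(p);
            }
        }
        return problems;
    }
}
```

For a 25-partition topic where partition 14 is absent from the metadata and partition 3 has no leader, `missingOrOffline(25, leaders)` returns `[3, 14]`; a non-empty result is the signal to wait or fail instead of reading the log.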

Specifically, we are using the Debezium MySQL connector to replicate the MySQL
binlog to Kafka.
When restarting after a cluster outage, we observed the following:
{code}
2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Starting KafkaOffsetBackingStore
2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.util.KafkaBasedLog] [main] Starting KafkaBasedLog with topic bobqueue-binlog-shovel-v1-offsets
...skipped client config logs...
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-12 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-10 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-21 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-5 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-20 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-18 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-2 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-13 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-11 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-8 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-23 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-7 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-22 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-6 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-3 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-9 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-24 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-4 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-19 to offset 0.
2019-08-29T19:27:34Z INFO [org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Finished reading offsets topic and starting KafkaOffsetBackingStore
{code}
Partition bobqueue-binlog-shovel-v1-offsets-14, which contains the offsets for one
of our Debezium connectors, is not present in this list. This results in:
{code}
2019-08-29T19:32:09Z cmh-mexec1029 INFO [io.debezium.connector.mysql.MySqlConnectorTask] [pool-3-thread-1] Found no existing offset, so preparing to perform a snapshot
{code}
This is undesirable: the connector should have resumed from its stored offset
instead of preparing a new snapshot.

Can we make KafkaBasedLog check that the partition list is complete, and wait or
fail when the check does not pass?
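The requested wait-or-fail behaviour could be sketched as a bounded retry loop. This is an illustration only: `WaitForCompletePartitions` and `awaitComplete` are hypothetical names, not Kafka APIs, and the caller is assumed to supply a function that returns the ids of partitions that are currently missing or have no leader. If the partitions do not all come online before the timeout, the method throws, so the worker can fail and retry rather than report a null offset.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper (not a Kafka API): wait until no partition is missing or offline. */
final class WaitForCompletePartitions {
    private WaitForCompletePartitions() {}

    /**
     * Repeatedly calls findProblemPartitions until it returns an empty list,
     * sleeping backoffMs between attempts. Throws TimeoutException if some
     * partitions are still missing or offline after timeoutMs.
     */
    static void awaitComplete(Supplier<List<Integer>> findProblemPartitions,
                              long timeoutMs, long backoffMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            List<Integer> problems = findProblemPartitions.get();
            if (problems.isEmpty()) {
                return; // every expected partition is present and has a leader
            }
            if (System.nanoTime() - deadline >= 0) {
                // Fail loudly instead of continuing with an incomplete partition list.
                throw new TimeoutException("Partitions still missing or offline after "
                        + timeoutMs + " ms: " + problems);
            }
            Thread.sleep(backoffMs);
        }
    }
}
```

Throwing instead of proceeding with a partial read is what surfaces the problem at startup; the timeout and backoff are left to the caller, since suitable values depend on how long broker recovery usually takes.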



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
