[ 
https://issues.apache.org/jira/browse/KAFKA-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boquan Tang updated KAFKA-8876:
-------------------------------
    Description: 
Currently KafkaBasedLog does not check whether *all* partitions of the topic 
are online. As a result, it may ignore partitions that are still recovering 
and report a null offset to KafkaOffsetBackingStore, when it should instead 
either wait or fail the thread to prompt a retry, so that the connector can 
load the offset correctly.

Specifically, we are using the Debezium MySQL connector to replicate the MySQL 
binlog to Kafka.
While restarting after a cluster outage, we observed the following:
{code}
2019-08-29T19:27:32Z INFO 
[org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Starting 
KafkaOffsetBackingStore
2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.util.KafkaBasedLog] [main] 
Starting KafkaBasedLog with topic bobqueue-binlog-shovel-v1-offsets
...skipped client config logs...
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-12 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-10 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-21 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-5 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-20 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-18 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-2 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-13 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-11 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-8 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-23 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-7 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-22 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-6 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-3 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-9 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-24 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-4 to offset 0.
2019-08-29T19:27:33Z INFO [org.apache.kafka.clients.consumer.internals.Fetcher] 
[main] [Consumer clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] 
Resetting offset for partition bobqueue-binlog-shovel-v1-offsets-19 to offset 0.
2019-08-29T19:27:34Z INFO 
[org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Finished 
reading offsets topic and starting KafkaOffsetBackingStore
{code}
bobqueue-binlog-shovel-v1-offsets-14, which contains the offset for one of our 
Debezium connectors, does not appear in the log above. This results in:
{code}
2019-08-29T19:32:09Z INFO [io.debezium.connector.mysql.MySqlConnectorTask] 
[pool-3-thread-1] Found no existing offset, so preparing to perform a snapshot
{code}
This is undesirable, because the connector prepares a new snapshot instead of 
resuming from its stored offset.
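
To see which partitions were skipped, the "Resetting offset for partition" lines in the startup log can be compared against the expected partition count. The following is only a minimal sketch of that comparison: the class `MissingPartitions`, its method names, and the `expectedCount` parameter are hypothetical and are not part of Kafka. The partition count has to be supplied by the caller (for example from the `offset.storage.partitions` worker setting, if that is how the topic was created).

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log-analysis helper, not part of Kafka or Kafka Connect.
final class MissingPartitions {

    // Collects the partition IDs mentioned in lines such as
    // "Resetting offset for partition <topic>-<id> to offset 0."
    // \s+ tolerates line wrapping inside the pasted log.
    static SortedSet<Integer> seen(String log, String topic) {
        Pattern pattern = Pattern.compile(
                "partition\\s+" + Pattern.quote(topic) + "-(\\d+)\\s+to\\s+offset");
        SortedSet<Integer> ids = new TreeSet<>();
        Matcher matcher = pattern.matcher(log);
        while (matcher.find()) {
            ids.add(Integer.parseInt(matcher.group(1)));
        }
        return ids;
    }

    // Returns every ID in [0, expectedCount) that is absent from 'seen'.
    // expectedCount is an assumption provided by the caller.
    static SortedSet<Integer> missing(Set<Integer> seen, int expectedCount) {
        SortedSet<Integer> result = new TreeSet<>();
        for (int id = 0; id < expectedCount; id++) {
            if (!seen.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }
}
```

Applied to the log excerpt above with an assumed count of 25 partitions (the highest ID in the log is 24), this would list partitions 0, 1, 14, 15, 16 and 17 as not having been reset.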

Can we make KafkaBasedLog check that the partition list is complete and 
wait or fail when the check does not pass?
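
The requested wait-or-fail behaviour could look roughly like the sketch below. It is an illustration under stated assumptions, not KafkaBasedLog's actual code: `PartitionCompletenessCheck`, `awaitAllPartitions`, and all parameters are hypothetical names. The `visiblePartitions` supplier stands in for whatever metadata lookup the caller uses (for instance the partition IDs returned by `Consumer.partitionsFor(topic)`), and the expected partition count is assumed to be known from configuration.

```java
import java.time.Duration;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Supplier;

// Hypothetical sketch of a completeness check; not the Kafka Connect implementation.
final class PartitionCompletenessCheck {

    // Polls the supplier until every partition ID in [0, expectedCount) is
    // visible. Fails with IllegalStateException once the timeout has elapsed,
    // so that the caller can retry instead of starting with partial offsets.
    static void awaitAllPartitions(Supplier<Set<Integer>> visiblePartitions,
                                   int expectedCount,
                                   Duration timeout,
                                   Duration retryInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Set<Integer> visible = visiblePartitions.get();
            SortedSet<Integer> missing = new TreeSet<>();
            for (int id = 0; id < expectedCount; id++) {
                if (!visible.contains(id)) {
                    missing.add(id);
                }
            }
            if (missing.isEmpty()) {
                return; // all partitions are online; safe to read the topic
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException(
                        "Partitions still unavailable after " + timeout + ": " + missing);
            }
            try {
                Thread.sleep(retryInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Interrupted while waiting for partitions", e);
            }
        }
    }
}
```

Waiting first and failing only after a timeout covers both options raised above: a briefly recovering partition delays startup, while a partition that stays offline stops the worker from reporting a null offset.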

> KafkaBasedLog does not throw exception when some partitions of the topic are 
> offline
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-8876
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8876
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boquan Tang
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
