Frank,

> I'm operating on the assumption that the connectors in question get stuck
> in an inconsistent state

> Another thought... if an API exists to list all connectors in such a
> state, then at least some monitoring/alerting could be put in place, right?
There are two different inconsistencies relevant to this discussion.

Inconsistent state (A) is when a new connector config (generation 2) has been written to the config topic, but a full set of old task configs (generation 1) exists in the config topic.

Inconsistent state (B) is when a new connector config (generation 2) has been written to the config topic, but only an incomplete set of new task configs (generation 2) is in the config topic. Since this is a reconfiguration, it could also be a mix of generation 1 and generation 2 task configs in the config topic.

State (A) is a normal part of updating a connector config: the cluster durably writes the connector config to the config topic, and then asynchronously begins to regenerate task configs. State (B) is not normal, and happens when a worker is unable to atomically write a full set of task configurations to the config topic.

The inconsistentConnectors method will report connectors in state (B), but will not report connectors in state (A). In state (A) you will see tasks running stale configurations. In state (B) you will see no tasks running, as the framework refuses to start tasks which do not have a consistent set of task configs. Since you're not seeing the no-tasks symptom, I would put state (B) out of mind and assume that the KafkaConfigBackingStore is giving you atomic read and write semantics for a full set of task configs at once.

> I see on KafkaConfigBackingStore.putTaskConfigs that if the JavaDoc is to
> be believed, a ConnectException is thrown "if the task configurations do
> not resolve inconsistencies found in the existing root and task
> configurations."

It looks like ConnectExceptions are thrown after a failure to read or write to the config topic, which is a pretty broad class of errors. If any such error occurs, the caller cannot be guaranteed that a full set of task configs plus the task commit message was written, and the topic may be in state (A) or state (B).
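To make the two states concrete, here is a toy Java model of a per-connector view of the config topic. All names here are hypothetical (this is not the real KafkaConfigBackingStore logic), but it captures the distinction: state (A) is a complete-but-stale committed set, while state (B) is an uncommitted or mixed-generation set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

// Toy model (hypothetical names; not the real KafkaConfigBackingStore) of a
// per-connector view of the config topic, to make states (A) and (B) concrete.
public class ConfigStateSketch {

    static class ConnectorView {
        int connectorGeneration;                                 // generation of the latest connector config
        Map<Integer, Integer> taskGenerations = new HashMap<>(); // task id -> generation of its task config
        boolean commitRecordSeen;                                // did a task-commit record follow the task configs?
    }

    // State (A): a complete, committed set of task configs exists, but it is
    // older than the connector config. Tasks keep running with stale configs.
    static boolean isStateA(ConnectorView v) {
        return v.commitRecordSeen
                && v.taskGenerations.values().stream().allMatch(g -> g < v.connectorGeneration);
    }

    // State (B): the task configs were not atomically committed, or the set
    // mixes generations. The framework refuses to start tasks here, and this
    // is the condition an inconsistentConnectors-style check would report.
    static boolean isStateB(ConnectorView v) {
        return !v.commitRecordSeen || new HashSet<>(v.taskGenerations.values()).size() > 1;
    }

    public static void main(String[] args) {
        ConnectorView a = new ConnectorView();
        a.connectorGeneration = 2;
        a.taskGenerations.put(0, 1);
        a.taskGenerations.put(1, 1);
        a.commitRecordSeen = true;        // full generation-1 set, committed
        System.out.println("A: stateA=" + isStateA(a) + " stateB=" + isStateB(a));

        ConnectorView b = new ConnectorView();
        b.connectorGeneration = 2;
        b.taskGenerations.put(0, 2);      // only one task rewritten before the write was interrupted
        b.taskGenerations.put(1, 1);
        b.commitRecordSeen = false;       // no commit record -> incomplete write
        System.out.println("B: stateA=" + isStateA(b) + " stateB=" + isStateB(b));
    }
}
```

The point of the sketch is that state (B) is detectable from the topic contents alone (a missing commit record or mixed generations), whereas state (A) looks internally consistent and is merely stale.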
putTaskConfigs is called specifically to resolve state (A) or state (B), and the exception is just indicating that whatever inconsistency was present before the call may still be there.

> Am I right that there is only one retry on that exception in
> DistributedHerder.reconfigureConnectorTasksWithRetry?

No, the implementation of reconfigureConnectorTasksWithRetry calls itself in its error callback, and calls reconfigureConnector to retry the operation. This is a recursive retry loop, but that isn't obvious because the recursion happens inside a callback.

This is the condition which is causing the issue:

https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931

The DistributedHerder compares the generation 1 and generation 2 configurations and erroneously believes that they are equal, when in fact the underlying secrets have changed. This prevents the DistributedHerder from writing generation 2 task configs to the KafkaConfigBackingStore at all, leaving it in state (A) without realizing it.

Thanks for working on this issue,
Greg

On Wed, Feb 8, 2023 at 8:38 AM Frank Grimes <frankgrime...@yahoo.com.invalid> wrote:

> Another thought... if an API exists to list all connectors in such a
> state, then at least some monitoring/alerting could be put in place, right?
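P.S. A toy illustration (not the actual herder code, and the placeholder/secret names are made up) of why two raw configs can compare equal even though the secret behind a config-provider placeholder was rotated between generations:

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the faulty comparison: the raw configs hold config-provider
// placeholders, so equals() on them cannot see that the secret behind the
// placeholder changed between generations.
public class StaleComparisonSketch {

    // Crude placeholder resolution: look the whole "${...}" token up in a
    // secrets map (hypothetical; real config providers are more involved).
    static Map<String, String> resolve(Map<String, String> raw, Map<String, String> secrets) {
        Map<String, String> out = new HashMap<>();
        raw.forEach((k, v) -> out.put(k, v.startsWith("${") ? secrets.getOrDefault(v, v) : v));
        return out;
    }

    public static void main(String[] args) {
        // Generation 1 and generation 2 connector configs: textually identical.
        Map<String, String> gen1 = Map.of("connection.password", "${secret:db.password}");
        Map<String, String> gen2 = Map.of("connection.password", "${secret:db.password}");
        System.out.println("raw configs equal?      " + gen1.equals(gen2));

        // But the secret behind the placeholder was rotated between generations,
        // so the configs the tasks would actually run with are different.
        Map<String, String> before = Map.of("${secret:db.password}", "hunter2");
        Map<String, String> after  = Map.of("${secret:db.password}", "s3same");
        System.out.println("resolved configs equal? "
                + resolve(gen1, before).equals(resolve(gen2, after)));
    }
}
```

A comparison of the raw configs reports "equal" and skips writing new task configs, which is exactly the stale-state symptom described above.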