Frank,

> I'm operating on the assumption that the connectors in question get stuck
in an inconsistent state
> Another thought... if an API exists to list all connectors in such a
state, then at least some monitoring/alerting could be put in place, right?

There are two different inconsistencies relevant to this discussion.
Inconsistent state (A) is when a new connector config (generation 2) has
been written to the config topic, but the full set of old task configs
(generation 1) still exists in the config topic.
Inconsistent state (B) is when a new connector config (generation 2) has
been written to the config topic, and there is an incomplete set of new
task configs (generation 2) in the config topic.
Since this is a reconfiguration, that incomplete set could also be a mix of
generation 1 and generation 2 task configs in the config topic.
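
To make that concrete, here is a rough sketch of what the config topic
could look like in each state. The connector name "my-conn" is made up, and
the keys follow the connector-/task-/commit- prefixes that
KafkaConfigBackingStore uses (values elided):

    State (A): new connector config, old task configs still committed
      connector-my-conn    <- generation 2 connector config
      task-my-conn-0       <- generation 1 task config
      task-my-conn-1       <- generation 1 task config
      commit-my-conn       <- commit record for the generation 1 task set

    State (B): new task configs partially written / not committed
      connector-my-conn    <- generation 2 connector config
      task-my-conn-0       <- generation 2 task config
      (task-my-conn-1 and/or the commit-my-conn record for generation 2
       is missing)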

Inconsistent state (A) is a normal part of updating a connector config; the
cluster writes the connector config to the config topic durably, and then
begins to try to asynchronously regenerate task configs.
Inconsistent state (B) is not normal, and happens when a worker is unable
to atomically write a full set of task configurations to the config topic.
The inconsistentConnectors method will report connectors in state (B), but
will not report connectors in state (A).
In state (A), you will see tasks running stale configurations. In state (B)
you will see no tasks running, as the framework will prevent starting tasks
which do not have a consistent set of task configs.
As you're not seeing the no-tasks symptom, I would put state (B) out of
mind and assume that the KafkaConfigBackingStore will give you atomic read
and write semantics for a full set of task configs at once.
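
If you did want to build monitoring on top of that, a minimal sketch could
look like the following. These are internal runtime classes, not exposed
through the REST API, so this assumes an embedded or instrumented worker,
and the exact package locations may differ between versions:

    import java.util.Set;

    import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
    import org.apache.kafka.connect.storage.ConfigBackingStore;

    public class InconsistencyCheck {
        // Sketch only: report connectors that lack a complete, committed
        // set of task configs (state (B) above). State (A) is not visible here.
        static Set<String> check(ConfigBackingStore configStore) {
            ClusterConfigState state = configStore.snapshot();
            Set<String> inconsistent = state.inconsistentConnectors();
            if (!inconsistent.isEmpty()) {
                System.err.println("Connectors with inconsistent task configs: " + inconsistent);
            }
            return inconsistent;
        }
    }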

> I see on KafkaConfigBackingStore.putTaskConfigs that if the JavaDoc is to
be believed, a ConnectException is thrown "if the task configurations do
not resolve inconsistencies found in the existing root and task
configurations."

It looks like ConnectExceptions are thrown after a failure to read or write
to the config topic, which is a pretty broad class of errors.
But if any such error occurs, there is no guarantee that a full set of
task configs + task commit message was written, and the topic may be in
state (A) or state (B).
This method is called specifically to resolve state (A) or state (B), and
the exception is just indicating that whatever inconsistency was present
before the call may still be there.
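
In other words, the contract (as I read it) is roughly the following
sketch, where a ConnectException just means the caller has to assume the
inconsistency may still be present and retry:

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.storage.ConfigBackingStore;

    class TaskConfigWriteSketch {
        // Sketch only: attempt to write the full set of task configs for a
        // connector. On ConnectException there is no guarantee the task
        // configs + commit record landed, so the topic may still be in
        // state (A) or (B).
        static boolean tryWriteTaskConfigs(ConfigBackingStore store, String connName,
                                           List<Map<String, String>> taskConfigs) {
            try {
                store.putTaskConfigs(connName, taskConfigs);
                return true;
            } catch (ConnectException e) {
                // Read/write to the config topic failed; the caller (in the
                // real herder, via its callback) should retry the reconfiguration.
                return false;
            }
        }
    }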

> Am I right that there is only one retry on that exception in
DistributedHerder.reconfigureConnectorTasksWithRetry?

No, the implementation of `reconfigureConnectorTasksWithRetry` calls itself
from its error callback, and calls reconfigureConnector again to retry the
operation.
This is a recursive loop, but isn't obvious because it's inside a callback.
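
Here is a stripped-down sketch of that pattern (not the actual herder code;
names, backoff, and time limits are omitted) just to make the hidden loop
visible:

    import org.apache.kafka.connect.util.Callback;

    class RetrySketch {
        // Simplified illustration: the error callback re-invokes the
        // retrying method, so what looks like a single call is actually an
        // open-ended retry loop.
        void reconfigureConnectorTasksWithRetry(String connName) {
            reconfigureConnector(connName, (error, result) -> {
                if (error != null) {
                    // the real herder schedules this as a delayed herder request
                    reconfigureConnectorTasksWithRetry(connName);
                }
            });
        }

        void reconfigureConnector(String connName, Callback<Void> cb) {
            // generate and write task configs, then complete cb with any error
        }
    }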

This is the condition which is causing the issue:
https://github.com/apache/kafka/blob/6e2b86597d9cd7c8b2019cffb895522deb63c93a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1918-L1931
The DistributedHerder is comparing the generation 1 and generation 2
configurations, and erroneously believes that they are equal, when in fact
the underlying secrets have changed.
This prevents the DistributedHerder from writing generation 2 task configs
to the KafkaConfigBackingStore entirely, leaving it in state (A) without
realizing it.
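
As a hypothetical illustration of why the equality check misses this (the
config key and provider path below are made up): when the secret is
referenced through a config provider placeholder, the compared configs are
textually identical before and after the rotation, so the check concludes
nothing changed:

    import java.util.Map;

    class PlaceholderComparisonSketch {
        public static void main(String[] args) {
            // Task config before and after the secret is rotated in the
            // external provider: the placeholder text is unchanged, only
            // what it resolves to has changed.
            Map<String, String> generation1 =
                    Map.of("connection.password", "${vault:db/creds:password}");
            Map<String, String> generation2 =
                    Map.of("connection.password", "${vault:db/creds:password}");

            // An equality check on these sees no difference, so the herder
            // skips writing generation 2 task configs to the config topic.
            System.out.println(generation1.equals(generation2)); // prints "true"
        }
    }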

Thanks for working on this issue,
Greg

On Wed, Feb 8, 2023 at 8:38 AM Frank Grimes <frankgrime...@yahoo.com.invalid>
wrote:

> Another thought... if an API exists to list all connectors in such a
> state, then at least some monitoring/alerting could be put in place, right?
