C0urante commented on code in PR #11778: URL: https://github.com/apache/kafka/pull/11778#discussion_r887340726
########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ########## @@ -349,30 +436,44 @@ public boolean contains(String connector) { /** * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by - * tailing the Kafka log with a consumer. + * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling + * this method if the worker is configured to use a fencable producer for writes to the config topic. * * @param connector name of the connector to write data for * @param properties the configuration to write + * @throws ProducerFencedException if write privileges were claimed by another writer before this method was invoked + * @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before + * this method was called */ @Override public void putConnectorConfig(String connector, Map<String, String> properties) { log.debug("Writing connector configuration for connector '{}'", connector); Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0); connectConfig.put("properties", properties); byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig); - updateConnectorConfig(connector, serializedConfig); + try { + maybeSendFencibly(CONNECTOR_KEY(connector), serializedConfig); + configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { Review Comment: This part isn't new, it's just an inlining of a private method that was only being used in one place. Considering it follows the logic used in all similar methods for this class and there haven't been any bugs reported related to it that I'm aware of, I'd like to err on the side of caution here and keep this logic as-is for now, though we can address in a follow-up if this is a genuine issue. ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ########## @@ -349,30 +436,44 @@ public boolean contains(String connector) { /** * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by - * tailing the Kafka log with a consumer. + * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling + * this method if the worker is configured to use a fencable producer for writes to the config topic. * * @param connector name of the connector to write data for * @param properties the configuration to write + * @throws ProducerFencedException if write privileges were claimed by another writer before this method was invoked + * @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before + * this method was called */ @Override public void putConnectorConfig(String connector, Map<String, String> properties) { log.debug("Writing connector configuration for connector '{}'", connector); Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0); connectConfig.put("properties", properties); byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig); - updateConnectorConfig(connector, serializedConfig); + try { + maybeSendFencibly(CONNECTOR_KEY(connector), serializedConfig); Review Comment: Fair enough, done. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1211,6 +1248,29 @@ private String leaderUrl() { return assignment.leaderUrl(); } + /** + * Perform an action that writes to the config topic, and if it fails because the leader has been fenced out, make note of that + * fact so that we can try to reclaim write ownership (if still the leader of the cluster) in a subsequent iteration of the tick loop. + * Note that it is not necessary to wrap every write to the config topic in this method, only the writes that should be performed + * exclusively by the leader. For example, {@link ConfigBackingStore#putTargetState(String, TargetState)} does not require this + * method, as it can be invoked by any worker in the cluster. + * @param write the action that writes to the config topic, such as {@link ConfigBackingStore#putSessionKey(SessionKey)} or + * {@link ConfigBackingStore#putConnectorConfig(String, Map)}. + * @return {@code true} if the write succeeded, and {@code false} if the write failed because the worker was fenced out. All other + * failures will be propagated as exceptions. + */ + private boolean writeToConfigTopic(Runnable write) { Review Comment: Fair enough, done 👍 ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java: ########## @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.connect.runtime.distributed; +package org.apache.kafka.connect.storage; Review Comment: That package contains a mix of public and private API. The public parts are all documented in the [Javadocs](https://kafka.apache.org/32/javadoc/org/apache/kafka/connect/storage/package-summary.html) for the package and include the `Converter` interface and the `SimpleHeaderConverter` class; the private parts (which include [everything in the connect/runtime module](https://github.com/apache/kafka/tree/1d6e3d6cb3a798983cafb0367b90c3a51579c364/connect/runtime/src/main/java/org/apache/kafka/connect/storage)) contain the `ClusterConfigState` class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org