C0urante commented on code in PR #11778:
URL: https://github.com/apache/kafka/pull/11778#discussion_r887340726


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -349,30 +436,44 @@ public boolean contains(String connector) {
 
     /**
      * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by
-     * tailing the Kafka log with a consumer.
+     * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling
+     * this method if the worker is configured to use a fencable producer for writes to the config topic.
      *
      * @param connector  name of the connector to write data for
      * @param properties the configuration to write
+     * @throws ProducerFencedException if write privileges were claimed by another writer before this method was invoked
+     * @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before
+     * this method was called
      */
     @Override
     public void putConnectorConfig(String connector, Map<String, String> properties) {
         log.debug("Writing connector configuration for connector '{}'", connector);
         Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
-        updateConnectorConfig(connector, serializedConfig);
+        try {
+            maybeSendFencibly(CONNECTOR_KEY(connector), serializedConfig);
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {

Review Comment:
   This part isn't new; it's just an inlining of a private method that was only
used in one place. It follows the logic used in all similar methods in this
class, and I'm not aware of any bug reports related to it, so I'd like to err
on the side of caution and keep this logic as-is for now. We can address it in
a follow-up if it turns out to be a genuine issue.
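
For readers without the full file open, here is a minimal, self-contained sketch of the wait-then-wrap pattern that the `try`/`catch` in the hunk above follows. It is not the actual `KafkaConfigBackingStore` code: the class `ReadToEndSketch`, the method `awaitReadToEnd`, and the exception `WriteFailedException` are hypothetical names, and the body of the `catch` block is an assumption, since the hunk does not show it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ReadToEndSketch {
    /** Hypothetical unchecked wrapper; stands in for whatever the real catch block throws. */
    static class WriteFailedException extends RuntimeException {
        WriteFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Block until the read-to-end future completes or the timeout elapses.
     * All three checked failure modes are wrapped in the same unchecked exception
     * (an assumption; the hunk only shows the multi-catch, not its body).
     */
    static void awaitReadToEnd(Future<Void> readToEnd, long timeoutMs) {
        try {
            readToEnd.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new WriteFailedException("Error writing connector configuration", e);
        }
    }

    public static void main(String[] args) {
        // A future that is already complete returns immediately.
        awaitReadToEnd(CompletableFuture.completedFuture(null), 100L);

        // A future that never completes surfaces as a wrapped TimeoutException.
        try {
            awaitReadToEnd(new CompletableFuture<Void>(), 10L);
        } catch (WriteFailedException e) {
            System.out.println("Wrapped cause: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

Because the three exception types are handled identically, a caller of such a method cannot tell a timeout from an interruption without inspecting the cause.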



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -349,30 +436,44 @@ public boolean contains(String connector) {
 
     /**
      * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by
-     * tailing the Kafka log with a consumer.
+     * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling
+     * this method if the worker is configured to use a fencable producer for writes to the config topic.
      *
      * @param connector  name of the connector to write data for
      * @param properties the configuration to write
+     * @throws ProducerFencedException if write privileges were claimed by another writer before this method was invoked
+     * @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before
+     * this method was called
      */
     @Override
     public void putConnectorConfig(String connector, Map<String, String> properties) {
         log.debug("Writing connector configuration for connector '{}'", connector);
         Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
-        updateConnectorConfig(connector, serializedConfig);
+        try {
+            maybeSendFencibly(CONNECTOR_KEY(connector), serializedConfig);

Review Comment:
   Fair enough, done.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1211,6 +1248,29 @@ private String leaderUrl() {
         return assignment.leaderUrl();
     }
 
+    /**
+     * Perform an action that writes to the config topic, and if it fails because the leader has been fenced out, make note of that
+     * fact so that we can try to reclaim write ownership (if still the leader of the cluster) in a subsequent iteration of the tick loop.
+     * Note that it is not necessary to wrap every write to the config topic in this method, only the writes that should be performed
+     * exclusively by the leader. For example, {@link ConfigBackingStore#putTargetState(String, TargetState)} does not require this
+     * method, as it can be invoked by any worker in the cluster.
+     * @param write the action that writes to the config topic, such as {@link ConfigBackingStore#putSessionKey(SessionKey)} or
+     *              {@link ConfigBackingStore#putConnectorConfig(String, Map)}.
+     * @return {@code true} if the write succeeded, and {@code false} if the write failed because the worker was fenced out. All other
+     * failures will be propagated as exceptions.
+     */
+    private boolean writeToConfigTopic(Runnable write) {

Review Comment:
   Fair enough, done 👍 
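
The contract described in the Javadoc above (return `true` on success, return `false` and make a note when fenced out, propagate everything else) can be sketched roughly as follows. This is an illustration under stated assumptions, not the method body from the PR: `FencedException` is a hypothetical stand-in for Kafka's `ProducerFencedException`, and the `fencedFromConfigTopic` flag is an assumed way of "making note" of the fencing for a later tick.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class FencedWriteSketch {
    /** Hypothetical stand-in for Kafka's ProducerFencedException. */
    static class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    // Assumed bookkeeping: set when a write fails due to fencing, so that a later
    // iteration of the tick loop can try to reclaim write privileges.
    static final AtomicBoolean fencedFromConfigTopic = new AtomicBoolean(false);

    /**
     * Run a leader-only write. Returns true if it succeeded and false if the
     * writer was fenced out; any other exception propagates to the caller.
     */
    static boolean writeToConfigTopic(Runnable write) {
        try {
            write.run();
            return true;
        } catch (FencedException e) {
            fencedFromConfigTopic.set(true);
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(writeToConfigTopic(() -> { }));
        System.out.println(writeToConfigTopic(() -> {
            throw new FencedException("another writer claimed write privileges");
        }));
    }
}
```

Writes that any worker may perform, such as the `putTargetState` call mentioned in the Javadoc, would simply be invoked directly rather than passed through this wrapper.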



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##########
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.runtime.distributed;
+package org.apache.kafka.connect.storage;

Review Comment:
   That package contains a mix of public and private API. The public parts are all documented in the [Javadocs](https://kafka.apache.org/32/javadoc/org/apache/kafka/connect/storage/package-summary.html) for the package and include the `Converter` interface and the `SimpleHeaderConverter` class; the private parts, which include [everything in the connect/runtime module](https://github.com/apache/kafka/tree/1d6e3d6cb3a798983cafb0367b90c3a51579c364/connect/runtime/src/main/java/org/apache/kafka/connect/storage), are where the `ClusterConfigState` class lives.


