tombentley commented on code in PR #11778:
URL: https://github.com/apache/kafka/pull/11778#discussion_r886997419


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -98,6 +101,15 @@ public interface ConfigBackingStore {
      */
     void putRestartRequest(RestartRequest restartRequest);
 
+    /**
+     * Prepare to write to the backing config store. May be required by some 
implementations (such as those that only permit a single
+     * writer at a time across a cluster of workers) before performing 
mutative operations like writing configurations, target states, etc.

Review Comment:
   ```suggestion
        * writer at a time across a cluster of workers) before performing 
mutating operations like writing configurations, target states, etc.
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##########
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.runtime.distributed;
+package org.apache.kafka.connect.storage;

Review Comment:
   `org.apache.kafka.connect.storage` is a public package, and AFAICS this 
change in public API wasn't mentioned in KIP-618.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1211,6 +1248,29 @@ private String leaderUrl() {
         return assignment.leaderUrl();
     }
 
+    /**
+     * Perform an action that writes to the config topic, and if it fails 
because the leader has been fenced out, make note of that
+     * fact so that we can try to reclaim write ownership (if still the leader 
of the cluster) in a subsequent iteration of the tick loop.
+     * Note that it is not necessary to wrap every write to the config topic 
in this method, only the writes that should be performed
+     * exclusively by the leader. For example, {@link 
ConfigBackingStore#putTargetState(String, TargetState)} does not require this
+     * method, as it can be invoked by any worker in the cluster.
+     * @param write the action that writes to the config topic, such as {@link 
ConfigBackingStore#putSessionKey(SessionKey)} or
+     *              {@link ConfigBackingStore#putConnectorConfig(String, Map)}.
+     * @return {@code true} if the write succeeded, and {@code false} if the 
write failed because the worker was fenced out. All other
+     * failures will be propagated as exceptions.
+     */
+    private boolean writeToConfigTopic(Runnable write) {

Review Comment:
   > Note that it is not necessary to wrap every write to the config topic in 
this method, only the writes that should be performed exclusively by the leader.
   
   Should we make this part of the method name, e.g. 
`writeToConfigTopicAsLeader`



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -291,7 +315,16 @@ public void start() {
         log.info("Starting KafkaConfigBackingStore");
         // Before startup, callbacks are *not* invoked. You can grab a 
snapshot after starting -- just take care that
         // updates can continue to occur in the background
-        configLog.start();
+        try {
+            configLog.start();
+        } catch (UnsupportedVersionException e) {
+            throw new ConnectException(
+                    "Enabling exactly-once support for source connectors 
requires a Kafka broker version that allows "
+                            + "admin clients to read consumer offsets. Disable 
the worker's exactly-once support "
+                            + "for source connectors, or user a new Kafka 
broker version.",

Review Comment:
   ```suggestion
                               + "for source connectors, or use a new Kafka 
broker version.",
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -349,30 +436,44 @@ public boolean contains(String connector) {
 
     /**
      * Write this connector configuration to persistent storage and wait until 
it has been acknowledged and read back by
-     * tailing the Kafka log with a consumer.
+     * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} 
must be successfully invoked before calling
+     * this method if the worker is configured to use a fencable producer for 
writes to the config topic.
      *
      * @param connector  name of the connector to write data for
      * @param properties the configuration to write
+     * @throws ProducerFencedException if write privileges were claimed by 
another writer before this method was invoked
+     * @throws IllegalStateException if {@link #claimWritePrivileges()} is 
required, but was not successfully invoked before
+     * this method was called
      */
     @Override
     public void putConnectorConfig(String connector, Map<String, String> 
properties) {
         log.debug("Writing connector configuration for connector '{}'", 
connector);
         Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, 
CONNECTOR_CONFIGURATION_V0, connectConfig);
-        updateConnectorConfig(connector, serializedConfig);
+        try {
+            maybeSendFencibly(CONNECTOR_KEY(connector), serializedConfig);

Review Comment:
   If we're talking about a "fencable" producer, then shouldn't this be 
"fencably", rather than "fencibly"?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -349,30 +436,44 @@ public boolean contains(String connector) {
 
     /**
      * Write this connector configuration to persistent storage and wait until 
it has been acknowledged and read back by
-     * tailing the Kafka log with a consumer.
+     * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} 
must be successfully invoked before calling
+     * this method if the worker is configured to use a fencable producer for 
writes to the config topic.
      *
      * @param connector  name of the connector to write data for
      * @param properties the configuration to write
+     * @throws ProducerFencedException if write privileges were claimed by 
another writer before this method was invoked
+     * @throws IllegalStateException if {@link #claimWritePrivileges()} is 
required, but was not successfully invoked before
+     * this method was called
      */
     @Override
     public void putConnectorConfig(String connector, Map<String, String> 
properties) {
         log.debug("Writing connector configuration for connector '{}'", 
connector);
         Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, 
CONNECTOR_CONFIGURATION_V0, connectConfig);
-        updateConnectorConfig(connector, serializedConfig);
+        try {
+            maybeSendFencibly(CONNECTOR_KEY(connector), serializedConfig);
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {

Review Comment:
   I realise that we're not trying to support a thread cancellation mechanism 
here, but shouldn't we set the thread's interrupted status when catching 
`InterruptedException`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to