kkonstantine commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660275673



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback
         });
     }
 
+    /**
+     * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request
+     * and the current status of the connector and task instances.
+     *
+     * @param request the restart request; may not be null
+     * @return the restart plan, or empty if this worker has no status for the connector named in the request and therefore the
+     *         connector cannot be restarted
+     */
+    public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
+        String connectorName = request.connectorName();
+        ConnectorStatus connectorStatus = statusBackingStore.get(connectorName);
+        if (connectorStatus == null) {
+            return Optional.empty();
+        }
+
+        // If requested, mark the connector as restarting
+        AbstractStatus.State connectorState;
+        if (request.shouldRestartConnector(connectorStatus)) {
+            connectorState = AbstractStatus.State.RESTARTING;
+        } else {
+            connectorState = connectorStatus.state();
+        }

Review comment:
       nit: the ternary operator (`?:`) can be used here, as below, unless you're not a fan.
   ```suggestion
           AbstractStatus.State connectorState = request.shouldRestartConnector(connectorStatus)
                   ? AbstractStatus.State.RESTARTING 
                   : connectorStatus.state();
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,132 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {

Review comment:
       nit: similar style as elsewhere in this file
   ```suggestion
       public void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> callback) {
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Objects;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * A request to restart a connector and/or task instances.
+ * <p>The natural order is based first upon the connector name and then upon the requested restart behaviors.
+ * If two requests have the same connector name, then the requests are ordered based on the
+ * probable number of connector and task instances each request is going to restart.
+ */
+public class RestartRequest implements Comparable<RestartRequest> {
+
+    private final String connectorName;
+    private final boolean onlyFailed;
+    private final boolean includeTasks;
+
+    /**
+     * Create a new request to restart a connector and optionally its tasks.
+     *
+     * @param connectorName the name of the connector; may not be null
+     * @param onlyFailed    true if only failed instances should be restarted
+     * @param includeTasks  true if tasks should be restarted, or false if only the connector should be restarted
+     */
+    public RestartRequest(String connectorName, boolean onlyFailed, boolean includeTasks) {
+        this.connectorName = Objects.requireNonNull(connectorName, "Connector name may not be null");
+        this.onlyFailed = onlyFailed;
+        this.includeTasks = includeTasks;
+    }
+
+    /**
+     * Get the name of the connector.
+     *
+     * @return the connector name; never null
+     */
+    public String connectorName() {
+        return connectorName;
+    }
+
+    /**
+     * Determine whether only failed instances should be restarted.
+     *
+     * @return true if only failed instances should be restarted, or false if all applicable instances should be restarted
+     */
+    public boolean onlyFailed() {
+        return onlyFailed;
+    }
+
+    /**
+     * Determine whether {@link Task} instances should also be restarted in addition to the {@link Connector} instance.
+     *
+     * @return true if the connector and task instances should be restarted, or false if just the connector should be restarted
+     */
+    public boolean includeTasks() {
+        return includeTasks;
+    }
+
+    /**
+     * Determine whether the connector with the given status is to be restarted.
+     *
+     * @param status the connector status; may not be null
+     * @return true if the connector is to be restarted, or false otherwise
+     */
+    public boolean shouldRestartConnector(ConnectorStatus status) {
+        return !onlyFailed || status.state() == AbstractStatus.State.FAILED;
+    }
+
+    /**
+     * Determine whether only the {@link Connector} instance is to be restarted even if not failed.
+     *
+     * @return true if only the {@link Connector} instance is to be restarted even if not failed, or false otherwise
+     */
+    public boolean forciblyRestartConnectorOnly() {
+        return !onlyFailed() && !includeTasks();
+    }
+
+    /**
+     * Determine whether the task instance with the given status is to be restarted.
+     *
+     * @param status the task status; may not be null
+     * @return true if the task is to be restarted, or false otherwise
+     */
+    public boolean shouldRestartTask(TaskStatus status) {
+        return includeTasks && (!onlyFailed || status.state() == AbstractStatus.State.FAILED);
+    }
+
+    @Override
+    public int compareTo(RestartRequest o) {
+        int result = connectorName.compareTo(o.connectorName);
+        if (result == 0) {
+            result = impactRank() - o.impactRank();
+        }
+        return result;

Review comment:
       ```suggestion
           return result == 0 ? impactRank() - o.impactRank() : result;
   ```
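       For illustration only, here is a stripped-down, hypothetical `Req` type that shows the natural ordering described in the class Javadoc: connector name first, then how much the request may restart. The `impactRank()` weights are an assumption, because the PR's implementation isn't part of this diff. The sketch also uses `Integer.compare`, which cannot overflow. With ranks this small the subtraction in the suggestion is safe, but `Integer.compare` is the more defensive habit.
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical stand-in for RestartRequest, keeping only what the ordering needs
   final class Req implements Comparable<Req> {
       final String connectorName;
       final boolean onlyFailed;
       final boolean includeTasks;

       Req(String connectorName, boolean onlyFailed, boolean includeTasks) {
           this.connectorName = connectorName;
           this.onlyFailed = onlyFailed;
           this.includeTasks = includeTasks;
       }

       // Assumed ranking: requests that may restart more instances rank higher
       int impactRank() {
           int rank = includeTasks ? 2 : 0;
           return onlyFailed ? rank : rank + 1;
       }

       @Override
       public int compareTo(Req o) {
           int result = connectorName.compareTo(o.connectorName);
           // Integer.compare avoids the overflow that a - b can hit for arbitrary ints
           return result == 0 ? Integer.compare(impactRank(), o.impactRank()) : result;
       }

       @Override
       public String toString() {
           return connectorName + "(onlyFailed=" + onlyFailed + ", includeTasks=" + includeTasks + ")";
       }

       public static void main(String[] args) {
           List<Req> reqs = new ArrayList<>(Arrays.asList(
                   new Req("b", true, false), new Req("a", false, true), new Req("a", true, false)));
           Collections.sort(reqs);
           System.out.println(reqs);
       }
   }
   ```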

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -208,6 +209,21 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
             .field("creation-timestamp", Schema.INT64_SCHEMA)
             .build();
 
+    public static final String RESTART_PREFIX = "restart-connector-";
+
+    public static String RESTART_KEY(String connectorName) {
+        return RESTART_PREFIX + connectorName;
+    }
+
+    public static final Boolean ONLY_FAILED_DEFAULT = Boolean.FALSE;
+    public static final Boolean INCLUDE_TASKS_DEFAULT = Boolean.FALSE;

Review comment:
       I agree the conversion is on a frequently used path. But maybe it's the code below that can be rewritten to avoid both autoboxing and unboxing where they aren't required.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+/**
+ * An immutable restart plan.
+ */
+public class RestartPlan {
+
+    private final RestartRequest request;
+    private final ConnectorStateInfo stateInfo;
+    private final Collection<ConnectorTaskId> idsToRestart;
+
+    /**
+     * Create a new plan to restart a connector and optionally its tasks.
+     *
+     * @param request          the restart request; may not be null
+     * @param restartStateInfo the current state info for the connector; may not be null
+     */
+    public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) {
+        this.request = Objects.requireNonNull(request, "RestartRequest may not be null");
+        this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo may not be null");
+        // Collect the task IDs to stop and restart (may be none)
+        idsToRestart = Collections.unmodifiableList(
+                stateInfo.tasks()
+                        .stream()
+                        .filter(this::isRestarting)
+                        .map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id()))
+                        .collect(Collectors.toList())
+        );
+    }
+
+    /**
+     * Get the connector name.
+     *
+     * @return the name of the connector; never null
+     */
+    public String connectorName() {
+        return request.connectorName();
+    }
+
+    /**
+     * Get the original {@link RestartRequest}.
+     *
+     * @return the restart request; never null
+     */
+    public RestartRequest restartRequest() {
+        return request;
+    }
+
+    /**
+     * Get the {@link ConnectorStateInfo} that reflects the current state of the connector <em>except</em> with the {@code status}
+     * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that
+     * are to be restarted, based upon the {@link #restartRequest() restart request}.
+     *
+     * @return the connector state info that reflects the restart plan; never null
+     */
+    public ConnectorStateInfo restartConnectorStateInfo() {

Review comment:
       No problem.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
     }
 
+    @SuppressWarnings("unchecked")
+    RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+        String connectorName = record.key().substring(RESTART_PREFIX.length());
+        if (value.value() == null) {
+            log.error("Ignoring restart request because it is unexpectedly null");
+            return null;
+        }
+        if (!(value.value() instanceof Map)) {
+            log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass());
+            return null;
+        }
+
+        Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+        Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
+        if (failed == null) {
+            log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT);
+            failed = ONLY_FAILED_DEFAULT;
+        }
+        if (!(failed instanceof Boolean)) {
+            log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, failed.getClass(), ONLY_FAILED_DEFAULT);
+            failed = ONLY_FAILED_DEFAULT;
+        }
+        boolean onlyFailed = (Boolean) failed;
+
+        Object withTasks = valueAsMap.get(INCLUDE_TASKS_FIELD_NAME);
+        if (withTasks == null) {
+            log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", INCLUDE_TASKS_FIELD_NAME, INCLUDE_TASKS_DEFAULT);
+            withTasks = INCLUDE_TASKS_DEFAULT;
+        }
+        if (!(withTasks instanceof Boolean)) {
+            log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", INCLUDE_TASKS_FIELD_NAME, withTasks.getClass(), INCLUDE_TASKS_DEFAULT);
+            withTasks = INCLUDE_TASKS_DEFAULT;
+        }
+        boolean includeTasks = (Boolean) withTasks;

Review comment:
       Similar suggestion as above

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
     }
 
+    @SuppressWarnings("unchecked")
+    RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+        String connectorName = record.key().substring(RESTART_PREFIX.length());
+        if (value.value() == null) {
+            log.error("Ignoring restart request because it is unexpectedly null");
+            return null;
+        }
+        if (!(value.value() instanceof Map)) {
+            log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass());
+            return null;
+        }
+
+        Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+        Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
+        if (failed == null) {
+            log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT);
+            failed = ONLY_FAILED_DEFAULT;
+        }
+        if (!(failed instanceof Boolean)) {
+            log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, failed.getClass(), ONLY_FAILED_DEFAULT);
+            failed = ONLY_FAILED_DEFAULT;
+        }
+        boolean onlyFailed = (Boolean) failed;

Review comment:
       Following up on the comment above, here's how we could write this to avoid autoboxing/unboxing while using the more readable primitives:
   ```suggestion
           boolean onlyFailed;
           if (!(failed instanceof Boolean)) {
               log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}",
                       ONLY_FAILED_FIELD_NAME, failed == null ? "null" : failed.getClass(), ONLY_FAILED_DEFAULT);
               onlyFailed = ONLY_FAILED_DEFAULT;
           } else {
               onlyFailed = (Boolean) failed;
           }
   ```
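       The comments above point at the same idea. With primitive defaults and a single `instanceof` check, which is `false` for `null`, the missing-field branch and the wrong-type branch collapse into one. Below is a minimal sketch of that as a reusable helper. The names `RestartFieldParser`, `readBoolean` and the field keys are hypothetical, and `System.err` stands in for the store's SLF4J logger.
   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class RestartFieldParser {
       // Primitive defaults, so the fallback path needs no Boolean constants
       static final boolean ONLY_FAILED_DEFAULT = false;
       static final boolean INCLUDE_TASKS_DEFAULT = false;

       // Returns the field's value, or the default when it is missing (null) or not a Boolean;
       // instanceof is false for null, so a single check covers both cases.
       static boolean readBoolean(Map<String, Object> fields, String name, boolean defaultValue) {
           Object raw = fields.get(name);
           if (!(raw instanceof Boolean)) {
               System.err.printf("Invalid data for restart request '%s' field should be a Boolean but is %s, defaulting to %s%n",
                       name, raw == null ? "null" : raw.getClass(), defaultValue);
               return defaultValue;
           }
           // Unboxing happens only when the stored value really is a Boolean
           return (Boolean) raw;
       }

       public static void main(String[] args) {
           Map<String, Object> fields = new HashMap<>();
           fields.put("only-failed", Boolean.TRUE);   // hypothetical field key
           fields.put("include-tasks", "yes");        // wrong type: falls back to the default
           boolean onlyFailed = readBoolean(fields, "only-failed", ONLY_FAILED_DEFAULT);
           boolean includeTasks = readBoolean(fields, "include-tasks", INCLUDE_TASKS_DEFAULT);
           System.out.println(onlyFailed + " " + includeTasks);
       }
   }
   ```
   Boxing still happens when the warning is formatted, but only on that rare error path.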

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Objects;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * A request to restart a connector and/or task instances.
+ * <p>The natural order is based first upon the connector name and then upon the requested restart behaviors.
+ * If two requests have the same connector name, then the requests are ordered based on the
+ * probable number of connector and task instances each request is going to restart.
+ */
+public class RestartRequest implements Comparable<RestartRequest> {
+
+    private final String connectorName;
+    private final boolean onlyFailed;
+    private final boolean includeTasks;
+
+    /**
+     * Create a new request to restart a connector and optionally its tasks.
+     *
+     * @param connectorName the name of the connector; may not be null
+     * @param onlyFailed    true if only failed instances should be restarted
+     * @param includeTasks  true if tasks should be restarted, or false if only the connector should be restarted
+     */
+    public RestartRequest(String connectorName, boolean onlyFailed, boolean includeTasks) {
+        this.connectorName = Objects.requireNonNull(connectorName, "Connector name may not be null");
+        this.onlyFailed = onlyFailed;
+        this.includeTasks = includeTasks;
+    }
+
+    /**
+     * Get the name of the connector.
+     *
+     * @return the connector name; never null
+     */
+    public String connectorName() {
+        return connectorName;
+    }
+
+    /**
+     * Determine whether only failed instances should be restarted.
+     *
+     * @return true if only failed instances should be restarted, or false if all applicable instances should be restarted
+     */
+    public boolean onlyFailed() {
+        return onlyFailed;
+    }
+
+    /**
+     * Determine whether {@link Task} instances should also be restarted in addition to the {@link Connector} instance.
+     *
+     * @return true if the connector and task instances should be restarted, or false if just the connector should be restarted
+     */
+    public boolean includeTasks() {
+        return includeTasks;
+    }
+
+    /**
+     * Determine whether the connector with the given status is to be restarted.
+     *
+     * @param status the connector status; may not be null
+     * @return true if the connector is to be restarted, or false otherwise
+     */
+    public boolean shouldRestartConnector(ConnectorStatus status) {
+        return !onlyFailed || status.state() == AbstractStatus.State.FAILED;
+    }
+
+    /**
+     * Determine whether only the {@link Connector} instance is to be restarted even if not failed.
+     *
+     * @return true if only the {@link Connector} instance is to be restarted even if not failed, or false otherwise
+     */
+    public boolean forciblyRestartConnectorOnly() {

Review comment:
       ```suggestion
       public boolean forceRestartConnectorOnly() {
   ```
   ? (It seems to be the more common way to say it, but I don't have a strong opinion.)
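       Whatever the name, the predicates in `RestartRequest` reduce to a small truth table over `onlyFailed` and `includeTasks`. The sketch below copies their boolean logic into a hypothetical `RestartPredicates` class so the combinations can be checked in isolation. It uses a simplified `State` enum, which is an assumption standing in for `AbstractStatus.State`. It is not the PR's class.
   ```java
   // Simplified stand-in for AbstractStatus.State (assumption: only the states needed here)
   enum State { RUNNING, FAILED, PAUSED, RESTARTING }

   final class RestartPredicates {
       final boolean onlyFailed;
       final boolean includeTasks;

       RestartPredicates(boolean onlyFailed, boolean includeTasks) {
           this.onlyFailed = onlyFailed;
           this.includeTasks = includeTasks;
       }

       // Same logic as RestartRequest.shouldRestartConnector: everything, or only FAILED
       boolean shouldRestartConnector(State connectorState) {
           return !onlyFailed || connectorState == State.FAILED;
       }

       // Same logic as RestartRequest.shouldRestartTask: tasks must also be included
       boolean shouldRestartTask(State taskState) {
           return includeTasks && (!onlyFailed || taskState == State.FAILED);
       }

       // Same logic as forciblyRestartConnectorOnly, under the name proposed in the review
       boolean forceRestartConnectorOnly() {
           return !onlyFailed && !includeTasks;
       }

       public static void main(String[] args) {
           for (boolean onlyFailed : new boolean[] {false, true}) {
               for (boolean includeTasks : new boolean[] {false, true}) {
                   RestartPredicates p = new RestartPredicates(onlyFailed, includeTasks);
                   System.out.printf("onlyFailed=%b includeTasks=%b -> connector(RUNNING)=%b task(RUNNING)=%b task(FAILED)=%b forceConnectorOnly=%b%n",
                           onlyFailed, includeTasks,
                           p.shouldRestartConnector(State.RUNNING),
                           p.shouldRestartTask(State.RUNNING),
                           p.shouldRestartTask(State.FAILED),
                           p.forceRestartConnectorOnly());
               }
           }
       }
   }
   ```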

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+/**
+ * An immutable restart plan per connector.
+ */
+public class RestartPlan {
+
+    private final RestartRequest request;
+    private final ConnectorStateInfo stateInfo;
+    private final Collection<ConnectorTaskId> idsToRestart;
+
+    /**
+     * Create a new plan to restart a connector and optionally its tasks.
+     *
+     * @param request          the restart request; may not be null
+     * @param restartStateInfo the current state info for the connector; may not be null
+     */
+    public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) {
+        this.request = Objects.requireNonNull(request, "RestartRequest may not be null");
+        this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo may not be null");
+        // Collect the task IDs to stop and restart (may be none)
+        this.idsToRestart = Collections.unmodifiableList(
+                stateInfo.tasks()
+                        .stream()
+                        .filter(this::isRestarting)
+                        .map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id()))
+                        .collect(Collectors.toList())
+        );
+    }
+
+    /**
+     * Get the connector name.
+     *
+     * @return the name of the connector; never null
+     */
+    public String connectorName() {
+        return request.connectorName();
+    }
+
+    /**
+     * Get the original {@link RestartRequest}.
+     *
+     * @return the restart request; never null
+     */
+    public RestartRequest restartRequest() {
+        return request;
+    }
+
+    /**
+     * Get the {@link ConnectorStateInfo} that reflects the current state of the connector <em>except</em> with the {@code status}
+     * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that
+     * are to be restarted, based upon the {@link #restartRequest() restart request}.
+     *
+     * @return the connector state info that reflects the restart plan; never null
+     */
+    public ConnectorStateInfo restartConnectorStateInfo() {
+        return stateInfo;
+    }
+
+    /**
+     * Get the immutable collection of {@link ConnectorTaskId} for all tasks to be restarted
+     * based upon the {@link #restartRequest() restart request}.
+     *
+     * @return the IDs of the tasks to be restarted; never null but possibly empty
+     */
+    public Collection<ConnectorTaskId> taskIdsToRestart() {
+        return idsToRestart;
+    }
+
+    /**
+     * Determine whether the {@link Connector} instance is to be restarted
+     * based upon the {@link #restartRequest() restart request}.
+     *
+     * @return true if the {@link Connector} instance is to be restarted, or false otherwise
+     */
+    public boolean shouldRestartConnector() {
+        return isRestarting(stateInfo.connector());
+    }
+
+    /**
+     * Determine whether at least one {@link Task} instance is to be restarted
+     * based upon the {@link #restartRequest() restart request}.
+     *
+     * @return true if any {@link Task} instances are to be restarted, or false if none are to be restarted
+     */
+    public boolean shouldRestartTasks() {
+        return !taskIdsToRestart().isEmpty();
+    }
+
+    /**
+     * Get the number of connector tasks that are to be restarted
+     * based upon the {@link #restartRequest() restart request}.
+     *
+     * @return the number of {@link Task} instances to be restarted
+     */
+    public int restartTaskCount() {
+        return taskIdsToRestart().size();
+    }
+
+    /**
+     * Get the total number of tasks in the connector.
+     *
+     * @return the total number of tasks
+     */
+    public int totalTaskCount() {
+        return stateInfo.tasks().size();
+    }
+
+    private boolean isRestarting(ConnectorStateInfo.AbstractState state) {
+        return isRestarting(state.state());
+    }
+
+    private boolean isRestarting(String state) {
+        return AbstractStatus.State.RESTARTING.toString().equalsIgnoreCase(state);
+    }
+
+    @Override
+    public String toString() {
+        if (shouldRestartConnector()) {
+            return String.format(
+                    "plan to restart connector and %d of %d tasks for %s", restartTaskCount(), totalTaskCount(), request
+            );
+        }
+        return String.format(
+                "plan to restart %d of %d tasks for %s", restartTaskCount(), totalTaskCount(), request
+        );
+    }

Review comment:
       Something like:
   ```suggestion
           return shouldRestartConnector()
                   ? String.format("plan to restart connector and %d of %d tasks for %s", restartTaskCount(), totalTaskCount(), request)
                   : String.format("plan to restart %d of %d tasks for %s", restartTaskCount(), totalTaskCount(), request);
   ```
   Breaking this long statement across lines doesn't make it much shorter, I guess.
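       For context, that `toString()` summarizes the tasks whose state was set to `RESTARTING` when the plan was built. The minimal sketch below shows that filtering together with the ternary form. It assumes plain `String` states and `Integer` task IDs in place of `ConnectorStateInfo` and `ConnectorTaskId`, drops the request suffix, and uses the hypothetical name `PlanSketch`.
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   // Hypothetical, simplified plan: String states stand in for ConnectorStateInfo entries
   final class PlanSketch {
       private final String connectorState;
       private final Map<Integer, String> taskStates; // task id -> state name
       private final List<Integer> idsToRestart;

       PlanSketch(String connectorState, Map<Integer, String> taskStates) {
           this.connectorState = connectorState;
           this.taskStates = taskStates;
           // Keep only the tasks already marked RESTARTING, as RestartPlan's constructor does
           this.idsToRestart = Collections.unmodifiableList(
                   taskStates.entrySet().stream()
                           .filter(e -> isRestarting(e.getValue()))
                           .map(Map.Entry::getKey)
                           .sorted()
                           .collect(Collectors.toList()));
       }

       private static boolean isRestarting(String state) {
           return "RESTARTING".equalsIgnoreCase(state);
       }

       List<Integer> taskIdsToRestart() {
           return idsToRestart;
       }

       @Override
       public String toString() {
           // One return with a ternary instead of an if plus two returns
           return isRestarting(connectorState)
                   ? String.format("plan to restart connector and %d of %d tasks", idsToRestart.size(), taskStates.size())
                   : String.format("plan to restart %d of %d tasks", idsToRestart.size(), taskStates.size());
       }

       public static void main(String[] args) {
           Map<Integer, String> tasks = new HashMap<>();
           tasks.put(0, "RUNNING");
           tasks.put(1, "RESTARTING");
           System.out.println(new PlanSketch("RESTARTING", tasks)); // plan to restart connector and 1 of 2 tasks
       }
   }
   ```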

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -717,6 +750,15 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
                 if (started)
                     updateListener.onTaskConfigUpdate(updatedTasks);
+            } else if (record.key().startsWith(RESTART_PREFIX)) {
+                RestartRequest request = recordToRestartRequest(record, value);
+                if (request == null) {
+                    return;
+                }
+                // Only notify the listener if this backing store is already successfully started (having caught up the first time)
+                if (started) {

Review comment:
       Since we already log an error inside `recordToRestartRequest`, we can probably avoid the early return with:
   ```suggestion
                   // Only notify the listener if this backing store is already successfully started (having caught up the first time)
                   if (request != null && started) {
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
     }
 
+    @SuppressWarnings("unchecked")
+    RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+        String connectorName = record.key().substring(RESTART_PREFIX.length());
+        if (value.value() == null) {
+            log.error("Ignoring restart request because it is unexpectedly null");
+            return null;
+        }
+        if (!(value.value() instanceof Map)) {
+            log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass());
+            return null;
+        }

Review comment:
       I see we've been verbose in similar logic earlier in this file, but maybe we can improve on that a bit, at least in new code. Here's another suggestion:
   ```suggestion
           if (!(value.value() instanceof Map)) {
               log.error("Ignoring restart request because the value is not a Map but is {}", value.value() == null ? "null" : value.value().getClass());
               return null;
           }
   ```
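       The suggestion works because `instanceof` evaluates to `false` for `null`. That lets the separate null check fold into the type check, while the ternary keeps the log argument from dereferencing `null`. Here is a tiny standalone check of that behavior, using hypothetical helper names:
   ```java
   import java.util.Collections;
   import java.util.Map;

   final class InstanceofNull {
       // Same guard as the suggestion: false for null and for any non-Map value
       static boolean isUsableMap(Object value) {
           return value instanceof Map;
       }

       // Same log argument as the suggestion: never dereferences a null value
       static Object describe(Object value) {
           return value == null ? "null" : value.getClass();
       }

       public static void main(String[] args) {
           Object[] candidates = {null, "text", Collections.emptyMap()};
           for (Object v : candidates) {
               if (!isUsableMap(v)) {
                   System.out.println("Ignoring restart request because the value is not a Map but is " + describe(v));
               }
           }
       }
   }
   ```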

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,132 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if (!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = buildRestartPlan(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null);
+                    }
+
+                    return null;
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    /**
+     * Process all pending restart requests. There can be at most one request per connector.
+     *
+     * <p>This method is called from within the {@link #tick()} method.
+     */
+    void processRestartRequests() {
+        List<RestartRequest> restartRequests;
+        synchronized (this) {
+            if (pendingRestartRequests.isEmpty()) {
+                return;
+            }
+            // Dequeue into a local list to minimize the work being done within the synchronized block
+            restartRequests = new ArrayList<>(pendingRestartRequests.values());
+            pendingRestartRequests.clear();
+        }
+        for (RestartRequest restartRequest : restartRequests) {

Review comment:
       nit: type inference is nice ... 
   ```suggestion
   restartRequests.forEach(restartRequest -> {
   ```
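       The pattern in `processRestartRequests()` holds the monitor only long enough to copy and clear the pending map, and then does the slower work outside the lock. Below is a minimal sketch that uses `forEach` as suggested. It uses a hypothetical `RestartQueue` keyed by connector name with plain `String` requests, and assumes that a newer request for the same connector replaces the older one.
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Consumer;

   final class RestartQueue {
       // Keyed by connector name: at most one pending request per connector (assumed: latest wins)
       private final Map<String, String> pending = new HashMap<>();

       synchronized void submit(String connectorName, String request) {
           pending.put(connectorName, request);
       }

       // Drains pending requests and hands each to the processor outside the lock
       int process(Consumer<String> processor) {
           List<String> requests;
           synchronized (this) {
               if (pending.isEmpty()) {
                   return 0;
               }
               // Copy and clear under the lock; the potentially slow work happens afterwards
               requests = new ArrayList<>(pending.values());
               pending.clear();
           }
           requests.forEach(processor);
           return requests.size();
       }
   }
   ```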

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
##########
@@ -334,6 +353,15 @@ public StartAndStopLatch expectedStops(int expectedStops, boolean includeTasks)
         return startAndStopCounter.expectedStops(expectedStops, taskLatches);
     }
 
+    public StartAndStopLatch expectedStops(int expectedStops, Map<String, Integer> expectedTasksStops, boolean includeTasks) {
+        List<StartAndStopLatch> taskLatches = null;

Review comment:
       See the comment above on whether `null` matters.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback
         });
     }
 
+    /**
+     * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request
+     * and the current status of the connector and task instances.
+     *
+     * @param request the restart request; may not be null
+     * @return the restart plan, or empty if this worker has no status for the connector named in the request and therefore the
+     *         connector cannot be restarted
+     */
+    public Optional<RestartPlan> buildRestartPlan(RestartRequest request) {
+        String connectorName = request.connectorName();
+        ConnectorStatus connectorStatus = statusBackingStore.get(connectorName);
+        if (connectorStatus == null) {
+            return Optional.empty();
+        }
+
+        // If requested, mark the connector as restarting
+        AbstractStatus.State connectorState;
+        if (request.shouldRestartConnector(connectorStatus)) {
+            connectorState = AbstractStatus.State.RESTARTING;
+        } else {
+            connectorState = connectorStatus.state();
+        }
+        ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState(
+                connectorState.toString(),
+                connectorStatus.workerId(),
+                connectorStatus.trace()
+        );
+
+        // Collect the task states, If requested, mark the task as restarting
+        List<ConnectorStateInfo.TaskState> taskStates = statusBackingStore.getAll(connectorName)
+                .stream()
+                .map(taskStatus -> {
+                    AbstractStatus.State state = taskStatus.state();
+                    if (request.shouldRestartTask(taskStatus)) {
+                        state = AbstractStatus.State.RESTARTING;
+                    }

Review comment:
       It's a bit confusing that `state` is assigned a second time when the `if` condition is true.
   I'd also call the variable `taskState` (to match `connectorState` above).
   
   Ternary can be used here as well: 
   ```suggestion
                       AbstractStatus.State state = request.shouldRestartTask(taskStatus)
                               ? AbstractStatus.State.RESTARTING
                               : taskStatus.state();
   ```
   (as with any suggestion from GitHub, please check that it compiles and conforms to the project's style)
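The if/else and ternary forms compute the same value; the ternary just guarantees a single assignment. A minimal sketch, assuming a simplified `State` enum and a hypothetical `shouldRestart` rule (restart everything, or only `FAILED` instances when `onlyFailed` is set). The real `RestartRequest.shouldRestartTask` logic is not shown in this diff.

```java
import java.util.List;
import java.util.stream.Collectors;

class RestartStateSketch {
    // Simplified stand-in for AbstractStatus.State
    enum State { RUNNING, PAUSED, FAILED, RESTARTING }

    // Hypothetical rule, for illustration only
    static boolean shouldRestart(State current, boolean onlyFailed) {
        return !onlyFailed || current == State.FAILED;
    }

    // Original style: declare, then assign in each branch
    static State withIfElse(State current, boolean onlyFailed) {
        State taskState;
        if (shouldRestart(current, onlyFailed)) {
            taskState = State.RESTARTING;
        } else {
            taskState = current;
        }
        return taskState;
    }

    // Suggested style: one expression, so the value is assigned exactly once
    static State withTernary(State current, boolean onlyFailed) {
        return shouldRestart(current, onlyFailed) ? State.RESTARTING : current;
    }

    // The same ternary inside a stream, as in the task-state mapping
    static List<State> taskStates(List<State> current, boolean onlyFailed) {
        return current.stream()
                .map(taskState -> shouldRestart(taskState, onlyFailed) ? State.RESTARTING : taskState)
                .collect(Collectors.toList());
    }
}
```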

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
     }
 
+    @SuppressWarnings("unchecked")
+    RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+        String connectorName = record.key().substring(RESTART_PREFIX.length());
+        if (value.value() == null) {
+            log.error("Ignoring restart request because it is unexpectedly null");
+            return null;
+        }
+        if (!(value.value() instanceof Map)) {
+            log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass());
+            return null;
+        }
+
+        Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+        Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
+        if (failed == null) {
+            log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT);

Review comment:
       btw, all the other log statements here use `log.error`. Should we stay consistent with that, instead of using `log.warn` just here? The issues above seem similar.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,132 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if (!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = buildRestartPlan(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, plan.restartConnectorStateInfo());

Review comment:
       safe to skip the declaration here. 
   ```suggestion
                               callback.onCompletion(null, plan.get().restartConnectorStateInfo());
   ```
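Both forms send the same response; the suggestion just calls `get()` on the `Optional` directly instead of binding the result to a local first. A minimal, self-contained sketch in which `Callback`, `Plan`, and the `String` state info are hypothetical stand-ins for the Connect types, and `IllegalStateException` stands in for `NotFoundException`:

```java
import java.util.Optional;

class RestartResponseSketch {
    // Hypothetical stand-in for Connect's Callback<T>: exactly one argument is non-null
    interface Callback<T> {
        void onCompletion(Throwable error, T result);
    }

    // Hypothetical stand-in for RestartPlan
    static final class Plan {
        private final String stateInfo;

        Plan(String stateInfo) {
            this.stateInfo = stateInfo;
        }

        String restartConnectorStateInfo() {
            return stateInfo;
        }
    }

    static void respond(Optional<Plan> plan, String connectorName, Callback<String> callback) {
        if (!plan.isPresent()) {
            callback.onCompletion(new IllegalStateException("Status for connector " + connectorName + " not found"), null);
        } else {
            // No intermediate local variable: read the plan straight from the Optional
            callback.onCompletion(null, plan.get().restartConnectorStateInfo());
        }
    }
}
```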

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
##########
@@ -280,6 +289,16 @@ public StartAndStopLatch expectedStarts(int expectedStarts, boolean includeTasks
         return startAndStopCounter.expectedStarts(expectedStarts, taskLatches);
     }
 
+    public StartAndStopLatch expectedStarts(int expectedStarts, Map<String, Integer> expectedTasksStarts, boolean includeTasks) {
+        List<StartAndStopLatch> taskLatches = null;
+        if (includeTasks) {
+            taskLatches = taskHandles.values().stream()
+                    .map(task -> task.expectedStarts(expectedTasksStarts.get(task.taskId())))
+                    .collect(Collectors.toList());
+        }
+        return startAndStopCounter.expectedStarts(expectedStarts, taskLatches);

Review comment:
       The handling of dependents in the implementation of `StartAndStopLatch` seems to overload the meaning of `null` with a check. But it doesn't actually seem to matter: the call is equivalent to passing an empty list.
   
   Should we simplify that with something like: 
   ```
   List<StartAndStopLatch> taskLatches = includeTasks
                   ? taskHandles.values().stream()
                           .map(task -> task.expectedStarts(expectedTasksStarts.get(task.taskId())))
                           .collect(Collectors.toList())
                   : Collections.emptyList();
   ```
   ?
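A minimal sketch of why an empty list can replace `null` here: a consumer that simply iterates over the per-task entries behaves the same for an empty list as the `null`-checked path, so the check can go. The names below are hypothetical stand-ins, task handles are reduced to task ids, and `getOrDefault(taskId, 0)` is an assumption added to avoid unboxing a missing count.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TaskLatchSketch {
    // Per-task expected-start counts, or an empty list (never null) when tasks are excluded
    static List<Integer> expectedTaskStarts(List<String> taskIds,
                                            Map<String, Integer> expectedTasksStarts,
                                            boolean includeTasks) {
        return includeTasks
                ? taskIds.stream()
                        .map(taskId -> expectedTasksStarts.getOrDefault(taskId, 0))
                        .collect(Collectors.toList())
                : Collections.emptyList();
    }

    // A consumer that just iterates needs no null check when given an empty list
    static int totalExpectedStarts(List<Integer> perTask) {
        int total = 0;
        for (int starts : perTask) {
            total += starts;
        }
        return total;
    }
}
```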
   

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,132 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if (!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = buildRestartPlan(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null);
+                    }
+

Review comment:
       nit: extra blank line
   ```suggestion
   ```

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
##########
@@ -428,6 +429,43 @@ public void restartConnector(String connName) {
         }
     }
 
+    /**
+     * Restart an existing connector and its tasks.
+     *
+     * @param connName  name of the connector to be restarted
+     * @param onlyFailed    true if only failed instances should be restarted
+     * @param includeTasks  true if tasks should be restarted, or false if only the connector should be restarted
+     * @param onlyCallOnEmptyWorker true if the REST API call should be called on a worker not running this connector or its tasks
+     * @throws ConnectRestException if the REST API returns error status
+     * @throws ConnectException for any other error.
+     */
+    public ConnectorStateInfo restartConnectorAndTasks(String connName, boolean onlyFailed, boolean includeTasks, boolean onlyCallOnEmptyWorker) {
+        ObjectMapper mapper = new ObjectMapper();
+        String restartPath = String.format("connectors/%s/restart?onlyFailed=" + onlyFailed + "&includeTasks=" + includeTasks, connName);
+        String restartEndpoint;
+        if (onlyCallOnEmptyWorker) {
+            restartEndpoint = endpointForResourceNotRunningConnector(restartPath, connName);
+        } else {
+            restartEndpoint = endpointForResource(restartPath);
+        }
+        Response response = requestPost(restartEndpoint, "", Collections.emptyMap());
+        try {
+            if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+                //only the 202 status returns a body
+                if (response.getStatus() == Response.Status.ACCEPTED.getStatusCode()) {
+                    return mapper.readerFor(ConnectorStateInfo.class)
+                            .readValue(responseToString(response));
+                } else {
+                    return null;

Review comment:
       does it make sense to skip, given that it's called below? (I hope I'm not missing something)
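As a side note on the restart path built in this method: mixing `String.format` placeholders with string concatenation works, but a single format string keeps all the query parameters in one place. A minimal sketch, where `restartPath` is a hypothetical helper; `%b` renders a `boolean` as `true`/`false`. Like the original, it does not URL-encode the connector name.

```java
class RestartPathSketch {
    // Hypothetical helper: connectors/<name>/restart?onlyFailed=<b>&includeTasks=<b>
    static String restartPath(String connName, boolean onlyFailed, boolean includeTasks) {
        return String.format("connectors/%s/restart?onlyFailed=%b&includeTasks=%b",
                connName, onlyFailed, includeTasks);
    }
}
```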

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,132 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if (!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = buildRestartPlan(request);

Review comment:
       I'd call it `plan` here. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,132 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if (!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = buildRestartPlan(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null);
+                    }
+
+                    return null;
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    /**
+     * Process all pending restart requests. There can be at most one request per connector.
+     *
+     * <p>This method is called from within the {@link #tick()} method.
+     */
+    void processRestartRequests() {
+        List<RestartRequest> restartRequests;
+        synchronized (this) {
+            if (pendingRestartRequests.isEmpty()) {
+                return;
+            }
+            //dequeue into a local list to minimize the work being done within the synchronized block
+            restartRequests = new ArrayList<>(pendingRestartRequests.values());
+            pendingRestartRequests.clear();
+        }
+        for (RestartRequest restartRequest : restartRequests) {
+            try {
+                doRestartConnectorAndTasks(restartRequest);
+            } catch (Exception e) {
+                log.warn("Unexpected error while trying to process " + restartRequest + ", the restart request will be skipped.", e);
+            }
+        }
+    }
+
+    /**
+     * Builds and executes a restart plan for the connector and its tasks from <code>request</code>.
+     * Execution of a plan involves triggering the stop of eligible connector/tasks and then queuing the start for eligible connector/tasks.
+     *
+     * @param request the request to restart connector and tasks
+     */
+    protected synchronized void doRestartConnectorAndTasks(RestartRequest request) {
+        String connectorName = request.connectorName();
+        Optional<RestartPlan> maybePlan = buildRestartPlan(request);
+        if (!maybePlan.isPresent()) {
+            log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request);
+            return;
+        }
+        RestartPlan plan = maybePlan.get();
+        log.info("Executing {}", plan);
+
+        // If requested, stop the connector and any tasks, marking each as restarting
+        final ExtendedAssignment currentAssignments = assignment;
+        final Collection<ConnectorTaskId> assignedIdsToRestart = plan.taskIdsToRestart()
+                .stream()
+                .filter(taskId -> currentAssignments.tasks().contains(taskId))
+                .collect(Collectors.toList());
+        final boolean restartConnector = plan.shouldRestartConnector() && currentAssignments.connectors().contains(connectorName);
+        final boolean restartTasks = !assignedIdsToRestart.isEmpty();
+        if (restartConnector) {
+            worker.stopAndAwaitConnector(connectorName);
+            onRestart(connectorName);
+        }
+        if (restartTasks) {
+            // Stop the tasks and mark as restarting
+            worker.stopAndAwaitTasks(assignedIdsToRestart);
+            assignedIdsToRestart.forEach(this::onRestart);
+        }
+
+        // Now restart the connector and tasks
+        if (restartConnector) {
+            try {
+                startConnector(connectorName, (error, targetState) -> {
+                    if (error == null) {
+                        log.info("Connector '{}' restart successful", connectorName);
+                    } else {
+                        log.error("Connector '{}' restart failed", connectorName, error);
+                    }
+                });
+            } catch (Throwable t) {
+                log.error("Connector '{}' restart failed", connectorName, t);
+            }
+        }
+        if (restartTasks) {
+            log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount(), plan.totalTaskCount(), request);
+            plan.taskIdsToRestart().forEach(taskId -> {
+                try {
+                    if (this.startTask(taskId)) {

Review comment:
       same as the `startConnector` call above: the `this.` qualifier isn't needed
   ```suggestion
                       if (startTask(taskId)) {
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
     }
 
+    @SuppressWarnings("unchecked")
+    RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+        String connectorName = record.key().substring(RESTART_PREFIX.length());
+        if (value.value() == null) {
+            log.error("Ignoring restart request because it is unexpectedly null");
+            return null;
+        }
+        if (!(value.value() instanceof Map)) {
+            log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass());
+            return null;
+        }

Review comment:
       This misuse might have originated from the code above where, specifically for tasks, `null` is actually a valid value (a tombstone/delete). That is not the case here.
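A minimal sketch of the validation order in `recordToRestartRequest`, under stated assumptions: the field name `only-failed` and the `false` default are hypothetical (the diff only shows the constants `ONLY_FAILED_FIELD_NAME` and `ONLY_FAILED_DEFAULT`), `System.err` stands in for the logger, and the method returns only the flag rather than a `RestartRequest`. Unlike a task-config tombstone, a `null` value is treated as invalid. Casting to `Map<?, ?>` rather than `Map<String, Object>` also avoids the unchecked-cast warning that `@SuppressWarnings("unchecked")` silences.

```java
import java.util.Map;

class RestartRecordSketch {
    // Hypothetical values; the real constants are not shown in the diff
    static final String ONLY_FAILED_FIELD_NAME = "only-failed";
    static final boolean ONLY_FAILED_DEFAULT = false;

    /** Returns the onlyFailed flag, or null if the record should be ignored. */
    static Boolean parseOnlyFailed(String connectorName, Object value) {
        if (value == null) {
            // Not a tombstone: a null restart request is never valid
            System.err.println("Ignoring restart request for " + connectorName + " because it is unexpectedly null");
            return null;
        }
        if (!(value instanceof Map)) {
            System.err.println("Ignoring restart request for " + connectorName
                    + " because the value is not a Map but is " + value.getClass());
            return null;
        }
        // Wildcard cast: no unchecked warning, and get(Object) still works
        Object failed = ((Map<?, ?>) value).get(ONLY_FAILED_FIELD_NAME);
        if (!(failed instanceof Boolean)) {
            // Covers both a missing field (null) and a value of the wrong type
            System.err.println("Invalid data for restart request: '" + ONLY_FAILED_FIELD_NAME
                    + "' field was missing or invalid, defaulting to " + ONLY_FAILED_DEFAULT);
            return ONLY_FAILED_DEFAULT;
        }
        return (Boolean) failed;
    }
}
```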




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
