kkonstantine commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660275673
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback }); } + /** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. + * + * @param request the restart request; may not be null + * @return the restart plan, or empty if this worker has no status for the connector named in the request and therefore the + * connector cannot be restarted + */ + public Optional<RestartPlan> buildRestartPlan(RestartRequest request) { + String connectorName = request.connectorName(); + ConnectorStatus connectorStatus = statusBackingStore.get(connectorName); + if (connectorStatus == null) { + return Optional.empty(); + } + + // If requested, mark the connector as restarting + AbstractStatus.State connectorState; + if (request.shouldRestartConnector(connectorStatus)) { + connectorState = AbstractStatus.State.RESTARTING; + } else { + connectorState = connectorStatus.state(); + } Review comment: nit: the ternary operator can be used (`?:`) as below, unless you're not a fan. ```suggestion AbstractStatus.State connectorState = request.shouldRestartConnector(connectorStatus) ? 
AbstractStatus.State.RESTARTING : connectorStatus.state(); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { Review comment: nit: similar style as elsewhere in this file ```suggestion public void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> callback) { ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Objects; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; + +/** + * A request to restart a connector and/or task instances. + * <p>The natural order is based first upon the connector name and then requested restart behaviors. 
+ * If two requests have the same connector name, then the requests are ordered based on the + * probable number of tasks/connector this request is going to restart. + */ +public class RestartRequest implements Comparable<RestartRequest> { + + private final String connectorName; + private final boolean onlyFailed; + private final boolean includeTasks; + + /** + * Create a new request to restart a connector and optionally its tasks. + * + * @param connectorName the name of the connector; may not be null + * @param onlyFailed true if only failed instances should be restarted + * @param includeTasks true if tasks should be restarted, or false if only the connector should be restarted + */ + public RestartRequest(String connectorName, boolean onlyFailed, boolean includeTasks) { + this.connectorName = Objects.requireNonNull(connectorName, "Connector name may not be null"); + this.onlyFailed = onlyFailed; + this.includeTasks = includeTasks; + } + + /** + * Get the name of the connector. + * + * @return the connector name; never null + */ + public String connectorName() { + return connectorName; + } + + /** + * Determine whether only failed instances be restarted. + * + * @return true if only failed instances should be restarted, or false if all applicable instances should be restarted + */ + public boolean onlyFailed() { + return onlyFailed; + } + + /** + * Determine whether {@link Task} instances should also be restarted in addition to the {@link Connector} instance. + * + * @return true if the connector and task instances should be restarted, or false if just the connector should be restarted + */ + public boolean includeTasks() { + return includeTasks; + } + + /** + * Determine whether the connector with the given status is to be restarted. 
+ * + * @param status the connector status; may not be null + * @return true if the connector is to be restarted, or false otherwise + */ + public boolean shouldRestartConnector(ConnectorStatus status) { + return !onlyFailed || status.state() == AbstractStatus.State.FAILED; + } + + /** + * Determine whether only the {@link Connector} instance is to be restarted even if not failed. + * + * @return true if only the {@link Connector} instance is to be restarted even if not failed, or false otherwise + */ + public boolean forciblyRestartConnectorOnly() { + return !onlyFailed() && !includeTasks(); + } + + /** + * Determine whether the task instance with the given status is to be restarted. + * + * @param status the task status; may not be null + * @return true if the task is to be restarted, or false otherwise + */ + public boolean shouldRestartTask(TaskStatus status) { + return includeTasks && (!onlyFailed || status.state() == AbstractStatus.State.FAILED); + } + + @Override + public int compareTo(RestartRequest o) { + int result = connectorName.compareTo(o.connectorName); + if (result == 0) { + result = impactRank() - o.impactRank(); + } + return result; Review comment: ```suggestion return result == 0 ? impactRank() - o.impactRank() : result; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -208,6 +209,21 @@ public static String COMMIT_TASKS_KEY(String connectorName) { .field("creation-timestamp", Schema.INT64_SCHEMA) .build(); + public static final String RESTART_PREFIX = "restart-connector-"; + + public static String RESTART_KEY(String connectorName) { + return RESTART_PREFIX + connectorName; + } + + public static final Boolean ONLY_FAILED_DEFAULT = Boolean.FALSE; + public static final Boolean INCLUDE_TASKS_DEFAULT = Boolean.FALSE; Review comment: I agree the conversion is on a frequently used path. 
But maybe it's the code below that can be re-written to avoid both autoboxing and unboxing (when it's not required) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + + private final RestartRequest request; + private final ConnectorStateInfo stateInfo; + private final Collection<ConnectorTaskId> idsToRestart; + + /** + * Create a new request to restart a connector and optionally its tasks. 
+ * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ + public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { + this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); + this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); + // Collect the task IDs to stop and restart (may be none) + idsToRestart = Collections.unmodifiableList( + stateInfo.tasks() + .stream() + .filter(this::isRestarting) + .map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())) + .collect(Collectors.toList()) + ); + } + + /** + * Get the connector name. + * + * @return the name of the connector; never null + */ + public String connectorName() { + return request.connectorName(); + } + + /** + * Get the original {@link RestartRequest}. + * + * @return the restart request; never null + */ + public RestartRequest restartRequest() { + return request; + } + + /** + * Get the {@link ConnectorStateInfo} that reflects the current state of the connector <em>except</em> with the {@code status} + * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that + * are to be restarted, based upon the {@link #restartRequest() restart request}. + * + * @return the connector state info that reflects the restart plan; never null + */ + public ConnectorStateInfo restartConnectorStateInfo() { Review comment: No problem. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) } + @SuppressWarnings("unchecked") + RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) { + String connectorName = record.key().substring(RESTART_PREFIX.length()); + if (value.value() == null) { + log.error("Ignoring restart request because it is unexpectedly null"); + return null; + } + if (!(value.value() instanceof Map)) { + log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); + return null; + } + + Map<String, Object> valueAsMap = (Map<String, Object>) value.value(); + + Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME); + if (failed == null) { + log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT); + failed = ONLY_FAILED_DEFAULT; + } + if (!(failed instanceof Boolean)) { + log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, failed.getClass(), ONLY_FAILED_DEFAULT); + failed = ONLY_FAILED_DEFAULT; + } + boolean onlyFailed = (Boolean) failed; + + Object withTasks = valueAsMap.get(INCLUDE_TASKS_FIELD_NAME); + if (withTasks == null) { + log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", INCLUDE_TASKS_FIELD_NAME, INCLUDE_TASKS_DEFAULT); + withTasks = INCLUDE_TASKS_DEFAULT; + } + if (!(withTasks instanceof Boolean)) { + log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", INCLUDE_TASKS_FIELD_NAME, withTasks.getClass(), INCLUDE_TASKS_DEFAULT); + withTasks = INCLUDE_TASKS_DEFAULT; + } + boolean includeTasks = (Boolean) withTasks; Review comment: Similar suggestion as above ########## File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) } + @SuppressWarnings("unchecked") + RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) { + String connectorName = record.key().substring(RESTART_PREFIX.length()); + if (value.value() == null) { + log.error("Ignoring restart request because it is unexpectedly null"); + return null; + } + if (!(value.value() instanceof Map)) { + log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); + return null; + } + + Map<String, Object> valueAsMap = (Map<String, Object>) value.value(); + + Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME); + if (failed == null) { + log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT); + failed = ONLY_FAILED_DEFAULT; + } + if (!(failed instanceof Boolean)) { + log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, failed.getClass(), ONLY_FAILED_DEFAULT); + failed = ONLY_FAILED_DEFAULT; + } + boolean onlyFailed = (Boolean) failed; Review comment: Following up on the comment above, here's how we could write this and avoid autoboxing/unboxing while using the more readable primitives. ```suggestion if (!(failed instanceof Boolean)) { log.warn("Invalid data for restart request '{}' field should be a Boolean but is {}, defaulting to {}", ONLY_FAILED_FIELD_NAME, failed == null ? 
"null" : failed.getClass(), ONLY_FAILED_DEFAULT); onlyFailed = ONLY_FAILED_DEFAULT; } else { onlyFailed = (Boolean) failed; } ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartRequest.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Objects; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; + +/** + * A request to restart a connector and/or task instances. + * <p>The natural order is based first upon the connector name and then requested restart behaviors. + * If two requests have the same connector name, then the requests are ordered based on the + * probable number of tasks/connector this request is going to restart. + */ +public class RestartRequest implements Comparable<RestartRequest> { + + private final String connectorName; + private final boolean onlyFailed; + private final boolean includeTasks; + + /** + * Create a new request to restart a connector and optionally its tasks. 
+ * + * @param connectorName the name of the connector; may not be null + * @param onlyFailed true if only failed instances should be restarted + * @param includeTasks true if tasks should be restarted, or false if only the connector should be restarted + */ + public RestartRequest(String connectorName, boolean onlyFailed, boolean includeTasks) { + this.connectorName = Objects.requireNonNull(connectorName, "Connector name may not be null"); + this.onlyFailed = onlyFailed; + this.includeTasks = includeTasks; + } + + /** + * Get the name of the connector. + * + * @return the connector name; never null + */ + public String connectorName() { + return connectorName; + } + + /** + * Determine whether only failed instances be restarted. + * + * @return true if only failed instances should be restarted, or false if all applicable instances should be restarted + */ + public boolean onlyFailed() { + return onlyFailed; + } + + /** + * Determine whether {@link Task} instances should also be restarted in addition to the {@link Connector} instance. + * + * @return true if the connector and task instances should be restarted, or false if just the connector should be restarted + */ + public boolean includeTasks() { + return includeTasks; + } + + /** + * Determine whether the connector with the given status is to be restarted. + * + * @param status the connector status; may not be null + * @return true if the connector is to be restarted, or false otherwise + */ + public boolean shouldRestartConnector(ConnectorStatus status) { + return !onlyFailed || status.state() == AbstractStatus.State.FAILED; + } + + /** + * Determine whether only the {@link Connector} instance is to be restarted even if not failed. + * + * @return true if only the {@link Connector} instance is to be restarted even if not failed, or false otherwise + */ + public boolean forciblyRestartConnectorOnly() { Review comment: ```suggestion public boolean forceRestartConnectorOnly() { ``` ? 
(seems a bit more common way to say it, but I don't have a strong opinion) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan per connector. + */ +public class RestartPlan { + + private final RestartRequest request; + private final ConnectorStateInfo stateInfo; + private final Collection<ConnectorTaskId> idsToRestart; + + /** + * Create a new plan to restart a connector and optionally its tasks. 
+ * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ + public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { + this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); + this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); + // Collect the task IDs to stop and restart (may be none) + this.idsToRestart = Collections.unmodifiableList( + stateInfo.tasks() + .stream() + .filter(this::isRestarting) + .map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())) + .collect(Collectors.toList()) + ); + } + + /** + * Get the connector name. + * + * @return the name of the connector; never null + */ + public String connectorName() { + return request.connectorName(); + } + + /** + * Get the original {@link RestartRequest}. + * + * @return the restart request; never null + */ + public RestartRequest restartRequest() { + return request; + } + + /** + * Get the {@link ConnectorStateInfo} that reflects the current state of the connector <em>except</em> with the {@code status} + * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that + * are to be restarted, based upon the {@link #restartRequest() restart request}. + * + * @return the connector state info that reflects the restart plan; never null + */ + public ConnectorStateInfo restartConnectorStateInfo() { + return stateInfo; + } + + /** + * Get the immutable collection of {@link ConnectorTaskId} for all tasks to be restarted + * based upon the {@link #restartRequest() restart request}. 
+ * + * @return the IDs of the tasks to be restarted; never null but possibly empty + */ + public Collection<ConnectorTaskId> taskIdsToRestart() { + return idsToRestart; + } + + /** + * Determine whether the {@link Connector} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return true if the {@link Connector} instance is to be restarted, or false otherwise + */ + public boolean shouldRestartConnector() { + return isRestarting(stateInfo.connector()); + } + + /** + * Determine whether at least one {@link Task} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return true if any {@link Task} instances are to be restarted, or false if none are to be restarted + */ + public boolean shouldRestartTasks() { + return !taskIdsToRestart().isEmpty(); + } + + /** + * Get the number of connector tasks that are to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return the number of {@link Task} instance is to be restarted + */ + public int restartTaskCount() { + return taskIdsToRestart().size(); + } + + /** + * Get the total number of tasks in the connector. + * + * @return the total number of tasks + */ + public int totalTaskCount() { + return stateInfo.tasks().size(); + } + + private boolean isRestarting(ConnectorStateInfo.AbstractState state) { + return isRestarting(state.state()); + } + + private boolean isRestarting(String state) { + return AbstractStatus.State.RESTARTING.toString().equalsIgnoreCase(state); + } + + @Override + public String toString() { + if (shouldRestartConnector()) { + return String.format( + "plan to restart connector and %d of %d tasks for %s", restartTaskCount(), totalTaskCount(), request + ); + } + return String.format( + "plan to restart %d of %d tasks for %s", restartTaskCount(), totalTaskCount(), request + ); + } Review comment: something like? ```suggestion return shouldRestartConnector() ? 
String.format("plan to restart connector and %d of %d tasks for %s", restartTaskCount(), totalTaskCount(), request) : String.format("plan to restart %d of %d tasks for %s", restartTaskCount(), totalTaskCount(), request); ``` breaking this long statement doesn't make it much less long I guess. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -717,6 +750,15 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) if (started) updateListener.onTaskConfigUpdate(updatedTasks); + } else if (record.key().startsWith(RESTART_PREFIX)) { + RestartRequest request = recordToRestartRequest(record, value); + if (request == null) { + return; + } + // Only notify the listener if this backing store is already successfully started (having caught up the first time) + if (started) { Review comment: Since we print the warning inside `recordToRestartRequest` we can probably avoid the early return with: ```suggestion // Only notify the listener if this backing store is already successfully started (having caught up the first time) if (request != null && started) { ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) } + @SuppressWarnings("unchecked") + RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) { + String connectorName = record.key().substring(RESTART_PREFIX.length()); + if (value.value() == null) { + log.error("Ignoring restart request because it is unexpectedly null"); + return null; + } + if (!(value.value() instanceof Map)) { + log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); + return null; + } Review comment: I see we've been verbose in similar logic above before, but maybe we can improve on 
that a bit, at least in new code. Here's another suggestion: ```suggestion if (!(value.value() instanceof Map)) { log.error("Ignoring restart request because the value is not a Map but is {}", value.value() == null ? "null" : value.value().getClass()); return null; } ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlan(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null); + } + + return null; + }, + forwardErrorCallback(callback) + ); + } + + /** + * Process all pending restart requests. There can be at most one request per connector. + * + * <p>This method is called from within the {@link #tick()} method. 
+ */ + void processRestartRequests() { + List<RestartRequest> restartRequests; + synchronized (this) { + if (pendingRestartRequests.isEmpty()) { + return; + } + //dequeue into a local list to minimize the work being done within the synchronized block + restartRequests = new ArrayList<>(pendingRestartRequests.values()); + pendingRestartRequests.clear(); + } + for (RestartRequest restartRequest : restartRequests) { Review comment: nit: type inference is nice ... ```suggestion restartRequests.forEach(restartRequest -> { ``` ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java ########## @@ -334,6 +353,15 @@ public StartAndStopLatch expectedStops(int expectedStops, boolean includeTasks) return startAndStopCounter.expectedStops(expectedStops, taskLatches); } + public StartAndStopLatch expectedStops(int expectedStops, Map<String, Integer> expectedTasksStops, boolean includeTasks) { + List<StartAndStopLatch> taskLatches = null; Review comment: see comment above on whether `null` matters. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback }); } + /** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. 
+ * + * @param request the restart request; may not be null + * @return the restart plan, or empty if this worker has no status for the connector named in the request and therefore the + * connector cannot be restarted + */ + public Optional<RestartPlan> buildRestartPlan(RestartRequest request) { + String connectorName = request.connectorName(); + ConnectorStatus connectorStatus = statusBackingStore.get(connectorName); + if (connectorStatus == null) { + return Optional.empty(); + } + + // If requested, mark the connector as restarting + AbstractStatus.State connectorState; + if (request.shouldRestartConnector(connectorStatus)) { + connectorState = AbstractStatus.State.RESTARTING; + } else { + connectorState = connectorStatus.state(); + } + ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState( + connectorState.toString(), + connectorStatus.workerId(), + connectorStatus.trace() + ); + + // Collect the task states, If requested, mark the task as restarting + List<ConnectorStateInfo.TaskState> taskStates = statusBackingStore.getAll(connectorName) + .stream() + .map(taskStatus -> { + AbstractStatus.State state = taskStatus.state(); + if (request.shouldRestartTask(taskStatus)) { + state = AbstractStatus.State.RESTARTING; + } Review comment: A bit confusing that a second assignment follows if the `if` statement is true. I'd also call the variable `taskState` (as opposed to `connectorState` above) Ternary can be used here as well: ```suggestion AbstractStatus.State state = request.shouldRestartTask(taskStatus) ? 
AbstractStatus.State.RESTARTING : taskStatus.state(); ``` (as with any suggestion from github, please check it compiles and conforms to the style) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) } + @SuppressWarnings("unchecked") + RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) { + String connectorName = record.key().substring(RESTART_PREFIX.length()); + if (value.value() == null) { + log.error("Ignoring restart request because it is unexpectedly null"); + return null; + } + if (!(value.value() instanceof Map)) { + log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); + return null; + } + + Map<String, Object> valueAsMap = (Map<String, Object>) value.value(); + + Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME); + if (failed == null) { + log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT); Review comment: btw, all other log statements are `log.error` elsewhere. Should we remain consistent with that, instead of using `log.warn` just here? The issues seem similar above. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlan(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); Review comment: safe to skip the declaration here. 
```suggestion callback.onCompletion(null, plan.get().restartConnectorStateInfo()); ``` ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java ########## @@ -280,6 +289,16 @@ public StartAndStopLatch expectedStarts(int expectedStarts, boolean includeTasks return startAndStopCounter.expectedStarts(expectedStarts, taskLatches); } + public StartAndStopLatch expectedStarts(int expectedStarts, Map<String, Integer> expectedTasksStarts, boolean includeTasks) { + List<StartAndStopLatch> taskLatches = null; + if (includeTasks) { + taskLatches = taskHandles.values().stream() + .map(task -> task.expectedStarts(expectedTasksStarts.get(task.taskId()))) + .collect(Collectors.toList()); + } + return startAndStopCounter.expectedStarts(expectedStarts, taskLatches); Review comment: `dependents` in the implementation of `StartAndStopLatch` seems to overload the meaning of `null` with a check. But it actually doesn't seem to matter. The call is equivalent to passing an empty list. Should we simplify that with something like: ``` List<StartAndStopLatch> taskLatches = includeTasks ? taskHandles.values().stream() .map(task -> task.expectedStarts(expectedTasksStarts.get(task.taskId()))) .collect(Collectors.toList()) : Collections.emptyList(); ``` ?
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlan(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null); + } + Review comment: nit: extra blank line ```suggestion ``` ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java ########## @@ -428,6 +429,43 @@ public void restartConnector(String connName) { } } + /** + * Restart an existing connector and its tasks.
+ * + * @param connName name of the connector to be restarted + * @param onlyFailed true if only failed instances should be restarted + * @param includeTasks true if tasks should be restarted, or false if only the connector should be restarted + * @param onlyCallOnEmptyWorker true if the REST API call should be called on a worker not running this connector or its tasks + * @throws ConnectRestException if the REST API returns error status + * @throws ConnectException for any other error. + */ + public ConnectorStateInfo restartConnectorAndTasks(String connName, boolean onlyFailed, boolean includeTasks, boolean onlyCallOnEmptyWorker) { + ObjectMapper mapper = new ObjectMapper(); + String restartPath = String.format("connectors/%s/restart?onlyFailed=" + onlyFailed + "&includeTasks=" + includeTasks, connName); + String restartEndpoint; + if (onlyCallOnEmptyWorker) { + restartEndpoint = endpointForResourceNotRunningConnector(restartPath, connName); + } else { + restartEndpoint = endpointForResource(restartPath); + } + Response response = requestPost(restartEndpoint, "", Collections.emptyMap()); + try { + if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { + //only the 202 status returns a body + if (response.getStatus() == Response.Status.ACCEPTED.getStatusCode()) { + return mapper.readerFor(ConnectorStateInfo.class) + .readValue(responseToString(response)); + } else { + return null; Review comment: does it make sense to skip, given that it's called below?
(I hope I'm not missing something) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlan(request); Review comment: I'd call it `plan` here. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlan(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null); + } + + return null; + }, + forwardErrorCallback(callback) + ); + } + + /** + * Process all pending restart requests. There can be at most one request per connector. + * + * <p>This method is called from within the {@link #tick()} method. 
+ */ + void processRestartRequests() { + List<RestartRequest> restartRequests; + synchronized (this) { + if (pendingRestartRequests.isEmpty()) { + return; + } + //dequeue into a local list to minimize the work being done within the synchronized block + restartRequests = new ArrayList<>(pendingRestartRequests.values()); + pendingRestartRequests.clear(); + } + for (RestartRequest restartRequest : restartRequests) { + try { + doRestartConnectorAndTasks(restartRequest); + } catch (Exception e) { + log.warn("Unexpected error while trying to process " + restartRequest + ", the restart request will be skipped.", e); + } + } + } + + /** + * Builds and executes a restart plan for the connector and its tasks from <code>request</code>. + * Execution of a plan involves triggering the stop of eligible connector/tasks and then queuing the start for eligible connector/tasks. + * + * @param request the request to restart connector and tasks + */ + protected synchronized void doRestartConnectorAndTasks(RestartRequest request) { + String connectorName = request.connectorName(); + Optional<RestartPlan> maybePlan = buildRestartPlan(request); + if (!maybePlan.isPresent()) { + log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request); + return; + } + RestartPlan plan = maybePlan.get(); + log.info("Executing {}", plan); + + // If requested, stop the connector and any tasks, marking each as restarting + final ExtendedAssignment currentAssignments = assignment; + final Collection<ConnectorTaskId> assignedIdsToRestart = plan.taskIdsToRestart() + .stream() + .filter(taskId -> currentAssignments.tasks().contains(taskId)) + .collect(Collectors.toList()); + final boolean restartConnector = plan.shouldRestartConnector() && currentAssignments.connectors().contains(connectorName); + final boolean restartTasks = !assignedIdsToRestart.isEmpty(); + if (restartConnector) { + worker.stopAndAwaitConnector(connectorName); + onRestart(connectorName); + } + if 
(restartTasks) { + // Stop the tasks and mark as restarting + worker.stopAndAwaitTasks(assignedIdsToRestart); + assignedIdsToRestart.forEach(this::onRestart); + } + + // Now restart the connector and tasks + if (restartConnector) { + try { + startConnector(connectorName, (error, targetState) -> { + if (error == null) { + log.info("Connector '{}' restart successful", connectorName); + } else { + log.error("Connector '{}' restart failed", connectorName, error); + } + }); + } catch (Throwable t) { + log.error("Connector '{}' restart failed", connectorName, t); + } + } + if (restartTasks) { + log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount(), plan.totalTaskCount(), request); + plan.taskIdsToRestart().forEach(taskId -> { + try { + if (this.startTask(taskId)) { Review comment: same as the connector call above ```suggestion if (startTask(taskId)) { ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) } + @SuppressWarnings("unchecked") + RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) { + String connectorName = record.key().substring(RESTART_PREFIX.length()); + if (value.value() == null) { + log.error("Ignoring restart request because it is unexpectedly null"); + return null; + } + if (!(value.value() instanceof Map)) { + log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); + return null; + } Review comment: This misuse might have originated because above, specifically for tasks, `null` is actually a valid value (tombstone/delete), which is not the case here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org