kkonstantine commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r653815012
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -210,6 +211,15 @@ public void onDeletion(ConnectorTaskId id) { statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation())); } + public void recordRestarting(String connector) { Review comment: Seeing above, seems that this might as well be called `onRestart`. It's only naming but still would be nice to maintain consistency ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback }); } + /** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. + * + * @param request the restart request; may not be null + * @return the restart plan, or empty if this worker has no status for the connector named in the request and therefore the + * connector cannot be restarted + */ + public Optional<RestartPlan> buildRestartPlanFor(RestartRequest request) { + String connectorName = request.connectorName(); + ConnectorStatus connectorStatus = statusBackingStore.get(connectorName); + if (connectorStatus == null) { + return Optional.empty(); + } + + // If requested, mark the connector as restarting + AbstractStatus.State connectorState; + if (request.shouldRestartConnector(connectorStatus)) { + connectorState = AbstractStatus.State.RESTARTING; + } else { + connectorState = connectorStatus.state(); + } + ConnectorStateInfo.ConnectorState connectorInfoState = new ConnectorStateInfo.ConnectorState( + connectorState.toString(), + connectorStatus.workerId(), + connectorStatus.trace() + ); + + // Collect the task states, If requested, mark the task as restarting + List<ConnectorStateInfo.TaskState> taskStates = statusBackingStore.getAll(connectorName) + .stream() Review comment: I think we can avoid this alignment style. It leaves us with significantly less space to write lambdas, etc. (another indicator is that this style is not applied elsewhere in the file). Two tab stops in the line below should be fine, even if the declaration above is where it is now. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + + private final RestartRequest request; + private final ConnectorStateInfo stateInfo; + private final Collection<ConnectorTaskId> idsToRestart; + + /** + * Create a new request to restart a connector and optionally its tasks. + * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ + public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { + this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); + this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); + // Collect the task IDs to stop and restart (may be none) + idsToRestart = Collections.unmodifiableList( + stateInfo.tasks() + .stream() + .filter(this::isRestarting) + .map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())) + .collect(Collectors.toList()) + ); + } + + /** + * Get the connector name. + * + * @return the name of the connector; never null + */ + public String connectorName() { + return request.connectorName(); + } + + /** + * Get the original {@link RestartRequest}. + * + * @return the restart request; never null + */ + public RestartRequest restartRequest() { + return request; + } + + /** + * Get the {@link ConnectorStateInfo} that reflects the current state of the connector <em>except</em> with the {@code status} + * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that + * are to be restarted, based upon the {@link #restartRequest() restart request}. + * + * @return the connector state info that reflects the restart plan; never null + */ + public ConnectorStateInfo restartConnectorStateInfo() { Review comment: Not sure we should say `restart` in the method name. The object is already a `RestartPlan` type. So that's a bit redundant I think. `connectorStateInfo` ? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + + private final RestartRequest request; + private final ConnectorStateInfo stateInfo; + private final Collection<ConnectorTaskId> idsToRestart; + + /** + * Create a new request to restart a connector and optionally its tasks. + * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ + public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { + this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); + this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); + // Collect the task IDs to stop and restart (may be none) + idsToRestart = Collections.unmodifiableList( Review comment: nit: in the constructor we refer to member fields with `this.` during initialization (even if they are not shadowed) ```suggestion this.idsToRestart = Collections.unmodifiableList( ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -208,6 +209,21 @@ public static String COMMIT_TASKS_KEY(String connectorName) { .field("creation-timestamp", Schema.INT64_SCHEMA) .build(); + public static final String RESTART_PREFIX = "restart-connector-"; + + public static String RESTART_KEY(String connectorName) { + return RESTART_PREFIX + connectorName; + } + + public static final Boolean ONLY_FAILED_DEFAULT = Boolean.FALSE; + public static final Boolean INCLUDE_TASKS_DEFAULT = Boolean.FALSE; Review comment: any reason not to use primitive types? ```suggestion public static final boolean ONLY_FAILED_DEFAULT = false; public static final boolean INCLUDE_TASKS_DEFAULT = false; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -357,6 +367,62 @@ public void validateConnectorConfig(Map<String, String> connectorProps, Callback }); } + /** + * Build the {@link RestartPlan} that describes what should and should not be restarted given the restart request + * and the current status of the connector and task instances. + * + * @param request the restart request; may not be null + * @return the restart plan, or empty if this worker has no status for the connector named in the request and therefore the + * connector cannot be restarted + */ + public Optional<RestartPlan> buildRestartPlanFor(RestartRequest request) { Review comment: I don't think we have examples in Connect where we refer to an argument in the name of a method. Maybe we don't want to change this just yet with the opportunity of the changes introduced by this feature. Another observation is that we don't use `get`, `set` and possibly `build`. But since it wouldn't be obvious if it's an action or an object maybe leaving `buildRestartPlan` might be fine here. (`restartPlan` would be the alternative) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. Review comment: maybe a good idea to say that this is a plan per connector (and not a global plan). I know the javadoc of the constructor says it already but good to be at the top level too. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounterSnapshot.java ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.integration; + +public class StartAndStopCounterSnapshot { Review comment: Do we have to call it snapshot? Is `StartsAndStops` enough? ```suggestion public class StartAndStopCounterSnapshot { ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + + private final RestartRequest request; + private final ConnectorStateInfo stateInfo; + private final Collection<ConnectorTaskId> idsToRestart; + + /** + * Create a new request to restart a connector and optionally its tasks. + * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ + public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { + this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); + this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); + // Collect the task IDs to stop and restart (may be none) + idsToRestart = Collections.unmodifiableList( + stateInfo.tasks() + .stream() + .filter(this::isRestarting) + .map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())) + .collect(Collectors.toList()) + ); + } + + /** + * Get the connector name. + * + * @return the name of the connector; never null + */ + public String connectorName() { + return request.connectorName(); + } + + /** + * Get the original {@link RestartRequest}. + * + * @return the restart request; never null + */ + public RestartRequest restartRequest() { + return request; + } + + /** + * Get the {@link ConnectorStateInfo} that reflects the current state of the connector <em>except</em> with the {@code status} + * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that + * are to be restarted, based upon the {@link #restartRequest() restart request}. + * + * @return the connector state info that reflects the restart plan; never null + */ + public ConnectorStateInfo restartConnectorStateInfo() { + return stateInfo; + } + + /** + * Get the immutable collection of {@link ConnectorTaskId} for all tasks to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return the IDs of the tasks to be restarted; never null but possibly empty + */ + public Collection<ConnectorTaskId> taskIdsToRestart() { + return idsToRestart; + } + + /** + * Determine whether the {@link Connector} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return true if the {@link Connector} instance is to be restarted, or false otherwise + */ + public boolean restartConnector() { Review comment: since this is not an action but a recommendation I think it'd be better to call `shouldRestartConnector` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + + private final RestartRequest request; + private final ConnectorStateInfo stateInfo; + private final Collection<ConnectorTaskId> idsToRestart; + + /** + * Create a new request to restart a connector and optionally its tasks. + * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ + public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { + this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); + this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); + // Collect the task IDs to stop and restart (may be none) + idsToRestart = Collections.unmodifiableList( + stateInfo.tasks() + .stream() + .filter(this::isRestarting) + .map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())) + .collect(Collectors.toList()) + ); + } + + /** + * Get the connector name. + * + * @return the name of the connector; never null + */ + public String connectorName() { + return request.connectorName(); + } + + /** + * Get the original {@link RestartRequest}. + * + * @return the restart request; never null + */ + public RestartRequest restartRequest() { + return request; + } + + /** + * Get the {@link ConnectorStateInfo} that reflects the current state of the connector <em>except</em> with the {@code status} + * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that + * are to be restarted, based upon the {@link #restartRequest() restart request}. + * + * @return the connector state info that reflects the restart plan; never null + */ + public ConnectorStateInfo restartConnectorStateInfo() { + return stateInfo; + } + + /** + * Get the immutable collection of {@link ConnectorTaskId} for all tasks to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return the IDs of the tasks to be restarted; never null but possibly empty + */ + public Collection<ConnectorTaskId> taskIdsToRestart() { + return idsToRestart; + } + + /** + * Determine whether the {@link Connector} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return true if the {@link Connector} instance is to be restarted, or false otherwise + */ + public boolean restartConnector() { + return isRestarting(stateInfo.connector()); + } + + /** + * Determine whether at least one {@link Task} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return true if any {@link Task} instances are to be restarted, or false if none are to be restarted + */ + public boolean restartAnyTasks() { Review comment: `shouldRestartTasks` or `shouldRestartAnyTasks` (same recommendation as above) ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlanFor(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null); + } + + return null; + }, + forwardErrorCallback(callback) + ); + } + + /** + * Process all pending restart requests. There can be at most one request per connector. + * + * <p>This method is called from within the {@link #tick()} method. + */ + void processRestartRequests() { + List<RestartRequest> restartRequests; + synchronized (this) { + if (pendingRestartRequests.isEmpty()) { + return; + } + //dequeue into a local list to minimize the work being done within the synchronized block + restartRequests = new ArrayList<>(pendingRestartRequests.values()); + pendingRestartRequests.clear(); + } + for (RestartRequest restartRequest : restartRequests) { + try { + doRestartConnectorAndTasks(restartRequest); + } catch (Exception e) { + log.warn("Unexpected error while trying to process " + restartRequest + ", the restart request will be skipped.", e); + } + } + } + + /** + * Builds and and executes a restart plan for the connector and its tasks from <code>request</code>. Review comment: ```suggestion * Builds and executes a restart plan for the connector and its tasks from <code>request</code>. ``` ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java ########## @@ -72,6 +72,10 @@ public int stops() { return stopCounter.get(); } + public StartAndStopCounterSnapshot countsSnapshot() { Review comment: I found the name a bit overloaded and added a suggestion below. You think we could make it a bit simpler? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1063,6 +1076,132 @@ public int generation() { return generation; } + @Override + public void restartConnectorAndTasks( + RestartRequest request, + Callback<ConnectorStateInfo> callback + ) { + final String connectorName = request.connectorName(); + addRequest( + () -> { + if (checkRebalanceNeeded(callback)) { + return null; + } + if (!configState.connectors().contains(request.connectorName())) { + callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null); + return null; + } + if (isLeader()) { + // Write a restart request to the config backing store, to be executed asynchronously in tick() + configBackingStore.putRestartRequest(request); + // Compute and send the response that this was accepted + Optional<RestartPlan> maybePlan = buildRestartPlanFor(request); + if (!maybePlan.isPresent()) { + callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null); + } else { + RestartPlan plan = maybePlan.get(); + callback.onCompletion(null, plan.restartConnectorStateInfo()); + } + } else { + callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null); + } + + return null; + }, + forwardErrorCallback(callback) + ); + } + + /** + * Process all pending restart requests. There can be at most one request per connector. + * + * <p>This method is called from within the {@link #tick()} method. + */ + void processRestartRequests() { + List<RestartRequest> restartRequests; + synchronized (this) { + if (pendingRestartRequests.isEmpty()) { + return; + } + //dequeue into a local list to minimize the work being done within the synchronized block + restartRequests = new ArrayList<>(pendingRestartRequests.values()); + pendingRestartRequests.clear(); + } + for (RestartRequest restartRequest : restartRequests) { + try { + doRestartConnectorAndTasks(restartRequest); + } catch (Exception e) { + log.warn("Unexpected error while trying to process " + restartRequest + ", the restart request will be skipped.", e); + } + } + } + + /** + * Builds and and executes a restart plan for the connector and its tasks from <code>request</code>. + * Execution of a plan involves triggering the stop of eligible connector/tasks and then queuing the start for eligible connector/tasks. + * + * @param request the request to restart connector and tasks + */ + protected synchronized void doRestartConnectorAndTasks(RestartRequest request) { + final String connectorName = request.connectorName(); Review comment: use of `final` doesn't seem to be consistent in this method. I'd suggest skipping its addition to local variables unless we need it in lambdas. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/RestartPlan.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.util.ConnectorTaskId; + +/** + * An immutable restart plan. + */ +public class RestartPlan { + + private final RestartRequest request; + private final ConnectorStateInfo stateInfo; + private final Collection<ConnectorTaskId> idsToRestart; + + /** + * Create a new request to restart a connector and optionally its tasks. + * + * @param request the restart request; may not be null + * @param restartStateInfo the current state info for the connector; may not be null + */ + public RestartPlan(RestartRequest request, ConnectorStateInfo restartStateInfo) { + this.request = Objects.requireNonNull(request, "RestartRequest name may not be null"); + this.stateInfo = Objects.requireNonNull(restartStateInfo, "ConnectorStateInfo name may not be null"); + // Collect the task IDs to stop and restart (may be none) + idsToRestart = Collections.unmodifiableList( + stateInfo.tasks() + .stream() + .filter(this::isRestarting) + .map(taskState -> new ConnectorTaskId(request.connectorName(), taskState.id())) + .collect(Collectors.toList()) + ); + } + + /** + * Get the connector name. + * + * @return the name of the connector; never null + */ + public String connectorName() { + return request.connectorName(); + } + + /** + * Get the original {@link RestartRequest}. + * + * @return the restart request; never null + */ + public RestartRequest restartRequest() { + return request; + } + + /** + * Get the {@link ConnectorStateInfo} that reflects the current state of the connector <em>except</em> with the {@code status} + * set to {@link AbstractStatus.State#RESTARTING} for the {@link Connector} instance and any {@link Task} instances that + * are to be restarted, based upon the {@link #restartRequest() restart request}. + * + * @return the connector state info that reflects the restart plan; never null + */ + public ConnectorStateInfo restartConnectorStateInfo() { + return stateInfo; + } + + /** + * Get the immutable collection of {@link ConnectorTaskId} for all tasks to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return the IDs of the tasks to be restarted; never null but possibly empty + */ + public Collection<ConnectorTaskId> taskIdsToRestart() { + return idsToRestart; + } + + /** + * Determine whether the {@link Connector} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return true if the {@link Connector} instance is to be restarted, or false otherwise + */ + public boolean restartConnector() { + return isRestarting(stateInfo.connector()); + } + + /** + * Determine whether at least one {@link Task} instance is to be restarted + * based upon the {@link #restartRequest() restart request}. + * + * @return true if any {@link Task} instances are to be restarted, or false if none are to be restarted + */ + public boolean restartAnyTasks() { Review comment: below we have a method name called `restartConnectorAndTasks` implying _any_ tasks. So maybe we can skip `Any` in the name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org