kpatelatwork commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r653819933



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test connectors restart API use cases.
+ */
+@Category(IntegrationTest.class)
+public class ConnectorRestartApiIntegrationTest {

Review comment:
       I went with a different sleep duration, and that cut the time by 1 minute 15 seconds.
   
   ![image](https://user-images.githubusercontent.com/29556518/122449965-132bfe80-cf6c-11eb-8658-e2a389163455.png)
   
   https://github.com/apache/kafka/pull/10822/commits/b60fb53a139dbd44c247ac1a4e2c927ef42ddc99
   I chose 5 seconds because, locally, the test that sets up the cluster takes around 5 seconds to do so, and most tests finish in under 300 ms once the cluster is up, so request propagation and restart are not taking long. That's only my local machine, though, so I chose a wait of 15 times that in case cloud or other developers' machines are slow. @rhauch Could you please review and see whether you agree with the fix? Another idea is to reduce it to 1 second.
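   
   For reference, a minimal sketch of how such a per-assertion wait could look with `TestUtils.waitForCondition`. The constant name and the `connectorIsRunning` predicate are hypothetical stand-ins, not names from this PR:
   
   ```java
   import org.apache.kafka.test.TestUtils;
   
   public class RestartWaitSketch {
       // Hypothetical constant: roughly 15x the <300 ms per-test time observed locally,
       // rounded up, instead of reusing the much larger cluster-setup timeout.
       private static final long RESTART_REQUEST_TIMEOUT_MS = 5_000L;
   
       void waitForConnectorRestart(String connectorName) throws InterruptedException {
           TestUtils.waitForCondition(
               () -> connectorIsRunning(connectorName),   // placeholder predicate
               RESTART_REQUEST_TIMEOUT_MS,
               "Connector " + connectorName + " did not restart within the expected time");
       }
   
       private boolean connectorIsRunning(String connectorName) {
           return true;   // stub for illustration only
       }
   }
   ```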

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1592,6 +1711,16 @@ public void onSessionKeyUpdate(SessionKey sessionKey) {
                 }
             }
         }
+
+        @Override
+        public void onRestartRequest(RestartRequest request) {
+            log.info("Received and enqueuing {}", request);
+
+            synchronized (DistributedHerder.this) {
+                pendingRestartRequests.put(request.connectorName(), request);

Review comment:
       @rhauch I agree and have implemented it. Could you please review the commit below and see if it suffices?
   https://github.com/apache/kafka/pull/10822/commits/cb5afec3af37d60abae988c58e8180440ac97a58
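   
   For what it's worth, a standalone sketch of the de-duplication idea being discussed here: keep at most one pending request per connector, and let the broader request win when two collide. The `RestartSpec` type and the "broader wins" merge rule are assumptions for illustration, not a quote of the linked commit:
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Stand-in for Connect's RestartRequest: connector name plus the two request flags.
   final class RestartSpec {
       final String connectorName;
       final boolean onlyFailed;
       final boolean includeTasks;
   
       RestartSpec(String connectorName, boolean onlyFailed, boolean includeTasks) {
           this.connectorName = connectorName;
           this.onlyFailed = onlyFailed;
           this.includeTasks = includeTasks;
       }
   }
   
   final class PendingRestarts {
       private final Map<String, RestartSpec> pending = new ConcurrentHashMap<>();
   
       // At most one pending request per connector; when one is already queued,
       // keep the broader of the two so a narrow request cannot shadow a wider one.
       void enqueue(RestartSpec incoming) {
           pending.merge(incoming.connectorName, incoming, (existing, latest) ->
               new RestartSpec(existing.connectorName,
                   existing.onlyFailed && latest.onlyFailed,
                   existing.includeTasks || latest.includeTasks));
       }
   }
   ```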

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##########
@@ -297,6 +301,55 @@ public synchronized HerderRequest restartConnector(long delayMs, final String co
         return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(), future);
     }
 
+    @Override
+    public synchronized void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb) {
+        // Ensure the connector exists
+        String connectorName = request.connectorName();
+        if (!configState.contains(connectorName)) {
+            cb.onCompletion(new NotFoundException("Unknown connector: " + connectorName, null), null);
+            return;
+        }
+
+        Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+        if (!maybePlan.isPresent()) {
+            cb.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+            return;
+        }
+        RestartPlan plan = maybePlan.get();
+
+        // If requested, stop the connector and any tasks, marking each as restarting
+        log.info("Received {}", plan);
+        if (plan.restartConnector()) {
+            worker.stopAndAwaitConnector(connectorName);
+            recordRestarting(connectorName);
+        }
+        if (plan.restartAnyTasks()) {
+            // Stop the tasks and mark as restarting
+            worker.stopAndAwaitTasks(plan.taskIdsToRestart());
+            plan.taskIdsToRestart().forEach(this::recordRestarting);
+        }
+
+        // Now restart the connector and tasks
+        if (plan.restartConnector()) {
+            log.debug("Restarting connector '{}'", connectorName);
+            startConnector(connectorName, (error, targetState) -> {
+                if (error == null) {
+                    log.debug("Connector '{}' successfully restarted", connectorName);
+                } else {
+                    log.debug("Connector '{}' failed to restart", connectorName, error);

Review comment:
       +1, I just fixed them in both classes.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1063,6 +1076,112 @@ public int generation() {
         return generation;
     }
 
+    @Override
+    public void restartConnectorAndTasks(
+            RestartRequest request,
+            Callback<ConnectorStateInfo> callback
+    ) {
+        final String connectorName = request.connectorName();
+        addRequest(
+                () -> {
+                    if (checkRebalanceNeeded(callback)) {
+                        return null;
+                    }
+                    if (!configState.connectors().contains(request.connectorName())) {
+                        callback.onCompletion(new NotFoundException("Unknown connector: " + connectorName), null);
+                        return null;
+                    }
+                    if (isLeader()) {
+                        // Write a restart request to the config backing store, to be executed asynchronously in tick()
+                        configBackingStore.putRestartRequest(request);
+                        // Compute and send the response that this was accepted
+                        Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+                        if (!maybePlan.isPresent()) {
+                            callback.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
+                        } else {
+                            RestartPlan plan = maybePlan.get();
+                            callback.onCompletion(null, plan.restartConnectorStateInfo());
+                        }
+                    } else {
+                        callback.onCompletion(new NotLeaderException("Only the leader can process restart requests.", leaderUrl()), null);
+                    }
+
+                    return null;
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    /**
+     * Process all pending restart requests. There can be at most one request per connector.
+     *
+     * <p>This method is called from within the {@link #tick()} method.
+     */
+    void processRestartRequests() {
+        List<RestartRequest> restartRequests;
+        synchronized (this) {
+            if (pendingRestartRequests.isEmpty()) {
+                return;
+            }
+            // dequeue into a local list to minimize the work being done within the synchronized block
+            restartRequests = new ArrayList<>(pendingRestartRequests.values());
+            pendingRestartRequests.clear();
+        }
+        for (RestartRequest restartRequest : restartRequests) {
+            try {
+                doRestartConnectorAndTasks(restartRequest);
+            } catch (Exception e) {
+                log.warn("Unexpected error while trying to process " + restartRequest + ", the restart request will be skipped.", e);
+            }
+        }
+    }
+
+    protected synchronized void doRestartConnectorAndTasks(RestartRequest request) {
+        final String connectorName = request.connectorName();
+        Optional<RestartPlan> maybePlan = buildRestartPlanFor(request);
+        if (!maybePlan.isPresent()) {
+            log.debug("Skipping restart of connector '{}' since no status is available: {}", connectorName, request);
+            return;
+        }
+        RestartPlan plan = maybePlan.get();
+        log.info("Executing {}", plan);
+
+        // If requested, stop the connector and any tasks, marking each as restarting
+        final ExtendedAssignment currentAssignments = assignment;
+        final Collection<ConnectorTaskId> assignedIdsToRestart = plan.taskIdsToRestart()
+                .stream()
+                .filter(taskId -> currentAssignments.tasks().contains(taskId))
+                .collect(Collectors.toList());
+        final boolean restartConnector = plan.restartConnector() && currentAssignments.connectors().contains(connectorName);
+        final boolean restartTasks = !assignedIdsToRestart.isEmpty();
+        if (restartConnector) {
+            worker.stopAndAwaitConnector(connectorName);
+            recordRestarting(connectorName);
+        }
+        if (restartTasks) {
+            // Stop the tasks and mark as restarting
+            worker.stopAndAwaitTasks(assignedIdsToRestart);
+            assignedIdsToRestart.forEach(this::recordRestarting);
+        }
+
+        // Now restart the connector and tasks
+        if (restartConnector) {
+            startConnector(connectorName, (error, targetState) -> {
+                if (error == null) {
+                    log.info("Connector '{}' successfully restarted", connectorName);
+                } else {
+                    log.error("Failed to restart connector '" + connectorName + "'", error);
+                }
+            });
+        }
+        if (restartTasks) {
+            log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount(), plan.totalTaskCount(), request);
+            plan.taskIdsToRestart().forEach(this::startTask);
+            log.debug("Restarted {} of {} tasks for {} as requested", plan.restartTaskCount(), plan.totalTaskCount(), request);
+        }

Review comment:
       @rhauch I did confirm that the stop-and-await paths for both the connector and tasks call connector.cancel() and task.cancel() if they time out. I added a try/catch around starting the connector/tasks:
   https://github.com/apache/kafka/pull/10822/commits/5b5debe2d42a8d67e08cdb4998ef267fb87f9385
   
   However, I am not sure about re-enqueueing, since it can lead to infinite retries, and adding retries may complicate things, so I need some guidance. Could you please review and see if it looks a bit better now?
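   
   As an aside, a standalone sketch of the "guard each restart so one failure cannot abort the others" idea. The class and method names are stand-ins for the herder's start paths, not the actual commit:
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   final class GuardedRestartSketch {
       private static final Logger log = LoggerFactory.getLogger(GuardedRestartSketch.class);
   
       // Each restart is wrapped in its own try/catch: a failure is logged and skipped,
       // and nothing is re-enqueued, so there is no possibility of retrying forever.
       void restartTasks(List<String> taskIds) {
           for (String taskId : taskIds) {
               try {
                   startTask(taskId);   // stand-in for the real worker start logic
               } catch (Exception e) {
                   log.error("Failed to restart task {}", taskId, e);
               }
           }
       }
   
       private void startTask(String taskId) {
           // placeholder; the real herder delegates to the Worker
       }
   
       public static void main(String[] args) {
           new GuardedRestartSketch().restartTasks(Arrays.asList("conn-0", "conn-1"));
       }
   }
   ```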
   
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -717,6 +746,47 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
                 if (started)
                     updateListener.onTaskConfigUpdate(updatedTasks);
+            } else if (record.key().startsWith(RESTART_PREFIX)) {
+                String connectorName = record.key().substring(RESTART_PREFIX.length());
+                if (value.value() == null) {
+                    log.error("Ignoring restart request because it is unexpectedly null");
+                    return;
+                }
+                if (!(value.value() instanceof Map)) {
+                    log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass());
+                    return;
+                }
+
+                Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+                Object failed = valueAsMap.get("only-failed");
+                if (failed == null) {
+                    log.error("Invalid data for restart request 'only-failed' field was missing");
+                    return;
+                }
+                if (!(failed instanceof Boolean)) {
+                    log.error("Invalid data for restart request 'only-failed' field should be a Boolean but is {}", failed.getClass());
+                    return;
+                }
+                boolean onlyFailed = (Boolean) failed;
+
+                Object withTasks = valueAsMap.get("include-tasks");
+                if (withTasks == null) {
+                    log.error("Invalid data for restart request 'include-tasks' field was missing");
+                    return;
+                }
+                if (!(withTasks instanceof Boolean)) {
+                    log.error("Invalid data for restart request 'include-tasks' field should be a Boolean but is {}", withTasks.getClass());
+                    return;
+                }
+                boolean includeTasks = (Boolean) withTasks;

Review comment:
       Good idea @rhauch, I fixed it. I added a default constant in the KafkaConfigBackingStore class; the only thing I was unsure of is whether I should use the same default constant in the ConnectorsResource class API. They seem to be two different worlds, and I didn't want to introduce a dependency between the classes, since it would expose unwanted details, so right now default=false lives in two places. Please let me know if you have a better idea for solving this, or whether it's OK to have the default value copied in two places.
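   
   A minimal sketch of the "fall back to a default" parsing being discussed. The constant names and the exact defaulting behaviour (default when missing or not a Boolean) are assumptions about the fix, and the REST-layer default is deliberately kept separate here, mirroring the reasoning above:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   final class RestartRequestParsingSketch {
       // Hypothetical constants; the comment above only says default=false lives in two places.
       static final boolean ONLY_FAILED_DEFAULT = false;
       static final boolean INCLUDE_TASKS_DEFAULT = false;
   
       // Fall back to the default when the field is missing or not a Boolean.
       static boolean booleanOrDefault(Map<String, Object> value, String field, boolean defaultValue) {
           Object raw = value.get(field);
           return raw instanceof Boolean ? (Boolean) raw : defaultValue;
       }
   
       public static void main(String[] args) {
           Map<String, Object> valueAsMap = new HashMap<>();
           valueAsMap.put("only-failed", true);   // "include-tasks" intentionally absent
           boolean onlyFailed = booleanOrDefault(valueAsMap, "only-failed", ONLY_FAILED_DEFAULT);
           boolean includeTasks = booleanOrDefault(valueAsMap, "include-tasks", INCLUDE_TASKS_DEFAULT);
           System.out.println("only-failed=" + onlyFailed + ", include-tasks=" + includeTasks);
       }
   }
   ```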




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

