Copilot commented on code in PR #16631:
URL: https://github.com/apache/pinot/pull/16631#discussion_r2300208808
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -765,8 +775,12 @@ protected TaskSchedulingInfo
scheduleTask(PinotTaskGenerator taskGenerator, List
tableTaskConfig.getConfigsForTaskType(taskType).put(MinionConstants.TRIGGERED_BY,
triggeredBy);
}
- taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+ generateWithLock(tableName, taskType, () -> {
+ taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
Review Comment:
The `generateTasks` method is called inside the lock but its return value is
ignored. The method should assign the generated tasks to a variable that can be
used outside the lock, similar to how it was handled in the createTask method.
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/TaskGenerationLockTest.java:
##########
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.AccessOption;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TaskGenerationLockTest extends ControllerTest {
+ private static final String TABLE_NAME_WITH_TYPE = "testTable_OFFLINE";
+ private static final String TASK_TYPE = "TestTaskType";
+
+ private PinotTaskManager _taskManager;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ startZk();
+ startController();
+ _taskManager = _controllerStarter.getTaskManager();
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ stopController();
+ stopZk();
+ }
+
+ @Test
+ public void testSuccessfulLockAcquisitionAndExecution() throws Exception {
+ AtomicBoolean taskExecuted = new AtomicBoolean(false);
+
+ Callable<Void> task = () -> {
+ taskExecuted.set(true);
+ return null;
+ };
+
+ _taskManager.generateWithLock(TABLE_NAME_WITH_TYPE, TASK_TYPE, task);
+
+ Assert.assertTrue(taskExecuted.get());
+
+ String lockPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(
+ TABLE_NAME_WITH_TYPE, TASK_TYPE);
+ ZNRecord lockRecord = _propertyStore.get(lockPath, null,
AccessOption.PERSISTENT);
+ Assert.assertNull(lockRecord);
+ }
+
+ @Test
+ public void testLockReleaseOnTaskException() {
+ Callable<Void> failingTask = () -> {
+ throw new RuntimeException("Task failed");
+ };
+
+ try {
+ _taskManager.generateWithLock(TABLE_NAME_WITH_TYPE, TASK_TYPE,
failingTask);
+ Assert.fail("Expected RuntimeException");
+ } catch (Exception e) {
+ Assert.assertEquals(e.getMessage(), "Task failed");
+ }
+
+ String lockPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(
+ TABLE_NAME_WITH_TYPE, TASK_TYPE);
+ ZNRecord lockRecord = _propertyStore.get(lockPath, null,
AccessOption.PERSISTENT);
+ Assert.assertNull(lockRecord);
+ }
+
+ @Test
+ public void testConcurrentLockAcquisitionFailure() throws Exception {
+ String lockPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(
+ TABLE_NAME_WITH_TYPE, TASK_TYPE);
+
+ ZNRecord existingLock = new ZNRecord("existing-lock");
+ existingLock.setLongField(PinotTaskManager.ACQUIRED_AT,
System.currentTimeMillis());
+ _propertyStore.create(lockPath, existingLock, AccessOption.PERSISTENT);
+
+ AtomicBoolean taskExecuted = new AtomicBoolean(false);
+ Callable<Void> task = () -> {
+ taskExecuted.set(true);
+ return null;
+ };
+
+ try {
+ _taskManager.generateWithLock(TABLE_NAME_WITH_TYPE, TASK_TYPE, task);
+ Assert.fail("Expected RuntimeException for lock acquisition failure");
+ } catch (RuntimeException e) {
+ Assert.assertTrue(e.getMessage().contains("Unable to acquire task generation lock"));
+ Assert.assertFalse(taskExecuted.get());
+ }
+
+ _propertyStore.remove(lockPath, AccessOption.PERSISTENT);
+ }
+
+ @Test
+ public void testExpiredLockCleanupAndAcquisition() throws Exception {
+ String lockPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(
+ TABLE_NAME_WITH_TYPE, TASK_TYPE);
+
+ long expiredTime = System.currentTimeMillis() -
PinotTaskManager.TASK_GENERATION_LOCK_TTL - 1000;
+ ZNRecord expiredLock = new ZNRecord("expired-lock");
+ expiredLock.setLongField(PinotTaskManager.ACQUIRED_AT, expiredTime);
+ _propertyStore.create(lockPath, expiredLock, AccessOption.PERSISTENT);
+
+ AtomicBoolean taskExecuted = new AtomicBoolean(false);
+ Callable<Void> task = () -> {
+ taskExecuted.set(true);
+ return null;
+ };
+
+ _taskManager.generateWithLock(TABLE_NAME_WITH_TYPE, TASK_TYPE, task);
+
+ Assert.assertTrue(taskExecuted.get());
+
+ ZNRecord lockRecord = _propertyStore.get(lockPath, null,
AccessOption.PERSISTENT);
+ Assert.assertNull(lockRecord);
+ }
+
+ @Test
+ public void testLockTTLBoundary() throws Exception {
+ String lockPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(
+ TABLE_NAME_WITH_TYPE, TASK_TYPE);
+
+ long boundaryTime = System.currentTimeMillis() -
PinotTaskManager.TASK_GENERATION_LOCK_TTL + 1000;
+ ZNRecord boundaryLock = new ZNRecord("boundary-lock");
+ boundaryLock.setLongField(PinotTaskManager.ACQUIRED_AT, boundaryTime);
+ _propertyStore.create(lockPath, boundaryLock, AccessOption.PERSISTENT);
+
+ AtomicBoolean taskExecuted = new AtomicBoolean(false);
+ Callable<Void> task = () -> {
+ taskExecuted.set(true);
+ return null;
+ };
+
+ try {
+ _taskManager.generateWithLock(TABLE_NAME_WITH_TYPE, TASK_TYPE, task);
+ Assert.fail("Expected RuntimeException for non-expired lock");
+ } catch (RuntimeException e) {
+ Assert.assertTrue(e.getMessage().contains("Unable to acquire task generation lock"));
+ Assert.assertFalse(taskExecuted.get());
+ }
+
+ _propertyStore.remove(lockPath, AccessOption.PERSISTENT);
+ }
+
+ @Test
+ public void testLockWithMissingAcquiredAtField() throws Exception {
+ String lockPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskGenerationLock(
+ TABLE_NAME_WITH_TYPE, TASK_TYPE);
+
+ ZNRecord lockWithoutTimestamp = new ZNRecord("no-timestamp-lock");
+ _propertyStore.create(lockPath, lockWithoutTimestamp,
AccessOption.PERSISTENT);
+
+ AtomicBoolean taskExecuted = new AtomicBoolean(false);
+ Callable<Void> task = () -> {
+ taskExecuted.set(true);
+ return null;
+ };
+
+ _taskManager.generateWithLock(TABLE_NAME_WITH_TYPE, TASK_TYPE, task);
+
+ Assert.assertTrue(taskExecuted.get());
+
+ ZNRecord lockRecord = _propertyStore.get(lockPath, null,
AccessOption.PERSISTENT);
+ Assert.assertNull(lockRecord);
+ }
+
+ @Test
+ public void testMultipleTablesAndTaskTypes() throws Exception {
+ String table1 = "table1_OFFLINE";
+ String table2 = "table2_OFFLINE";
+ String taskType1 = "TaskType1";
+ String taskType2 = "TaskType2";
+
+ AtomicBoolean task1Executed = new AtomicBoolean(false);
+ AtomicBoolean task2Executed = new AtomicBoolean(false);
+ AtomicBoolean task3Executed = new AtomicBoolean(false);
+ AtomicBoolean task4Executed = new AtomicBoolean(false);
+
+ Callable<Void> task1 = getCallable(task1Executed);
+ Callable<Void> task2 = getCallable(task2Executed);
+ Callable<Void> task3 = getCallable(task3Executed);
+ Callable<Void> task4 = getCallable(task4Executed);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+ List<Future> futures = new ArrayList<>(4);
+ generateTask(futures, executorService, table1, taskType1, task1);
+ generateTask(futures, executorService, table1, taskType2, task2);
+ generateTask(futures, executorService, table2, taskType1, task3);
+ generateTask(futures, executorService, table2, taskType2, task4);
+ for (Future future : futures) {
+ future.get();
+ }
+ Assert.assertTrue(task1Executed.get());
+ Assert.assertTrue(task2Executed.get());
+ Assert.assertTrue(task3Executed.get());
+ Assert.assertTrue(task4Executed.get());
+ }
+
+ private static Callable<Void> getCallable(AtomicBoolean task1Executed) {
+ return () -> {
+ Thread.sleep(System.currentTimeMillis() % 10);
Review Comment:
Using `System.currentTimeMillis() % 10` for sleep duration is unpredictable
and could result in zero sleep time. Consider using a fixed small delay like
`Thread.sleep(10)` for more predictable test behavior.
```suggestion
Thread.sleep(10);
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -238,8 +245,11 @@ public Map<String, String> createTask(String taskType,
String tableName, @Nullab
// This can be used by the generator to appropriately set the subtask
configs
// Example usage in BaseTaskGenerator.getNumSubTasks()
taskConfigs.put(MinionConstants.TRIGGERED_BY,
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name());
-
- List<PinotTaskConfig> pinotTaskConfigs =
taskGenerator.generateTasks(tableConfig, taskConfigs);
+ List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+ generateWithLock(tableName, taskType, () -> {
+ pinotTaskConfigs.addAll(taskGenerator.generateTasks(tableConfig,
taskConfigs));
+ return null;
+ });
if (pinotTaskConfigs.isEmpty()) {
Review Comment:
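The result of `taskGenerator.generateTasks()` used to be assigned directly to
`pinotTaskConfigs`; it is now added with `addAll()` from inside the callable
passed to `generateWithLock`. If `generateWithLock` can return before the
callable has finished, the emptiness check on line 253 would observe an
incomplete list, which is a race condition. If it runs the callable
synchronously (as the tests added in this PR assume when they assert on the
callable's side effect immediately after the call), there is no race, but
returning the generated list from `generateWithLock` is still simpler than
mutating a captured list.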
```suggestion
List<PinotTaskConfig> pinotTaskConfigs = generateWithLock(tableName,
taskType, () -> {
return taskGenerator.generateTasks(tableConfig, taskConfigs);
});
if (pinotTaskConfigs == null || pinotTaskConfigs.isEmpty()) {
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -95,6 +100,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
public final static String LEAD_CONTROLLER_MANAGER_KEY =
"LeadControllerManager";
public final static String SCHEDULE_KEY = "schedule";
public final static String MINION_INSTANCE_TAG_CONFIG = "minionInstanceTag";
+ public final static String ACQUIRED_AT = "acquiredAt";
Review Comment:
The lock TTL constant that follows this line (`TASK_GENERATION_LOCK_TTL`)
should include a comment explaining that its value is in milliseconds
(5 minutes) to improve code readability.
```suggestion
public final static String ACQUIRED_AT = "acquiredAt";
// Lock TTL for task generation, in milliseconds (5 minutes)
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -722,6 +722,28 @@ public void executeAdhocTask(AdhocTaskConfig
adhocTaskConfig, @Suspended AsyncRe
}
}
+ @DELETE
+ @Path("/tasks/lock/forceRelease")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.FORCE_RELEASE_TASK_GENERATION_LOCK,
+ paramName = "tableNameWithType")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation("Force releases the task generation lock for a given table and
task type")
+ public SuccessResponse cleanUpTaskGenerationLock(
+ @ApiParam(value = "Task type.") @QueryParam("taskType") @Nullable
+ String taskType,
+ @ApiParam(value = "Table name (with type suffix).")
+ @QueryParam("tableNameWithType") @Nullable String tableNameWithType) {
Review Comment:
The method should validate that both `tableNameWithType` and `taskType`
parameters are not null before proceeding. Missing validation could lead to
NullPointerException in the underlying lock release logic.
```suggestion
@QueryParam("tableNameWithType") @Nullable String tableNameWithType) {
if (tableNameWithType == null || taskType == null) {
throw new ControllerApplicationException(LOGGER,
"Missing required query parameters: tableNameWithType and taskType must not be null.",
Response.Status.BAD_REQUEST);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]