snleee commented on code in PR #12459:
URL: https://github.com/apache/pinot/pull/12459#discussion_r1529520945
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -534,66 +535,85 @@ private synchronized Map<String, String>
scheduleTasks(List<String> tableNamesWi
* Returns the task name, or {@code null} if no task is scheduled.
*/
@Nullable
- private String scheduleTask(PinotTaskGenerator taskGenerator,
List<TableConfig> enabledTableConfigs,
+ private List<String> scheduleTask(PinotTaskGenerator taskGenerator,
List<TableConfig> enabledTableConfigs,
boolean isLeader) {
LOGGER.info("Trying to schedule task type: {}, isLeader: {}",
taskGenerator.getTaskType(), isLeader);
- List<PinotTaskConfig> pinotTaskConfigs;
- try {
- /* TODO taskGenerator may skip generating tasks for some of the tables
being passed to it.
- In that case, we should not be storing success timestamps for those
table. Same with exceptions that should
- only be associated with the table for which it was raised and not
every eligible table. We can have the
- generateTasks() return a list of TaskGeneratorMostRecentRunInfo for
each table
- */
- pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
- long successRunTimestamp = System.currentTimeMillis();
- for (TableConfig tableConfig : enabledTableConfigs) {
-
_taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(),
taskGenerator.getTaskType(),
+ Map<String, List<PinotTaskConfig>> pinotMinionInstanceToTaskConfigs = new
HashMap<>();
+ String taskType = taskGenerator.getTaskType();
+ for (TableConfig tableConfig : enabledTableConfigs) {
+ String tableName = tableConfig.getTableName();
+ try {
+ String minionInstanceTag =
taskGenerator.getMinionInstanceTag(tableConfig);
+ List<PinotTaskConfig> presentTaskConfig =
+
pinotMinionInstanceToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new
ArrayList<>());
+ taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+ pinotMinionInstanceToTaskConfigs.put(minionInstanceTag,
presentTaskConfig);
+ long successRunTimestamp = System.currentTimeMillis();
+ _taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
taskGeneratorMostRecentRunInfo ->
taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
// before the first task schedule, the follow two gauge metrics will
be empty
// TODO: find a better way to report task generation information
- _controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(),
taskGenerator.getTaskType(),
+ _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
() -> System.currentTimeMillis() - successRunTimestamp);
- _controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(),
taskGenerator.getTaskType(),
+ _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 0L);
- }
- } catch (Exception e) {
- StringWriter errors = new StringWriter();
- try (PrintWriter pw = new PrintWriter(errors)) {
- e.printStackTrace(pw);
- }
- long successRunTimestamp = System.currentTimeMillis();
- for (TableConfig tableConfig : enabledTableConfigs) {
-
_taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(),
taskGenerator.getTaskType(),
- taskGeneratorMostRecentRunInfo ->
taskGeneratorMostRecentRunInfo.addErrorRunMessage(successRunTimestamp,
+ } catch (Exception e) {
+ StringWriter errors = new StringWriter();
+ try (PrintWriter pw = new PrintWriter(errors)) {
+ e.printStackTrace(pw);
+ }
+ long failureRunTimestamp = System.currentTimeMillis();
+ _taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
+ taskGeneratorMostRecentRunInfo ->
taskGeneratorMostRecentRunInfo.addErrorRunMessage(failureRunTimestamp,
errors.toString()));
// before the first task schedule, the follow gauge metric will be
empty
// TODO: find a better way to report task generation information
- _controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(),
taskGenerator.getTaskType(),
+ _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 1L);
+ LOGGER.error("Failed to generate tasks for task type {} for table {}",
taskType, tableName, e);
}
- throw e;
}
if (!isLeader) {
taskGenerator.nonLeaderCleanUp();
}
- String taskType = taskGenerator.getTaskType();
- int numTasks = pinotTaskConfigs.size();
- if (numTasks > 0) {
- LOGGER.info("Submitting {} tasks for task type: {} with task configs:
{}", numTasks, taskType, pinotTaskConfigs);
- _controllerMetrics.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
- return _helixTaskResourceManager.submitTask(pinotTaskConfigs,
taskGenerator.getTaskTimeoutMs(),
- taskGenerator.getNumConcurrentTasksPerInstance(),
taskGenerator.getMaxAttemptsPerTask());
- }
- LOGGER.info("No task to schedule for task type: {}", taskType);
- return null;
+ int numErrorTasksScheduled = 0;
+ List<String> submittedTaskNames = new ArrayList<>();
+ for (String minionInstanceTag : pinotMinionInstanceToTaskConfigs.keySet())
{
+ List<PinotTaskConfig> pinotTaskConfigs =
pinotMinionInstanceToTaskConfigs.get(minionInstanceTag);
+ int numTasks = pinotTaskConfigs.size();
+ try {
+ if (numTasks > 0) {
+ // This might lead to lot of logs, maybe sum it up and move outside
the loop
+ LOGGER.info("Submitting {} tasks for task type: {} to
minionInstance: {} with task configs: {}", numTasks,
+ taskType, minionInstanceTag, pinotTaskConfigs);
+ String submittedTaskName =
_helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag,
Review Comment:
Do we already support submitting a task with `minionInstanceTag`?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -534,66 +535,85 @@ private synchronized Map<String, String>
scheduleTasks(List<String> tableNamesWi
* Returns the task name, or {@code null} if no task is scheduled.
Review Comment:
Let's update the documentation here, since we are changing the return
type.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -534,66 +535,85 @@ private synchronized Map<String, String>
scheduleTasks(List<String> tableNamesWi
* Returns the task name, or {@code null} if no task is scheduled.
*/
@Nullable
- private String scheduleTask(PinotTaskGenerator taskGenerator,
List<TableConfig> enabledTableConfigs,
+ private List<String> scheduleTask(PinotTaskGenerator taskGenerator,
List<TableConfig> enabledTableConfigs,
boolean isLeader) {
LOGGER.info("Trying to schedule task type: {}, isLeader: {}",
taskGenerator.getTaskType(), isLeader);
- List<PinotTaskConfig> pinotTaskConfigs;
- try {
- /* TODO taskGenerator may skip generating tasks for some of the tables
being passed to it.
- In that case, we should not be storing success timestamps for those
table. Same with exceptions that should
- only be associated with the table for which it was raised and not
every eligible table. We can have the
- generateTasks() return a list of TaskGeneratorMostRecentRunInfo for
each table
- */
- pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
- long successRunTimestamp = System.currentTimeMillis();
- for (TableConfig tableConfig : enabledTableConfigs) {
-
_taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(),
taskGenerator.getTaskType(),
+ Map<String, List<PinotTaskConfig>> pinotMinionInstanceToTaskConfigs = new
HashMap<>();
Review Comment:
Let's rename this to `minionInstanceTagToTaskConfigs`, since
`pinotMinionInstance` and instance tag are different concepts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]