Copilot commented on code in PR #17842:
URL: https://github.com/apache/pinot/pull/17842#discussion_r2906718144
##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java:
##########
@@ -54,15 +69,26 @@ protected Context preprocess(Properties periodicTaskProperties) {
@Override
protected void processTable(String tableNameWithType, Context context) {
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
- return;
+ Set<String> brokerInstances;
+ if (TableNameBuilder.isTableResource(tableNameWithType)) {
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation",
+ tableNameWithType);
+ return;
+ }
+ brokerInstances = _pinotHelixResourceManager
+ .getAllInstancesForBrokerTenant(context._instanceConfigs, tableConfig.getTenantConfig().getBroker());
+ } else {
+ LogicalTableConfig logicalTableConfig = _pinotHelixResourceManager.getLogicalTableConfig(tableNameWithType);
+ if (logicalTableConfig == null) {
+ LOGGER.warn("Failed to find logical table config for: {}, skipping broker resource validation",
+ tableNameWithType);
+ return;
+ }
+ brokerInstances = _pinotHelixResourceManager
+ .getAllInstancesForBrokerTenant(context._instanceConfigs, logicalTableConfig.getBrokerTenant());
Review Comment:
Using `TableNameBuilder.isTableResource(...)` to decide physical vs logical
can misclassify a logical table whose name ends with `_OFFLINE`/`_REALTIME`:
it is treated as a physical table, no table config is found, and validation is
skipped. Prefer determining the type by config presence (e.g., attempt the
table config lookup first; if absent, fall back to the logical table config)
rather than relying on name patterns.
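A minimal sketch of the config-presence approach, under stated assumptions: `BrokerTenantResolver` and its two maps are hypothetical stand-ins for the ZooKeeper-backed `getTableConfig(...)` and `getLogicalTableConfig(...)` lookups, and each map simply holds name to broker tenant. It is not the PR's code, only an illustration of the lookup order.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in: the maps replace the real table config and logical
// table config lookups and map a name to its broker tenant.
final class BrokerTenantResolver {
  private final Map<String, String> physicalBrokerTenants;
  private final Map<String, String> logicalBrokerTenants;

  BrokerTenantResolver(Map<String, String> physicalBrokerTenants, Map<String, String> logicalBrokerTenants) {
    this.physicalBrokerTenants = physicalBrokerTenants;
    this.logicalBrokerTenants = logicalBrokerTenants;
  }

  /** Tries the physical table config first, then the logical one; empty if neither exists. */
  Optional<String> resolveBrokerTenant(String name) {
    String tenant = physicalBrokerTenants.get(name);
    if (tenant == null) {
      // No physical config: fall back to the logical table config, whatever the name looks like.
      tenant = logicalBrokerTenants.get(name);
    }
    return Optional.ofNullable(tenant);
  }
}
```

With this ordering, a logical table named `events_OFFLINE` resolves through its logical config instead of being skipped because of its suffix.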
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -748,6 +748,25 @@ public List<String> getAllTables() {
return getAllResources().stream().filter(TableNameBuilder::isTableResource).collect(Collectors.toList());
}
+ /**
+ * Returns partition names in broker resource ideal state that are logical tables (not physical _OFFLINE/_REALTIME)
+ * and have a logical table config. Used by broker resource validation to repair logical table broker assignments.
+ */
+ public List<String> getBrokerResourceLogicalTablePartitions() {
+ IdealState brokerIdealState = HelixHelper.getBrokerIdealStates(_helixAdmin, _helixClusterName);
+ if (brokerIdealState == null) {
+ return Collections.emptyList();
+ }
+ List<String> logicalPartitions = new ArrayList<>();
+ for (String partition : brokerIdealState.getPartitionSet()) {
+ if (!TableNameBuilder.isTableResource(partition)
+ && ZKMetadataProvider.getLogicalTableConfig(_propertyStore, partition) != null) {
+ logicalPartitions.add(partition);
+ }
+ }
Review Comment:
`getBrokerResourceLogicalTablePartitions()` currently excludes partitions
that *look* like physical tables by name (`isTableResource`). This can drop
valid logical tables whose names match the physical naming convention (e.g.,
end with `_OFFLINE`). Consider identifying logical partitions by checking
configs instead (e.g., if `getTableConfig(...)` is null and
`getLogicalTableConfig(...)` is non-null, treat as logical), rather than
relying on name heuristics.
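As a rough illustration of that check, the sketch below filters a partition set by config presence only. `LogicalPartitionFilter` and the two predicates are hypothetical; in the real method they would be backed by `getTableConfig(...)` and `getLogicalTableConfig(...)` returning non-null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical helper: the predicates stand in for "config lookup returned non-null".
final class LogicalPartitionFilter {
  private LogicalPartitionFilter() {
  }

  /** Keeps partitions that have no physical table config but do have a logical table config. */
  static List<String> logicalPartitions(Set<String> partitions, Predicate<String> hasTableConfig,
      Predicate<String> hasLogicalTableConfig) {
    List<String> result = new ArrayList<>();
    for (String partition : partitions) {
      if (!hasTableConfig.test(partition) && hasLogicalTableConfig.test(partition)) {
        result.add(partition);
      }
    }
    return result;
  }
}
```

The trade-off is one extra config read per partition compared with the name check, in exchange for not depending on naming conventions.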
##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java:
##########
@@ -45,6 +49,17 @@ public BrokerResourceValidationManager(ControllerConf config, PinotHelixResource
controllerMetrics);
}
+ @Override
+ protected List<String> getTablesToProcess(Properties periodicTaskProperties) {
+ List<String> tables = super.getTablesToProcess(periodicTaskProperties);
+ if (periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME) != null) {
+ return tables;
+ }
+ List<String> combined = new ArrayList<>(tables);
+ combined.addAll(_pinotHelixResourceManager.getBrokerResourceLogicalTablePartitions());
+ return combined;
+ }
+
@Override
Review Comment:
This adds logical-table partitions to the work list, but
`ControllerPeriodicTask` will still filter via `shouldProcessTable()` (default:
`_leadControllerManager.isLeaderForTable(tableNameWithType)`). If table
leadership is only tracked for physical tables, logical partitions may be
filtered out and never processed. Consider overriding `shouldProcessTable` in
`BrokerResourceValidationManager` to ensure logical partitions are processed
(e.g., return `true` for logical partitions, while retaining the leader check
for physical tables).
```suggestion
@Override
protected boolean shouldProcessTable(String tableNameWithType) {
// Always process logical-table partitions; retain leader-based filtering for physical tables.
if (!TableNameBuilder.isTableResource(tableNameWithType)) {
return true;
}
return super.shouldProcessTable(tableNameWithType);
}
@Override
```
##########
pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link BrokerResourceValidationManager}, including that getTablesToProcess
+ * returns both physical table names and logical table partition names (Issue #15751).
+ */
+public class BrokerResourceValidationManagerTest {
+
+ private static final String PHYSICAL_TABLE = "myTable_OFFLINE";
+ private static final String LOGICAL_TABLE_PARTITION = "my_logical_table";
+
+ private PinotHelixResourceManager _resourceManager;
+ private BrokerResourceValidationManager _validationManager;
+
+ @BeforeMethod
+ public void setUp() {
+ _resourceManager = mock(PinotHelixResourceManager.class);
+ when(_resourceManager.getAllTables()).thenReturn(List.of(PHYSICAL_TABLE));
+ when(_resourceManager.getBrokerResourceLogicalTablePartitions()).thenReturn(List.of(LOGICAL_TABLE_PARTITION));
+
+ ControllerConf config = new ControllerConf();
+ LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+ ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+ _validationManager = new BrokerResourceValidationManager(config, _resourceManager, leadControllerManager,
+ controllerMetrics);
+ }
+
+ /**
+ * Verifies that getTablesToProcess returns both physical tables (from getAllTables) and
+ * logical table partitions (from getBrokerResourceLogicalTablePartitions) so that the
+ * periodic task validates and repairs broker resource for logical tables too.
+ */
+ @Test
+ public void testGetTablesToProcessIncludesLogicalTablePartitions()
+ throws Exception {
+ Method getTablesToProcess =
+ BrokerResourceValidationManager.class.getDeclaredMethod("getTablesToProcess", Properties.class);
+ getTablesToProcess.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ List<String> tables = (List<String>) getTablesToProcess.invoke(_validationManager, new Properties());
Review Comment:
The test uses reflection to invoke `getTablesToProcess`, but the method is
`protected` and the test is in the same package
(`org.apache.pinot.controller.validation`), so it can be called directly
without reflection. Calling it directly will make the test less brittle (e.g.,
no dependence on method name strings or reflective access).
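To illustrate the access rule this relies on, here is a small self-contained sketch. `BaseTask`, `ValidationTask`, and `SamePackageCaller` are hypothetical stand-ins declared in one package; they are not Pinot classes. In the PR's test the equivalent would presumably be a plain `_validationManager.getTablesToProcess(new Properties())` call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class exposing a protected hook, like the periodic task base class.
class BaseTask {
  protected List<String> getTablesToProcess() {
    return List.of("myTable_OFFLINE");
  }
}

// Hypothetical subclass that appends logical partitions to the physical tables.
class ValidationTask extends BaseTask {
  @Override
  protected List<String> getTablesToProcess() {
    List<String> combined = new ArrayList<>(super.getTablesToProcess());
    combined.add("my_logical_table");
    return combined;
  }
}

// A class in the same package can call the protected method directly:
// no getDeclaredMethod, setAccessible, or unchecked cast is needed.
final class SamePackageCaller {
  private SamePackageCaller() {
  }

  static List<String> callDirectly() {
    return new ValidationTask().getTablesToProcess();
  }
}
```

A direct call also means a rename of the method breaks the test at compile time instead of at run time.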
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1150,16 +1189,42 @@ public PinotResourceManagerResponse rebuildBrokerResource(String tableNameWithTy
}
private void addInstanceToBrokerIdealState(String brokerTenantTag, String instanceName) {
- IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE);
- for (String tableNameWithType : tableIdealState.getPartitionSet()) {
- TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- Preconditions.checkNotNull(tableConfig);
- String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
- if (brokerTag.equals(brokerTenantTag)) {
- tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerResourceStateModel.ONLINE);
+ // Use atomic read-modify-write so updates (including for logical tables) are persisted and not lost to races.
+ HelixHelper.updateIdealState(getHelixZkManager(), Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
+ assert idealState != null;
+ for (String partitionName : idealState.getPartitionSet()) {
+ String brokerTag = resolveBrokerTagForBrokerResourcePartition(partitionName);
+ if (brokerTag == null) {
+ continue;
+ }
+ if (brokerTag.equals(brokerTenantTag)) {
+ idealState.setPartitionState(partitionName, instanceName, BrokerResourceStateModel.ONLINE);
+ }
}
+ return idealState;
+ }, DEFAULT_RETRY_POLICY);
+ }
+
+ /**
+ * Resolves the broker tag for a partition in the broker resource. Tries physical table config first,
+ * then logical table config, so both are handled regardless of partition naming (e.g. a logical table
+ * name ending with _OFFLINE would otherwise be misclassified as physical and skipped).
+ *
+ * @param partitionName partition name in broker ideal state (physical table name with type or logical table name)
+ * @return broker tag for the partition, or null if the partition cannot be resolved (unknown or missing config)
+ */
+ private String resolveBrokerTagForBrokerResourcePartition(String partitionName) {
+ TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, partitionName);
+ if (tableConfig != null) {
+ return TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
+ }
+ LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, partitionName);
+ if (logicalTableConfig != null) {
+ return TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant());
}
- _helixAdmin.setResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+ LOGGER.warn("Skipping partition {} in broker resource: no table config or logical table config found",
Review Comment:
Logging a WARN for every unresolved broker-resource partition during broker
add can create noisy logs (especially if partitions transiently exist without
configs during table drops/updates). Consider lowering this to DEBUG, or
rate-limiting/aggregating (e.g., log once per call with a count), to avoid
spamming controller logs.
```suggestion
LOGGER.debug("Skipping partition {} in broker resource: no table config or logical table config found",
```
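If the aggregation route is preferred over DEBUG, one possible shape is sketched below. `UnresolvedPartitionSummary` and the `resolveBrokerTag` function parameter are hypothetical; the function stands in for `resolveBrokerTagForBrokerResourcePartition`, and the caller would pass the returned line to the logger once per call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper: builds one summary line instead of one log line per partition.
final class UnresolvedPartitionSummary {
  private UnresolvedPartitionSummary() {
  }

  /** Returns a single summary message for partitions whose broker tag is null, or null if all resolve. */
  static String summarize(Iterable<String> partitions, Function<String, String> resolveBrokerTag) {
    List<String> unresolved = new ArrayList<>();
    for (String partition : partitions) {
      if (resolveBrokerTag.apply(partition) == null) {
        unresolved.add(partition);
      }
    }
    if (unresolved.isEmpty()) {
      return null;
    }
    return "Skipped " + unresolved.size()
        + " broker resource partition(s) with no table config or logical table config: " + unresolved;
  }
}
```

This keeps the signal at WARN while bounding the output to one line per broker add.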
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1102,21 +1121,41 @@ private PinotResourceManagerResponse scaleUpBroker(Tenant tenant, String brokerT
public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType)
throws Exception {
- TableConfig tableConfig;
- try {
- tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- } catch (Exception e) {
- LOGGER.warn("Caught exception while getting table config for table {}", tableNameWithType, e);
- throw new InvalidTableConfigException(
- "Failed to fetch broker tag for table " + tableNameWithType + " due to exception: " + e.getMessage());
- }
- if (tableConfig == null) {
- LOGGER.warn("Table {} does not exist", tableNameWithType);
- throw new InvalidConfigException(
- "Invalid table configuration for table " + tableNameWithType + ". Table does not exist");
+ Set<String> brokerInstances;
+ if (TableNameBuilder.isTableResource(tableNameWithType)) {
+ TableConfig tableConfig;
+ try {
+ tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while getting table config for table {}", tableNameWithType, e);
+ throw new InvalidTableConfigException(
+ "Failed to fetch broker tag for table " + tableNameWithType + " due to exception: " + e.getMessage());
+ }
+ if (tableConfig == null) {
+ LOGGER.warn("Table {} does not exist", tableNameWithType);
+ throw new InvalidConfigException(
+ "Invalid table configuration for table " + tableNameWithType + ". Table does not exist");
+ }
+ brokerInstances = getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker());
+ } else {
+ // Logical table
+ LogicalTableConfig logicalTableConfig;
Review Comment:
`rebuildBrokerResourceFromHelixTags` uses
`isTableResource(tableNameWithType)` to branch. This has the same
misclassification problem as in `BrokerResourceValidationManager.processTable`:
a logical table named with a physical suffix will go down the physical path and
throw `InvalidConfigException` even though a logical table config exists.
Consider branching by config existence (try physical `getTableConfig`; if null,
then try `getLogicalTableConfig`) rather than by name.