Copilot commented on code in PR #17842: URL: https://github.com/apache/pinot/pull/17842#discussion_r2930675768
########## pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java: ########## @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.validation; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.core.periodictask.PeriodicTask; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +/** + * Unit tests for {@link BrokerResourceValidationManager}, including that getTablesToProcess + * returns both physical table names and logical table partition names (Issue #15751). + */ +public class BrokerResourceValidationManagerTest { + + private static final String PHYSICAL_TABLE = "myTable_OFFLINE"; + private static final String LOGICAL_TABLE_PARTITION = "my_logical_table"; + + private PinotHelixResourceManager _resourceManager; + private BrokerResourceValidationManager _validationManager; + + @BeforeMethod + public void setUp() { + _resourceManager = mock(PinotHelixResourceManager.class); + when(_resourceManager.getAllTables()).thenReturn(List.of(PHYSICAL_TABLE)); + when(_resourceManager.getBrokerResourceLogicalTables()).thenReturn(List.of(LOGICAL_TABLE_PARTITION)); + + ControllerConf config = new ControllerConf(); + LeadControllerManager leadControllerManager = mock(LeadControllerManager.class); + ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); + _validationManager = new BrokerResourceValidationManager(config, _resourceManager, leadControllerManager, + controllerMetrics); + } + + /** + * Verifies that getTablesToProcess returns both physical tables (from getAllTables) and + * logical table partitions (from getBrokerResourceLogicalTablePartitions) so that the Review Comment: The test Javadoc mentions `getBrokerResourceLogicalTablePartitions`, but the production API used/mocked here is `getBrokerResourceLogicalTables()`. Updating the comment to match the actual method name will avoid confusion during future maintenance/refactors. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -1150,16 +1172,36 @@ public PinotResourceManagerResponse rebuildBrokerResource(String tableNameWithTy } private void addInstanceToBrokerIdealState(String brokerTenantTag, String instanceName) { - IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE); - for (String tableNameWithType : tableIdealState.getPartitionSet()) { - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); - Preconditions.checkNotNull(tableConfig); - String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig()); - if (brokerTag.equals(brokerTenantTag)) { - tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerResourceStateModel.ONLINE); + // Use atomic read-modify-write so updates (including for logical tables) are persisted and not lost to races. + HelixHelper.updateIdealState(getHelixZkManager(), Helix.BROKER_RESOURCE_INSTANCE, idealState -> { + Preconditions.checkNotNull(idealState, "Broker ideal state must not be null"); + for (String partitionName : idealState.getPartitionSet()) { + String brokerTag = resolveBrokerTagForTable(partitionName); + if (brokerTag.equals(brokerTenantTag)) { + idealState.setPartitionState(partitionName, instanceName, BrokerResourceStateModel.ONLINE); + } } + return idealState; + }, DEFAULT_RETRY_POLICY); + } + + /** + * Resolves the broker tag for a table in the broker resource. Tries physical table config first, + * then logical table config. + * + * @param tableName table name in broker ideal state (physical table name with type or logical table name) + * @return broker tag for the partition, or null if the partition cannot be resolved (unknown or missing config) + */ + private String resolveBrokerTagForTable(String tableName) { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableName); + if (tableConfig != null) { + return TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig()); + } + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); + if (logicalTableConfig != null) { + return TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant()); } - _helixAdmin.setResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE, tableIdealState); + throw new InvalidTableConfigException("Failed to resolve broker tag for table " + tableName + " because no table config or logical table config found"); Review Comment: `resolveBrokerTagForTable` Javadoc says it "@return broker tag ... or null" when the partition cannot be resolved, but the implementation always throws `InvalidTableConfigException` when neither physical nor logical config is found. Please either update the Javadoc to reflect the exception behavior, or change the method to return `null` and have callers handle the unresolved-partition case gracefully. ########## pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java: ########## @@ -100,6 +104,57 @@ public void testRebuildBrokerResourceWhenBrokerAdded() .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME))); } + /** + * Verifies that rebuildBrokerResourceFromHelixTags works for a logical table partition when a new broker + * is added manually and the ideal state is out of sync (Issue #15751). + */ + @Test + public void testRebuildBrokerResourceWhenBrokerAddedForLogicalTable() + throws Exception { + // Add realtime table so we can create a logical table with both offline and realtime + TableConfig realtimeTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(TEST_TABLE_NAME).setNumReplicas(2) + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build(); + _helixResourceManager.addTable(realtimeTableConfig); + + String logicalTableName = "test_logical_rebuild"; + addDummySchema(logicalTableName); + List<String> physicalTableNamesWithType = + List.of(TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME), + TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME)); + LogicalTableConfig logicalTableConfig = + ControllerTest.getDummyLogicalTableConfig(logicalTableName, physicalTableNamesWithType, + TagNameUtils.DEFAULT_TENANT_NAME); + addLogicalTableConfig(logicalTableConfig); + + HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool(); + IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName()); + Assert.assertTrue(idealState.getPartitionSet().contains(logicalTableName)); + + // Add a new broker manually so the logical table partition is missing it + final String newBrokerId = "Broker_localhost_3"; + InstanceConfig instanceConfig = new InstanceConfig(newBrokerId); + instanceConfig.setInstanceEnabled(true); + instanceConfig.setHostName("Broker_localhost"); + instanceConfig.setPort("3"); + helixAdmin.addInstance(getHelixClusterName(), instanceConfig); + helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(), + TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)); + + idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName()); + Assert.assertFalse(idealState.getInstanceSet(logicalTableName) + .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME))); + + _helixResourceManager.rebuildBrokerResourceFromHelixTags(logicalTableName); + idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName()); + Assert.assertTrue(idealState.getInstanceSet(logicalTableName) + .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME))); + + // Cleanup + _helixResourceManager.deleteLogicalTableConfig(logicalTableName); + _helixResourceManager.deleteRealtimeTable(TEST_TABLE_NAME); Review Comment: This new test adds a broker instance (`Broker_localhost_3`) but does not remove it in the cleanup. Because this test class shares a single Helix cluster across test methods and TestNG does not guarantee test execution order, leaving the extra broker can make other tests flaky (e.g., physical-table broker ideal state can become out-of-sync vs `getAllInstancesForBrokerTenant`). Consider dropping the instance (and/or removing its tag) in the cleanup block so each test leaves the cluster in a known state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
