Copilot commented on code in PR #17842:
URL: https://github.com/apache/pinot/pull/17842#discussion_r2930675768


##########
pinot-controller/src/test/java/org/apache/pinot/controller/validation/BrokerResourceValidationManagerTest.java:
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link BrokerResourceValidationManager}, including that 
getTablesToProcess
+ * returns both physical table names and logical table partition names (Issue 
#15751).
+ */
+public class BrokerResourceValidationManagerTest {
+
+  private static final String PHYSICAL_TABLE = "myTable_OFFLINE";
+  private static final String LOGICAL_TABLE_PARTITION = "my_logical_table";
+
+  private PinotHelixResourceManager _resourceManager;
+  private BrokerResourceValidationManager _validationManager;
+
+  @BeforeMethod
+  public void setUp() {
+    _resourceManager = mock(PinotHelixResourceManager.class);
+    when(_resourceManager.getAllTables()).thenReturn(List.of(PHYSICAL_TABLE));
+    
when(_resourceManager.getBrokerResourceLogicalTables()).thenReturn(List.of(LOGICAL_TABLE_PARTITION));
+
+    ControllerConf config = new ControllerConf();
+    LeadControllerManager leadControllerManager = 
mock(LeadControllerManager.class);
+    ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    _validationManager = new BrokerResourceValidationManager(config, 
_resourceManager, leadControllerManager,
+        controllerMetrics);
+  }
+
+  /**
+   * Verifies that getTablesToProcess returns both physical tables (from 
getAllTables) and
+   * logical table partitions (from getBrokerResourceLogicalTablePartitions) 
so that the

Review Comment:
   The test Javadoc mentions `getBrokerResourceLogicalTablePartitions`, but the 
production API used and mocked here is `getBrokerResourceLogicalTables()`. Please 
update the comment to match the actual method name so it does not cause confusion 
during future maintenance or refactoring.
   



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1150,16 +1172,36 @@ public PinotResourceManagerResponse 
rebuildBrokerResource(String tableNameWithTy
   }
 
   private void addInstanceToBrokerIdealState(String brokerTenantTag, String 
instanceName) {
-    IdealState tableIdealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, 
Helix.BROKER_RESOURCE_INSTANCE);
-    for (String tableNameWithType : tableIdealState.getPartitionSet()) {
-      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-      Preconditions.checkNotNull(tableConfig);
-      String brokerTag = 
TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
-      if (brokerTag.equals(brokerTenantTag)) {
-        tableIdealState.setPartitionState(tableNameWithType, instanceName, 
BrokerResourceStateModel.ONLINE);
+    // Use atomic read-modify-write so updates (including for logical tables) 
are persisted and not lost to races.
+    HelixHelper.updateIdealState(getHelixZkManager(), 
Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
+      Preconditions.checkNotNull(idealState, "Broker ideal state must not be 
null");
+      for (String partitionName : idealState.getPartitionSet()) {
+        String brokerTag = resolveBrokerTagForTable(partitionName);
+        if (brokerTag.equals(brokerTenantTag)) {
+          idealState.setPartitionState(partitionName, instanceName, 
BrokerResourceStateModel.ONLINE);
+        }
       }
+      return idealState;
+    }, DEFAULT_RETRY_POLICY);
+  }
+
+  /**
+   * Resolves the broker tag for a table in the broker resource. Tries 
physical table config first,
+   * then logical table config.
+   *
+   * @param tableName table name in broker ideal state (physical table name 
with type or logical table name)
+   * @return broker tag for the partition, or null if the partition cannot be 
resolved (unknown or missing config)
+   */
+  private String resolveBrokerTagForTable(String tableName) {
+    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableName);
+    if (tableConfig != null) {
+      return TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
+    }
+    LogicalTableConfig logicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+    if (logicalTableConfig != null) {
+      return 
TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant());
     }
-    _helixAdmin.setResourceIdealState(_helixClusterName, 
Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+    throw new InvalidTableConfigException("Failed to resolve broker tag for 
table " + tableName + " because no table config or logical table config found");

Review Comment:
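   The `resolveBrokerTagForTable` Javadoc says it returns "broker tag ... or null" 
when the partition cannot be resolved. However, the implementation always throws 
`InvalidTableConfigException` when neither a physical nor a logical table config 
is found. Please either update the Javadoc to document the exception, or change 
the method to return `null` and have callers handle the unresolved-partition case 
gracefully.

If you choose `null`, note that the caller in `addInstanceToBrokerIdealState` 
currently calls `brokerTag.equals(brokerTenantTag)`. That would throw a 
`NullPointerException`, so use `brokerTenantTag.equals(brokerTag)` or skip 
unresolved partitions explicitly. If you keep the exception, a single partition 
with a missing config makes the updater lambda throw. The ideal-state update is 
then not applied for any of the other partitions either.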



##########
pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java:
##########
@@ -100,6 +104,57 @@ public void testRebuildBrokerResourceWhenBrokerAdded()
         
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
   }
 
+  /**
+   * Verifies that rebuildBrokerResourceFromHelixTags works for a logical 
table partition when a new broker
+   * is added manually and the ideal state is out of sync (Issue #15751).
+   */
+  @Test
+  public void testRebuildBrokerResourceWhenBrokerAddedForLogicalTable()
+      throws Exception {
+    // Add realtime table so we can create a logical table with both offline 
and realtime
+    TableConfig realtimeTableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TEST_TABLE_NAME).setNumReplicas(2)
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+    _helixResourceManager.addTable(realtimeTableConfig);
+
+    String logicalTableName = "test_logical_rebuild";
+    addDummySchema(logicalTableName);
+    List<String> physicalTableNamesWithType =
+        List.of(TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME),
+            TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME));
+    LogicalTableConfig logicalTableConfig =
+        ControllerTest.getDummyLogicalTableConfig(logicalTableName, 
physicalTableNamesWithType,
+            TagNameUtils.DEFAULT_TENANT_NAME);
+    addLogicalTableConfig(logicalTableConfig);
+
+    HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
+    IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin, 
getHelixClusterName());
+    Assert.assertTrue(idealState.getPartitionSet().contains(logicalTableName));
+
+    // Add a new broker manually so the logical table partition is missing it
+    final String newBrokerId = "Broker_localhost_3";
+    InstanceConfig instanceConfig = new InstanceConfig(newBrokerId);
+    instanceConfig.setInstanceEnabled(true);
+    instanceConfig.setHostName("Broker_localhost");
+    instanceConfig.setPort("3");
+    helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
+    helixAdmin.addInstanceTag(getHelixClusterName(), 
instanceConfig.getInstanceName(),
+        TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+
+    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, 
getHelixClusterName());
+    Assert.assertFalse(idealState.getInstanceSet(logicalTableName)
+        
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+
+    _helixResourceManager.rebuildBrokerResourceFromHelixTags(logicalTableName);
+    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, 
getHelixClusterName());
+    Assert.assertTrue(idealState.getInstanceSet(logicalTableName)
+        
.equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+
+    // Cleanup
+    _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+    _helixResourceManager.deleteRealtimeTable(TEST_TABLE_NAME);

Review Comment:
   This new test adds a broker instance (`Broker_localhost_3`) but does not 
remove it during cleanup. This test class shares a single Helix cluster across 
test methods, and TestNG does not guarantee test execution order. The leftover 
broker can therefore make other tests flaky: for example, a physical table's 
broker ideal state can fall out of sync with `getAllInstancesForBrokerTenant`. 
Consider dropping the instance (and/or removing its tag) during cleanup. Ideally, 
do this in a `finally` block so it also runs when an assertion fails, and each 
test leaves the cluster in a known state.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to