Copilot commented on code in PR #17842:
URL: https://github.com/apache/pinot/pull/17842#discussion_r2930898348
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1100,23 +1108,25 @@ private PinotResourceManagerResponse scaleUpBroker(Tenant tenant, String brokerT
     return PinotResourceManagerResponse.SUCCESS;
   }

-  public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType)
-      throws Exception {
-    TableConfig tableConfig;
+  public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType) {
+    Set<String> brokerInstances;
     try {
-      tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      if (tableConfig != null) {
+        brokerInstances = getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker());
+      } else {
+        LogicalTableConfig logicalTableConfig =
+            ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableNameWithType);
+        Preconditions.checkNotNull(logicalTableConfig, "No table config or logical table config found for %s",
+            tableNameWithType);
+        brokerInstances = getAllInstancesForBrokerTenant(logicalTableConfig.getBrokerTenant());
+      }
     } catch (Exception e) {
-      LOGGER.warn("Caught exception while getting table config for table {}", tableNameWithType, e);
+      LOGGER.warn("Caught exception while getting config for table {}", tableNameWithType, e);
       throw new InvalidTableConfigException(
-          "Failed to fetch broker tag for table " + tableNameWithType + " due to exception: " + e.getMessage());
+          "Failed to fetch broker config for table " + tableNameWithType + " due to exception: " + e.getMessage());
     }
-    if (tableConfig == null) {
-      LOGGER.warn("Table {} does not exist", tableNameWithType);
-      throw new InvalidConfigException(
-          "Invalid table configuration for table " + tableNameWithType + ". Table does not exist");
-    }
-    return rebuildBrokerResource(tableNameWithType,
-        getAllInstancesForBrokerTenant(tableConfig.getTenantConfig().getBroker()));
+    return rebuildBrokerResource(tableNameWithType, brokerInstances);
   }
Review Comment:
For a non-existent `tableNameWithType`, the old behavior threw a targeted
“table does not exist” style error; this new flow triggers
`Preconditions.checkNotNull(...)`, which is caught and rethrown as
`InvalidTableConfigException` with a generic “Failed to fetch broker config...”
message. Consider explicitly handling the “both configs are null” case outside
the catch to preserve clearer, stable error semantics (and avoid treating “not
found” as an exception path).
##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java:
##########
@@ -45,6 +48,17 @@ public BrokerResourceValidationManager(ControllerConf config, PinotHelixResource
         controllerMetrics);
   }

+  @Override
+  protected List<String> getTablesToProcess(Properties periodicTaskProperties) {
+    List<String> tables = super.getTablesToProcess(periodicTaskProperties);
+    if (periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME) != null) {
+      return tables;
+    }
+    List<String> combined = new ArrayList<>(tables);
+    combined.addAll(_pinotHelixResourceManager.getBrokerResourceLogicalTables());
+    return combined;
+  }
Review Comment:
The combined list can contain duplicates (e.g., if the logical list contains
entries already present, or if the underlying methods return duplicates).
Duplicates can cause repeated rebuild calls and noisy logs/metrics. Consider
de-duplicating while preserving deterministic order (e.g., via a
`LinkedHashSet` or `stream().distinct()`).
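A minimal sketch of the suggested de-duplication (the table names in the test are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class DedupTables {
    /**
     * Combines the physical and logical table lists, dropping duplicates
     * while preserving first-seen (deterministic) order.
     */
    static List<String> combineDistinct(List<String> tables, List<String> logicalTables) {
        LinkedHashSet<String> combined = new LinkedHashSet<>(tables);
        combined.addAll(logicalTables);
        return new ArrayList<>(combined);
    }
}
```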
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java:
##########
@@ -64,18 +64,32 @@ public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long
     _controllerMetrics = controllerMetrics;
   }

+  /**
+   * Returns the list of table names (with type) to consider for this task. Subclasses may override to add
+   * more names (e.g. logical table partitions). Default: single table from property or all physical tables.
+   */
+  protected List<String> getTablesToProcess(Properties periodicTaskProperties) {
+    String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
Review Comment:
`Properties` is string-keyed and typically accessed via `getProperty(...)`.
Casting `periodicTaskProperties.get(...)` to `String` can throw
`ClassCastException` if a non-String value is ever set (including in
tests/utilities). Prefer
`periodicTaskProperties.getProperty(PeriodicTask.PROPERTY_KEY_TABLE_NAME)` for
safer, idiomatic access.
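The difference is easy to demonstrate with plain `java.util.Properties`, no Pinot code involved:

```java
import java.util.Properties;

public class PropertiesAccess {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("tableName", "myTable_OFFLINE"); // usual String value
        props.put("retries", 3);                   // a non-String value is legal via put()

        // get() returns Object, so a blind (String) cast on "retries" throws
        // ClassCastException. getProperty() is the idiomatic accessor: it returns
        // the String value, or null when the key is absent or the value is not a String.
        System.out.println(props.getProperty("tableName")); // myTable_OFFLINE
        System.out.println(props.getProperty("retries"));   // null
        System.out.println(props.getProperty("missing"));   // null
    }
}
```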
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1150,16 +1160,37 @@ public PinotResourceManagerResponse rebuildBrokerResource(String tableNameWithTy
   }

   private void addInstanceToBrokerIdealState(String brokerTenantTag, String instanceName) {
-    IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE);
-    for (String tableNameWithType : tableIdealState.getPartitionSet()) {
-      TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-      Preconditions.checkNotNull(tableConfig);
-      String brokerTag = TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig());
-      if (brokerTag.equals(brokerTenantTag)) {
-        tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerResourceStateModel.ONLINE);
+    // Use atomic read-modify-write so updates (including for logical tables) are persisted and not lost to races.
+    HelixHelper.updateIdealState(getHelixZkManager(), Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
+      Preconditions.checkNotNull(idealState, "Broker ideal state must not be null");
+      for (String partitionName : idealState.getPartitionSet()) {
+        String brokerTag = resolveBrokerTagForTable(partitionName);
+        if (brokerTag.equals(brokerTenantTag)) {
+          idealState.setPartitionState(partitionName, instanceName, BrokerResourceStateModel.ONLINE);
+        }
       }
-    }
+      return idealState;
+    }, DEFAULT_RETRY_POLICY);
Review Comment:
`resolveBrokerTagForTable(partitionName)` does ZK reads per partition, which
can be expensive for large broker resources and adds load during broker add.
Also, if one partition can’t be resolved, the exception will abort the entire
IdealState update. Consider (1) caching `partitionName -> brokerTag` within the
update lambda to avoid repeated ZK lookups, and (2) handling resolution
failures per-partition (warn + continue) so one stale/orphaned partition
doesn’t prevent updating the rest.
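Both suggestions can be sketched together; the `Map`-based ideal state and `resolveTag` function below are stand-ins, not Pinot APIs, and in the real code the cache would be declared outside the retried lambda so that retries reuse resolved tags:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch: per-pass tag caching plus warn-and-continue on resolution failure.
public class IdealStateUpdateSketch {
    static Map<String, String> applyTag(Map<String, String> partitions,          // partition -> state
                                        Function<String, String> resolveTag,     // may throw per partition
                                        String targetTag, String newState) {
        Map<String, String> tagCache = new HashMap<>();  // hoist outside a retried lambda in real code
        Map<String, String> updated = new LinkedHashMap<>(partitions);
        for (String partition : partitions.keySet()) {
            String tag;
            try {
                // One resolver call per partition, remembered for subsequent passes.
                tag = tagCache.computeIfAbsent(partition, resolveTag);
            } catch (RuntimeException e) {
                // Warn + continue: one stale/orphaned partition must not abort the whole update.
                System.err.println("Skipping unresolvable partition " + partition + ": " + e.getMessage());
                continue;
            }
            if (targetTag.equals(tag)) {
                updated.put(partition, newState);
            }
        }
        return updated;
    }
}
```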
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java:
##########
@@ -422,6 +422,56 @@ public void testBrokerResourceValidationManager() {
     }, 600_000L, "Timeout when waiting for BrokerResourceValidationManager");
   }

+  /**
+   * Verifies that BrokerResourceValidationManager also repairs broker resource for a logical table
+   * when a new broker is added (Issue #15751).
+   */
+  @Test
+  public void testBrokerResourceValidationManagerRepairsLogicalTable()
+      throws IOException {
+    // Add logical table (same broker tenant as physical table)
+    Schema logicalTableSchema = createSchema();
+    logicalTableSchema.setSchemaName(getLogicalTableName());
+    addSchema(logicalTableSchema);
+    createLogicalTable();
+
+    String helixClusterName = getHelixClusterName();
+    String logicalTableName = getLogicalTableName();
+    IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName);
+    assertNotNull(idealState);
+    assertTrue(idealState.getPartitionSet().contains(logicalTableName), "Broker resource should have logical table");
+
+    // Add a new broker so logical table partition is out of sync
+    String brokerId = "Broker_localhost_5678";
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(brokerId);
+    instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant(TENANT_NAME));
+    _helixAdmin.addInstance(helixClusterName, instanceConfig);
+    Set<String> brokersAfterAdd = _helixResourceManager.getAllInstancesForBrokerTenant(TENANT_NAME);
+    assertTrue(brokersAfterAdd.contains(brokerId));
+
+    // Assert logical table partition does not yet contain the new broker (periodic task will repair it)
+    idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName);
+    assertNotNull(idealState);
+    assertFalse(idealState.getInstanceSet(logicalTableName).contains(brokerId),
+        "Logical table partition should not yet include the new broker before periodic task runs");
+
+    // Wait for BrokerResourceValidationManager to repair both physical and logical table partitions
+    String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+    TestUtils.waitForCondition(aVoid -> {
+      IdealState is = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName);
+      if (is == null) {
+        return false;
+      }
+      return is.getInstanceSet(tableNameWithType).equals(brokersAfterAdd)
+          && is.getInstanceSet(logicalTableName).equals(brokersAfterAdd);
+    }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager to repair logical table partition");
+
+    // Cleanup: drop broker, logical table config, and logical table schema
+    _helixAdmin.dropInstance(helixClusterName, instanceConfig);
+    _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+    deleteSchema(logicalTableName);
Review Comment:
Cleanup isn’t in a `finally` block. If any assertion or the wait times out,
the test can leak the broker instance and/or logical table config into
subsequent tests, making the suite flaky. Wrap the broker/config creation
portion in `try` and move cleanup to `finally` to guarantee cluster state is
restored.
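The fix follows the standard try/finally test shape; everything below is illustrative stand-in code, not the actual test:

```java
// Minimal shape of the suggested fix: perform setup, then guarantee cleanup in finally.
public class CleanupPattern {
    static boolean cleanedUp = false;

    static void runTest(boolean fail) {
        // Setup that mutates shared state (e.g. addInstance / createLogicalTable).
        cleanedUp = false;
        try {
            // Assertions and waitForCondition go here; any failure throws.
            if (fail) {
                throw new AssertionError("assertion failed or waitForCondition timed out");
            }
        } finally {
            // Cleanup always runs (e.g. dropInstance / deleteLogicalTableConfig / deleteSchema),
            // so a failing test cannot leak cluster state into subsequent tests.
            cleanedUp = true;
        }
    }
}
```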
##########
pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerStatelessTest.java:
##########
@@ -100,6 +104,62 @@ public void testRebuildBrokerResourceWhenBrokerAdded()
         .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
   }

+  /**
+   * Verifies that rebuildBrokerResourceFromHelixTags works for a logical table partition when a new broker
+   * is added manually and the ideal state is out of sync (Issue #15751).
+   */
+  @Test
+  public void testRebuildBrokerResourceWhenBrokerAddedForLogicalTable()
+      throws Exception {
+    // Add realtime table so we can create a logical table with both offline and realtime
+    TableConfig realtimeTableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TEST_TABLE_NAME).setNumReplicas(2)
+            .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+    _helixResourceManager.addTable(realtimeTableConfig);
+
+    String logicalTableName = "test_logical_rebuild";
+    addDummySchema(logicalTableName);
+    List<String> physicalTableNamesWithType =
+        List.of(TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME),
+            TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME));
+    LogicalTableConfig logicalTableConfig =
+        ControllerTest.getDummyLogicalTableConfig(logicalTableName, physicalTableNamesWithType,
+            TagNameUtils.DEFAULT_TENANT_NAME);
+    addLogicalTableConfig(logicalTableConfig);
+
+    HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
+    IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
+    Assert.assertTrue(idealState.getPartitionSet().contains(logicalTableName));
+
+    // Add a new broker manually so the logical table partition is missing it
+    final String newBrokerId = "Broker_localhost_3";
+    InstanceConfig instanceConfig = new InstanceConfig(newBrokerId);
+    instanceConfig.setInstanceEnabled(true);
+    instanceConfig.setHostName("Broker_localhost");
+    instanceConfig.setPort("3");
+    helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
+    helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(),
+        TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+
+    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
+    Assert.assertFalse(idealState.getInstanceSet(logicalTableName)
+        .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+
+    _helixResourceManager.rebuildBrokerResourceFromHelixTags(logicalTableName);
+    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
+    Assert.assertTrue(idealState.getInstanceSet(logicalTableName)
+        .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+
+    // Cleanup
+    _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
+    _helixResourceManager.deleteRealtimeTable(TEST_TABLE_NAME);
+    // Remove the manually added broker instance so that subsequent tests see a clean cluster state
+    helixAdmin.removeInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(),
+        TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
+    instanceConfig.setInstanceEnabled(false);
+    helixAdmin.dropInstance(getHelixClusterName(), instanceConfig);
Review Comment:
Similar to the integration test, cleanup is not guaranteed if the test fails
mid-way. Since this test mutates shared Helix cluster state, it should use
`try/finally` to always remove the instance tag / drop the instance / delete
configs to avoid cross-test interference.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]