This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 16ae07661b Handle remove build routing for logical tables. (#15862)
16ae07661b is described below
commit 16ae07661b04672c7ed7e44dcbf5ca185613ef76
Author: Abhishek Bafna <[email protected]>
AuthorDate: Mon May 26 19:11:21 2025 +0530
Handle remove build routing for logical tables. (#15862)
---
.../broker/api/resources/PinotBrokerRouting.java | 21 ++++++++--
...okerResourceOnlineOfflineStateModelFactory.java | 30 +++++++++-----
.../pinot/broker/routing/BrokerRoutingManager.java | 47 ++++++++++++++++++++--
.../BaseLogicalTableIntegrationTest.java | 1 +
...hTwoOfflineOneRealtimeTableIntegrationTest.java | 8 +++-
.../timeboundary/MinTimeBoundaryStrategy.java | 8 ++++
.../query/timeboundary/TimeBoundaryStrategy.java | 9 +++++
7 files changed, 105 insertions(+), 19 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java
index d36bfd9500..546e155cf0 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerRouting.java
@@ -36,7 +36,9 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
+import org.apache.helix.HelixManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
@@ -61,6 +63,9 @@ public class PinotBrokerRouting {
@Inject
BrokerRoutingManager _routingManager;
+ @Inject
+ private HelixManager _helixManager;
+
@PUT
@Produces(MediaType.TEXT_PLAIN)
@Path("/routing/{tableName}")
@@ -71,9 +76,13 @@ public class PinotBrokerRouting {
@ApiResponse(code = 500, message = "Internal server error")
})
public String buildRouting(
- @ApiParam(value = "Table name (with type)") @PathParam("tableName")
String tableNameWithType,
+ @ApiParam(value = "Table name (with type)") @PathParam("tableName")
String physicalOrLogicalTableName,
@Context HttpHeaders headers) {
-
_routingManager.buildRouting(DatabaseUtils.translateTableName(tableNameWithType,
headers));
+ if
(ZKMetadataProvider.isLogicalTableExists(_helixManager.getHelixPropertyStore(),
physicalOrLogicalTableName)) {
+ _routingManager.buildRoutingForLogicalTable(physicalOrLogicalTableName);
+ } else {
+
_routingManager.buildRouting(DatabaseUtils.translateTableName(physicalOrLogicalTableName,
headers));
+ }
return "Success";
}
@@ -104,9 +113,13 @@ public class PinotBrokerRouting {
@ApiResponse(code = 500, message = "Internal server error")
})
public String removeRouting(
- @ApiParam(value = "Table name (with type)") @PathParam("tableName")
String tableNameWithType,
+ @ApiParam(value = "Table name (with type)") @PathParam("tableName")
String physicalOrLogicalTableName,
@Context HttpHeaders headers) {
-
_routingManager.removeRouting(DatabaseUtils.translateTableName(tableNameWithType,
headers));
+ if
(ZKMetadataProvider.isLogicalTableExists(_helixManager.getHelixPropertyStore(),
physicalOrLogicalTableName)) {
+ _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTableName);
+ } else {
+
_routingManager.removeRouting(DatabaseUtils.translateTableName(physicalOrLogicalTableName,
headers));
+ }
return "Success";
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index dcf8a667e1..cc2f2f406a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -97,14 +97,19 @@ public class BrokerResourceOnlineOfflineStateModelFactory
extends StateModelFact
@Transition(from = "ONLINE", to = "OFFLINE")
public void onBecomeOfflineFromOnline(Message message, NotificationContext
context) {
- String tableNameWithType = message.getPartitionName();
- LOGGER.info("Processing transition from ONLINE to OFFLINE for table:
{}", tableNameWithType);
+ String physicalOrLogicalTable = message.getPartitionName();
+ LOGGER.info("Processing transition from ONLINE to OFFLINE for table:
{}", physicalOrLogicalTable);
try {
- _routingManager.removeRouting(tableNameWithType);
- _queryQuotaManager.dropTableQueryQuota(tableNameWithType);
+ if (ZKMetadataProvider.isLogicalTableExists(_propertyStore,
physicalOrLogicalTable)) {
+ _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTable);
+ _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable);
+ } else {
+ _routingManager.removeRouting(physicalOrLogicalTable);
+ _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable);
+ }
} catch (Exception e) {
LOGGER.error("Caught exception while processing transition from ONLINE
to OFFLINE for table: {}",
- tableNameWithType, e);
+ physicalOrLogicalTable, e);
throw e;
}
}
@@ -116,14 +121,19 @@ public class BrokerResourceOnlineOfflineStateModelFactory
extends StateModelFact
@Transition(from = "ONLINE", to = "DROPPED")
public void onBecomeDroppedFromOnline(Message message, NotificationContext
context) {
- String tableNameWithType = message.getPartitionName();
- LOGGER.info("Processing transition from ONLINE to DROPPED for table:
{}", tableNameWithType);
+ String physicalOrLogicalTable = message.getPartitionName();
+ LOGGER.info("Processing transition from ONLINE to DROPPED for table:
{}", physicalOrLogicalTable);
try {
- _routingManager.removeRouting(tableNameWithType);
- _queryQuotaManager.dropTableQueryQuota(tableNameWithType);
+ if (ZKMetadataProvider.isLogicalTableExists(_propertyStore,
physicalOrLogicalTable)) {
+ _routingManager.removeRoutingForLogicalTable(physicalOrLogicalTable);
+ _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable);
+ } else {
+ _routingManager.removeRouting(physicalOrLogicalTable);
+ _queryQuotaManager.dropTableQueryQuota(physicalOrLogicalTable);
+ }
} catch (Exception e) {
LOGGER.error("Caught exception while processing transition from ONLINE
to DROPPED for table: {}",
- tableNameWithType, e);
+ physicalOrLogicalTable, e);
throw e;
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 801b39055b..e2c9045bef 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -66,6 +66,8 @@ import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -437,10 +439,11 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
TimeBoundaryConfig timeBoundaryConfig =
logicalTableConfig.getTimeBoundaryConfig();
Preconditions.checkArgument(timeBoundaryConfig.getBoundaryStrategy().equals("min"),
"Invalid time boundary strategy: %s",
timeBoundaryConfig.getBoundaryStrategy());
- List<String> includedTables =
- (List<String>)
timeBoundaryConfig.getParameters().getOrDefault("includedTables", List.of());
+ TimeBoundaryStrategy timeBoundaryStrategy =
+
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(timeBoundaryConfig.getBoundaryStrategy());
+ List<String> timeBoundaryTableNames =
timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig);
- for (String tableNameWithType : includedTables) {
+ for (String tableNameWithType : timeBoundaryTableNames) {
Preconditions.checkArgument(TableNameBuilder.isOfflineTableResource(tableNameWithType),
"Invalid table in the time boundary config: %s", tableNameWithType);
try {
@@ -653,6 +656,44 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
}
+ public synchronized void removeRoutingForLogicalTable(String
logicalTableName) {
+ LOGGER.info("Removing time boundary manager for logical table: {}",
logicalTableName);
+ LogicalTableConfig logicalTableConfig =
+ ZKMetadataProvider.getLogicalTableConfig(_propertyStore,
logicalTableName);
+ Preconditions.checkState(logicalTableConfig != null, "Failed to find
logical table config for: %s",
+ logicalTableName);
+ if (!logicalTableConfig.isHybridLogicalTable()) {
+ LOGGER.info("Skip removing time boundary manager for non hybrid logical
table: {}", logicalTableName);
+ return;
+ }
+ String strategy =
logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy();
+ TimeBoundaryStrategy timeBoundaryStrategy =
+
TimeBoundaryStrategyService.getInstance().getTimeBoundaryStrategy(strategy);
+ List<String> timeBoundaryTableNames =
timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig);
+ for (String tableNameWithType : timeBoundaryTableNames) {
+
+ if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ LOGGER.info("Skipping removing time boundary manager for real-time
table: {}", tableNameWithType);
+ continue;
+ }
+
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+ if (_routingEntryMap.containsKey(realtimeTableName)) {
+ LOGGER.info("Skipping removing time boundary manager for hybrid
physical table: {}", rawTableName);
+ continue;
+ }
+
+ RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
+ if (routingEntry != null) {
+ routingEntry.setTimeBoundaryManager(null);
+ LOGGER.info("Removed time boundary manager for table: {}",
tableNameWithType);
+ } else {
+ LOGGER.warn("Routing does not exist for table: {}, skipping",
tableNameWithType);
+ }
+ }
+ }
+
/**
* Refreshes the metadata for the given segment (called when segment is
getting refreshed).
*/
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 3f2bffd888..29c6f05516 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -110,6 +110,7 @@ public abstract class BaseLogicalTableIntegrationTest
extends BaseClusterIntegra
if (_sharedClusterTestSuite != this) {
_controllerRequestURLBuilder =
_sharedClusterTestSuite._controllerRequestURLBuilder;
_helixResourceManager = _sharedClusterTestSuite._helixResourceManager;
+ _kafkaStarters = _sharedClusterTestSuite._kafkaStarters;
}
_avroFiles = getAllAvroFiles();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
index d406b2ad56..ba594a303c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest.java
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategy;
+import org.apache.pinot.query.timeboundary.TimeBoundaryStrategyService;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;
@@ -62,8 +64,9 @@ public class
LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B
private void updateTimeBoundaryTableInLogicalTable(LogicalTableConfig
logicalTableConfig)
throws IOException {
- List<String> includedTables =
- (List<String>)
logicalTableConfig.getTimeBoundaryConfig().getParameters().get("includedTables");
+ TimeBoundaryStrategy timeBoundaryStrategy =
TimeBoundaryStrategyService.getInstance()
+
.getTimeBoundaryStrategy(logicalTableConfig.getTimeBoundaryConfig().getBoundaryStrategy());
+ List<String> includedTables =
timeBoundaryStrategy.getTimeBoundaryTableNames(logicalTableConfig);
String timeBoundaryTableName =
TableNameBuilder.extractRawTableName(includedTables.get(0));
String newTimeBoundaryTableName = timeBoundaryTableName.equals("o_1") ?
"o_2" : "o_1";
@@ -71,6 +74,7 @@ public class
LogicalTableWithTwoOfflineOneRealtimeTableIntegrationTest extends B
Map<String, Object> parameters = Map.of("includedTables",
List.of(newTimeBoundaryTableName));
logicalTableConfig.getTimeBoundaryConfig().setParameters(parameters);
+ logicalTableConfig.setQueryConfig(null);
updateLogicalTableConfig(logicalTableConfig.getTableName(),
logicalTableConfig);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
index 5d3c98596b..04be7d3cdf 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
@@ -36,6 +36,8 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@AutoService(TimeBoundaryStrategy.class)
public class MinTimeBoundaryStrategy implements TimeBoundaryStrategy {
+ public static final String INCLUDED_TABLES = "includedTables";
+
@Override
public String getName() {
return "min";
@@ -76,4 +78,10 @@ public class MinTimeBoundaryStrategy implements
TimeBoundaryStrategy {
}
return minTimeBoundaryInfo;
}
+
+ @Override
+ public List<String> getTimeBoundaryTableNames(LogicalTableConfig
logicalTableConfig) {
+ Map<String, Object> parameters =
logicalTableConfig.getTimeBoundaryConfig().getParameters();
+ return parameters != null ? (List)
parameters.getOrDefault(INCLUDED_TABLES, List.of()) : List.of();
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
index c1b97f28c5..7a4ee21794 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/timeboundary/TimeBoundaryStrategy.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.timeboundary;
+import java.util.List;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -43,4 +44,12 @@ public interface TimeBoundaryStrategy {
*/
TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig,
TableCache tableCache,
RoutingManager routingManager);
+
+
+ /**
+ * Returns the list of physical table names that are part of the time
boundary.
+ * @param logicalTableConfig The logical table configuration
+ * @return The list of physical table names that are part of the time
boundary.
+ */
+ List<String> getTimeBoundaryTableNames(LogicalTableConfig
logicalTableConfig);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]