Copilot commented on code in PR #17291: URL: https://github.com/apache/pinot/pull/17291#discussion_r2870557223
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java: ########## @@ -0,0 +1,585 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import 
org.apache.helix.model.InstanceConfig; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.querylog.QueryLogger; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.client.ConnectionTimeouts; +import org.apache.pinot.client.PinotClientException; +import org.apache.pinot.client.SystemTableDataTableClient; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTableImplV4; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.SystemTableRegistry; +import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.instance.context.BrokerContext; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.plan.Plan; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.reduce.BrokerReduceService; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.routing.MultiClusterRoutingContext; +import org.apache.pinot.core.routing.RoutingManager; +import 
org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.spi.accounting.ThreadAccountant; +import org.apache.pinot.spi.auth.AuthorizationResult; +import org.apache.pinot.spi.auth.broker.RequesterIdentity; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Broker request handler for system tables (handled entirely on the broker). + * <p> + * System tables are virtual tables that expose cluster metadata (e.g., system.tables, system.instances). + * They are executed using the v1 query engine against in-memory segments generated on-demand by + * {@link SystemTableProvider} implementations. + */ +public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class); + // Sentinel values used for broker-local execution of system table queries where no actual server routing is needed. 
+ private static final String SYSTEM_TABLE_PSEUDO_HOST = "localhost"; + private static final int SYSTEM_TABLE_PSEUDO_PORT = 0; + private static final String SYSTEM_TABLE_DATATABLE_API_PATH = "/query/systemTable/datatable"; + // Hop-by-hop headers per RFC 7230 plus content-length/host which are request-specific. + private static final Set<String> HOP_BY_HOP_HEADERS_TO_SKIP = Set.of( + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailer", + "transfer-encoding", + "upgrade", + "host", + "content-length"); + + private final SystemTableRegistry _systemTableRegistry; + private final BrokerReduceService _brokerReduceService; + private final PlanMaker _planMaker; + private final ExecutorService _executorService; + private final ExecutorService _scatterGatherExecutorService; + private final SystemTableDataTableClient _systemTableDataTableClient; + @Nullable + private final HelixManager _helixManager; + + public SystemTableBrokerRequestHandler(PinotConfiguration config, String brokerId, + BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + SystemTableRegistry systemTableRegistry, ThreadAccountant threadAccountant, + @Nullable MultiClusterRoutingContext multiClusterRoutingContext, @Nullable HelixManager helixManager) { + super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache, + threadAccountant, multiClusterRoutingContext); + _systemTableRegistry = systemTableRegistry; + _brokerReduceService = new BrokerReduceService(_config); + _planMaker = new InstancePlanMakerImplV2(); + _planMaker.init(_config); + _helixManager = helixManager; + int executorPoolSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_SYSTEM_TABLE_EXECUTOR_POOL_SIZE, + CommonConstants.Broker.DEFAULT_SYSTEM_TABLE_EXECUTOR_POOL_SIZE); + executorPoolSize = Math.max(1, 
executorPoolSize); + _executorService = QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize, + new NamedThreadFactory("system-table-query-executor"))); + _scatterGatherExecutorService = + QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize, + new NamedThreadFactory("system-table-scatter-gather-executor"))); + SSLContext sslContext = BrokerContext.getInstance().getClientHttpsContext(); + int timeoutMs; + if (_brokerTimeoutMs > Integer.MAX_VALUE) { + LOGGER.warn("Broker timeout {}ms exceeds Integer.MAX_VALUE; clamping to {} for system table client connections", + _brokerTimeoutMs, Integer.MAX_VALUE); + timeoutMs = Integer.MAX_VALUE; + } else if (_brokerTimeoutMs < 1) { + LOGGER.warn("Broker timeout {}ms is non-positive; using default 5000ms for system table client connections", + _brokerTimeoutMs); + timeoutMs = 5000; + } else { + timeoutMs = (int) _brokerTimeoutMs; + } + ConnectionTimeouts connectionTimeouts = ConnectionTimeouts.create(timeoutMs, timeoutMs, timeoutMs); + _systemTableDataTableClient = + new SystemTableDataTableClient(connectionTimeouts, sslContext); + } + + @Override + public void start() { + } + + @Override + public void shutDown() { + _executorService.shutdownNow(); + _scatterGatherExecutorService.shutdownNow(); + try { + _systemTableDataTableClient.close(); + } catch (Exception e) { + LOGGER.debug("Failed to close system table data table client: {}", e.toString()); + } + _brokerReduceService.shutDown(); + } + + public boolean canHandle(String tableName) { + return isSystemTable(tableName) && _systemTableRegistry.isRegistered(tableName); + } + + @Override + protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) + throws Exception { + long startTimeMs = 
requestContext.getRequestArrivalTimeMillis(); + long deadlineMs = startTimeMs + _brokerTimeoutMs; + QueryExecutionContext executionContext = + new QueryExecutionContext(QueryExecutionContext.QueryType.STE, requestId, Long.toString(requestId), + QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()), startTimeMs, deadlineMs, deadlineMs, + _brokerId, _brokerId, org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_QUERY_HASH); + try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) { + PinotQuery pinotQuery; + try { + pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); + } catch (Exception e) { + requestContext.setErrorCode(QueryErrorCode.SQL_PARSING); + return new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e.getMessage()); + } + + Set<String> tableNames = RequestUtils.getTableNames(pinotQuery); + if (tableNames == null || tableNames.isEmpty()) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name"); + } + if (tableNames.size() != 1) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins"); + } + String tableName = tableNames.iterator().next(); + if (!isSystemTable(tableName)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Not a system table query"); + } + AuthorizationResult authorizationResult = + hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders); + if (!authorizationResult.hasAccess()) { + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + return new BrokerResponseNative(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage()); + } + + boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query); + return 
handleSystemTableQuery(request, pinotQuery, tableName, requestContext, requesterIdentity, query, + httpHeaders, queryWasLogged); + } + } + + @Override + protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) { + return false; + } + + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) + throws Exception { + return false; + } + + @Override + public Map<Long, String> getRunningQueries() { + return Collections.emptyMap(); + } + + @Override + public OptionalLong getRequestIdByClientId(String clientQueryId) { + return OptionalLong.empty(); + } + + private boolean isSystemTable(String tableName) { + return tableName != null && tableName.toLowerCase(Locale.ROOT).startsWith("system."); + } + + /** + * Executes a system table query against the local broker and returns the raw {@link DataTable} results. + * <p> + * This method is used by the internal broker-to-broker scatter-gather endpoint and must never perform fanout. + * It is invoked when a broker receives a scatter-gather request from another broker for a system table query. 
+ * + * @param request the JSON request containing the SQL query and options + * @param requesterIdentity the identity of the requester for authorization + * @param requestContext the request context for tracking + * @param httpHeaders the HTTP headers from the request + * @return a DataTable containing the query results from this broker's local shard + */ + public DataTable handleSystemTableDataTableRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity, + RequestContext requestContext, @Nullable HttpHeaders httpHeaders) { + long startTimeMs = requestContext.getRequestArrivalTimeMillis(); + if (startTimeMs <= 0) { + startTimeMs = System.currentTimeMillis(); + requestContext.setRequestArrivalTimeMillis(startTimeMs); + } + long requestId = _requestIdGenerator.get(); + long deadlineMs = startTimeMs + _brokerTimeoutMs; + + JsonNode sql = request.get(CommonConstants.Broker.Request.SQL); + if (sql == null || !sql.isTextual()) { + return exceptionDataTable(QueryErrorCode.JSON_PARSING, "Failed to find 'sql' in the request: " + request); + } + String query = sql.textValue(); + requestContext.setQuery(query); + + SqlNodeAndOptions sqlNodeAndOptions; + try { + sqlNodeAndOptions = RequestUtils.parseQuery(query, request); + } catch (Exception e) { + requestContext.setErrorCode(QueryErrorCode.SQL_PARSING); + return exceptionDataTable(QueryErrorCode.SQL_PARSING, e.getMessage()); + } + + QueryExecutionContext executionContext = + new QueryExecutionContext(QueryExecutionContext.QueryType.STE, requestId, Long.toString(requestId), + QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()), startTimeMs, deadlineMs, deadlineMs, + _brokerId, _brokerId, org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_QUERY_HASH); + try (QueryThreadContext ignore = QueryThreadContext.open(executionContext, _threadAccountant)) { + AccessControl accessControl = _accessControlFactory.create(); + AuthorizationResult authorizationResult = 
accessControl.authorize(requesterIdentity); + if (!authorizationResult.hasAccess()) { + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + return exceptionDataTable(QueryErrorCode.ACCESS_DENIED, authorizationResult.getFailureMessage()); + } + + PinotQuery pinotQuery; + try { + pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); + } catch (Exception e) { + requestContext.setErrorCode(QueryErrorCode.SQL_PARSING); + return exceptionDataTable(QueryErrorCode.SQL_PARSING, e.getMessage()); + } + + Set<String> tableNames = RequestUtils.getTableNames(pinotQuery); + if (tableNames == null || tableNames.isEmpty()) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "Failed to extract table name"); + } + if (tableNames.size() != 1) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "System tables do not support joins"); + } + String tableName = tableNames.iterator().next(); + if (!isSystemTable(tableName)) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, "Not a system table query"); + } + + AuthorizationResult tableAuthorizationResult = + hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders); + if (!tableAuthorizationResult.hasAccess()) { + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + return exceptionDataTable(QueryErrorCode.ACCESS_DENIED, tableAuthorizationResult.getFailureMessage()); + } + + SystemTableProvider provider = _systemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + Collection<SystemTableProvider> availableProviders = _systemTableRegistry.getProviders(); + String availableTables = availableProviders.isEmpty() + ? 
"No system tables are available" + : "Available system tables: " + + availableProviders.stream().map(SystemTableProvider::getTableName) + .collect(java.util.stream.Collectors.joining(", ")); + return exceptionDataTable(QueryErrorCode.TABLE_DOES_NOT_EXIST, + "System table does not exist: " + tableName + ". " + availableTables); + } + + try { + return executeLocalSystemTableQuery(pinotQuery, provider); + } catch (BadQueryRequestException e) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return exceptionDataTable(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); + } catch (Exception e) { + LOGGER.warn("Caught exception while handling system table datatable query {}: {}", tableName, e.getMessage(), + e); + requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION); + return exceptionDataTable(QueryErrorCode.QUERY_EXECUTION, e.getMessage()); + } + } + } + + private BrokerResponse handleSystemTableQuery(JsonNode request, PinotQuery pinotQuery, String tableName, + RequestContext requestContext, @Nullable RequesterIdentity requesterIdentity, String query, + @Nullable HttpHeaders httpHeaders, boolean queryWasLogged) { + if (pinotQuery.isExplain()) { + return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; + } + SystemTableProvider provider = _systemTableRegistry.get(tableName); + if (provider == null) { + requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); + Collection<SystemTableProvider> availableProviders = _systemTableRegistry.getProviders(); + String availableTables = availableProviders.isEmpty() + ? "No system tables are available" + : "Available system tables: " + + availableProviders.stream().map(SystemTableProvider::getTableName) + .collect(java.util.stream.Collectors.joining(", ")); + return new BrokerResponseNative(QueryErrorCode.TABLE_DOES_NOT_EXIST, + "System table does not exist: " + tableName + ". 
" + availableTables); + } + try { + long deadlineMs = requestContext.getRequestArrivalTimeMillis() + _brokerTimeoutMs; + Map<ServerRoutingInstance, DataTable> dataTableMap; + if (provider.getExecutionMode() == SystemTableProvider.ExecutionMode.BROKER_SCATTER_GATHER) { + dataTableMap = + scatterGatherSystemTableDataTables(provider, pinotQuery, tableName, request, httpHeaders, deadlineMs); + } else { + dataTableMap = new HashMap<>(1); + // Use a synthetic routing instance for broker-local execution of system table queries. + dataTableMap.put(new ServerRoutingInstance(SYSTEM_TABLE_PSEUDO_HOST, SYSTEM_TABLE_PSEUDO_PORT, + TableType.OFFLINE), executeLocalSystemTableQuery(pinotQuery, provider)); + } + + BrokerResponseNative brokerResponse; + BrokerRequest brokerRequest = new BrokerRequest(); + QuerySource querySource = new QuerySource(); + querySource.setTableName(tableName); + brokerRequest.setQuerySource(querySource); + brokerRequest.setPinotQuery(pinotQuery); + brokerResponse = _brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, + _brokerTimeoutMs, _brokerMetrics); + brokerResponse.setTablesQueried(Set.of(TableNameBuilder.extractRawTableName(tableName))); + brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis()); + _queryLogger.logQueryCompleted( + new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, + QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, requesterIdentity, null), + queryWasLogged); + return brokerResponse; + } catch (BadQueryRequestException e) { + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); + } catch (Exception e) { + LOGGER.warn("Caught exception while handling system table query {}: {}", tableName, e.getMessage(), e); + 
requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION); + return new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage()); + } + } + + private DataTable executeLocalSystemTableQuery(PinotQuery pinotQuery, SystemTableProvider provider) + throws Exception { + IndexSegment dataSource = provider.getDataSource(); + try { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); + queryContext.setSchema(provider.getSchema()); + queryContext.setEndTimeMs(System.currentTimeMillis() + _brokerTimeoutMs); + + // Pass null for serverMetrics because system table queries run broker-local against an in-memory IndexSegment. + Plan plan = _planMaker.makeInstancePlan(List.of(new SegmentContext(dataSource)), queryContext, _executorService, + null); + InstanceResponseBlock instanceResponse = plan.execute(); + return instanceResponse.toDataTable(); + } finally { + dataSource.destroy(); + } + } + + private static final class BrokerTarget { + final ServerRoutingInstance _routingInstance; + final String _dataTableUrl; + + BrokerTarget(ServerRoutingInstance routingInstance, String dataTableUrl) { + _routingInstance = routingInstance; + _dataTableUrl = dataTableUrl; + } + } + + @VisibleForTesting + protected Map<ServerRoutingInstance, DataTable> scatterGatherSystemTableDataTables(SystemTableProvider provider, + PinotQuery pinotQuery, String tableName, JsonNode request, @Nullable HttpHeaders httpHeaders, long deadlineMs) { + if (_helixManager == null) { + throw new IllegalStateException( + "HelixManager is required for scatter-gather execution of system table: " + tableName); + } + + HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor(); + List<String> liveInstances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()); + if (liveInstances == null || liveInstances.isEmpty()) { + throw new IllegalStateException("No live instances found for scatter-gather execution of system table: " + + tableName); + } + + 
String localInstanceId = _brokerId; + List<BrokerTarget> remoteTargets = new ArrayList<>(); + @Nullable ServerRoutingInstance localRoutingInstance = null; + for (String instanceId : liveInstances) { + if (!InstanceTypeUtils.isBroker(instanceId)) { + continue; + } + InstanceConfig instanceConfig = dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(instanceId)); + if (instanceConfig == null) { + LOGGER.warn("Instance config not found for broker instance: {}. This instance will be skipped in scatter-gather" + + " execution for system table: {}", instanceId, tableName); + continue; + } + URI baseUri = URI.create(InstanceUtils.getInstanceBaseUri(instanceConfig)); + ServerRoutingInstance routingInstance = new ServerRoutingInstance(baseUri.getHost(), baseUri.getPort(), + TableType.OFFLINE); + if (instanceId.equals(localInstanceId)) { + localRoutingInstance = routingInstance; + } else { + remoteTargets.add(new BrokerTarget(routingInstance, baseUri.toString() + SYSTEM_TABLE_DATATABLE_API_PATH)); + } + } + + Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(remoteTargets.size() + 1); + ServerRoutingInstance routingInstance; + if (localRoutingInstance != null) { + routingInstance = localRoutingInstance; + } else { + routingInstance = new ServerRoutingInstance(SYSTEM_TABLE_PSEUDO_HOST, SYSTEM_TABLE_PSEUDO_PORT, + TableType.OFFLINE); + } + try { + dataTableMap.put(routingInstance, executeLocalSystemTableQuery(pinotQuery, provider)); + } catch (Exception e) { + LOGGER.error("Failed to execute system table query locally for query: {}", pinotQuery, e); + dataTableMap.put(routingInstance, + exceptionDataTable(QueryErrorCode.QUERY_EXECUTION, "Failed to execute system table query locally: " + + e.getMessage())); + } + + if (remoteTargets.isEmpty()) { + return dataTableMap; + } + + String requestBody = request.toString(); + @Nullable Map<String, String> requestHeaders = toSingleValueRequestHeaders(httpHeaders); + if (requestHeaders == null) { + 
+ requestHeaders = new HashMap<>(); + requestHeaders.putIfAbsent("Content-Type", MediaType.APPLICATION_JSON); + Map<String, String> requestHeadersFinal = requestHeaders; + List<Pair<BrokerTarget, Future<DataTable>>> futures = new ArrayList<>(remoteTargets.size()); + for (BrokerTarget target : remoteTargets) { + Future<DataTable> future = + _scatterGatherExecutorService.submit(() -> fetchDataTableFromBroker(target._dataTableUrl, requestBody, + requestHeadersFinal)); + futures.add(Pair.of(target, future)); + } + + long remainingMs = Math.max(1, deadlineMs - System.currentTimeMillis()); + for (Pair<BrokerTarget, Future<DataTable>> pair : futures) { + BrokerTarget target = pair.getLeft(); + try { + dataTableMap.put(target._routingInstance, pair.getRight().get(remainingMs, TimeUnit.MILLISECONDS)); Review Comment: In scatter-gather execution, `remainingMs` is computed once before waiting on all futures, then reused as the timeout for each `Future#get`. Because the waits are sequential, the total wall-clock time can exceed the overall `deadlineMs` by up to ((numBrokers − 1) × remainingMs): the first `get()` ends at roughly the deadline, and each subsequent `get()` can wait the full `remainingMs` again. Recompute the remaining time before each `get()` (and short-circuit once the deadline is reached) so the query respects the configured broker timeout end-to-end. 
```suggestion for (Pair<BrokerTarget, Future<DataTable>> pair : futures) { BrokerTarget target = pair.getLeft(); long remainingMs = deadlineMs - System.currentTimeMillis(); if (remainingMs <= 0) { pair.getRight().cancel(true); dataTableMap.put(target._routingInstance, exceptionDataTable(QueryErrorCode.BROKER_TIMEOUT, "Timed out waiting for system table response from " + target._dataTableUrl)); continue; } try { dataTableMap.put(target._routingInstance, pair.getRight().get(Math.max(1, remainingMs), TimeUnit.MILLISECONDS)); ``` ########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/datasource/InMemorySystemTableSegment.java: ########## @@ -0,0 +1,734 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.systemtable.datasource; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.IntFunction; +import javax.annotation.Nullable; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; +import org.apache.pinot.segment.spi.index.multicolumntext.MultiColumnTextMetadata; +import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.segment.spi.index.reader.BloomFilterReader; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.segment.spi.index.reader.H3IndexReader; +import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader; +import org.apache.pinot.segment.spi.index.reader.RangeIndexReader; +import 
org.apache.pinot.segment.spi.index.reader.TextIndexReader; +import org.apache.pinot.segment.spi.index.reader.VectorIndexReader; +import org.apache.pinot.segment.spi.index.startree.StarTreeV2; +import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata; +import org.apache.pinot.segment.spi.partition.PartitionFunction; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.joda.time.Duration; +import org.joda.time.Interval; + + +/** + * In-memory {@link IndexSegment} implementation intended for system table queries. + * <p> + * This segment is backed by per-column value functions (docId -> value) and exposes raw forward indexes (no + * dictionaries/inverted indexes) so that the standard v1 query engine can operate on it. + */ +public final class InMemorySystemTableSegment implements IndexSegment { + private final String _segmentName; + private final Schema _schema; + private final int _numDocs; + private final Map<String, IntFunction<Object>> _valueProvidersByColumn; + private final Map<String, DataSource> _dataSourcesByColumn; + private final SegmentMetadata _segmentMetadata; + + public InMemorySystemTableSegment(String segmentName, Schema schema, int numDocs, + Map<String, IntFunction<Object>> valueProvidersByColumn) { + _segmentName = segmentName; + _schema = schema; + _numDocs = numDocs; + _valueProvidersByColumn = new HashMap<>(valueProvidersByColumn); + _dataSourcesByColumn = new HashMap<>(schema.getColumnNames().size()); + for (String column : schema.getColumnNames()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(column); + if (fieldSpec == null) { + continue; + } + IntFunction<Object> provider = _valueProvidersByColumn.get(column); + if (provider == null) { + provider = docId -> defaultValue(fieldSpec.getDataType()); + _valueProvidersByColumn.put(column, provider); + } + _dataSourcesByColumn.put(column, new FunctionBasedDataSource(fieldSpec, 
numDocs, provider)); + } + _segmentMetadata = new InMemorySegmentMetadata(segmentName, schema, numDocs); + } + + @Override + public String getSegmentName() { + return _segmentName; + } + + @Override + public SegmentMetadata getSegmentMetadata() { + return _segmentMetadata; + } + + @Override + public Set<String> getColumnNames() { + return _schema.getColumnNames(); + } + + @Override + public Set<String> getPhysicalColumnNames() { + return _schema.getPhysicalColumnNames(); + } + + @Nullable + @Override + public DataSource getDataSourceNullable(String column) { + return _dataSourcesByColumn.get(column); + } + + @Override + public DataSource getDataSource(String column, Schema schema) { + DataSource dataSource = getDataSourceNullable(column); + if (dataSource != null) { + return dataSource; + } + throw new IllegalStateException("Failed to find data source for column: " + column); + } + + @Nullable + @Override + public List<StarTreeV2> getStarTrees() { + return null; + } + + @Nullable + @Override + public TextIndexReader getMultiColumnTextIndex() { + return null; + } + + @Nullable + @Override + public ThreadSafeMutableRoaringBitmap getValidDocIds() { + return null; + } + + @Nullable + @Override + public ThreadSafeMutableRoaringBitmap getQueryableDocIds() { + return null; + } + + @Override + public GenericRow getRecord(int docId, GenericRow reuse) { + GenericRow row = reuse != null ? 
reuse : new GenericRow(); + row.getFieldToValueMap().clear(); + for (String column : _schema.getColumnNames()) { + row.putValue(column, getValue(docId, column)); + } + return row; + } + + @Override + public Object getValue(int docId, String column) { + IntFunction<Object> provider = _valueProvidersByColumn.get(column); + if (provider == null) { + return null; + } + return provider.apply(docId); + } + + @Override + public void offload() { + } + + @Override + public void destroy() { + } + + private static Object defaultValue(FieldSpec.DataType dataType) { + switch (dataType) { + case INT: + return 0; + case LONG: + return 0L; + case FLOAT: + return 0.0f; + case DOUBLE: + return 0.0d; + case BIG_DECIMAL: + return BigDecimal.ZERO; + case BOOLEAN: + return false; + case STRING: + return ""; + case BYTES: + return new byte[0]; + default: + return null; + } + } + + private static final class InMemorySegmentMetadata implements SegmentMetadata { + private final String _segmentName; + private final Schema _schema; + private final int _totalDocs; + private final TreeMap<String, ColumnMetadata> _columnMetadataMap; + + private InMemorySegmentMetadata(String segmentName, Schema schema, int totalDocs) { + _segmentName = segmentName; + _schema = schema; + _totalDocs = totalDocs; + _columnMetadataMap = new TreeMap<>(); + for (String column : schema.getColumnNames()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(column); + if (fieldSpec != null) { + _columnMetadataMap.put(column, new InMemoryColumnMetadata(fieldSpec, totalDocs)); + } + } + } + + @Deprecated + @Override + public String getTableName() { + return _schema.getSchemaName(); + } + + @Override + public String getName() { + return _segmentName; + } + + @Override + public String getTimeColumn() { + return ""; + } + + @Override + public long getStartTime() { + return 0; + } + + @Override + public long getEndTime() { + return 0; + } + + @Override + public TimeUnit getTimeUnit() { + return TimeUnit.MILLISECONDS; + } + + 
@Override + public Duration getTimeGranularity() { + return Duration.ZERO; + } + + @Override + public Interval getTimeInterval() { + return new Interval(0L, 0L); + } + + @Override + public String getCrc() { + return ""; + } + + @Override + public String getDataCrc() { + return ""; + } + + @Override + public SegmentVersion getVersion() { + return SegmentVersion.v3; + } + + @Override + public Schema getSchema() { + return _schema; + } + + @Override + public int getTotalDocs() { + return _totalDocs; + } + + @Override + public File getIndexDir() { + return new File(""); + } + + @Nullable + @Override + public String getCreatorName() { + return "systemtable"; + } + + @Override + public long getIndexCreationTime() { + return 0; + } + + @Override + public long getLastIndexedTimestamp() { + return 0; + } + + @Override + public long getLatestIngestionTimestamp() { + return Long.MIN_VALUE; + } + + @Override + public long getMinimumIngestionLagMs() { + return Long.MAX_VALUE; + } + + @Nullable + @Override + public List<StarTreeV2Metadata> getStarTreeV2MetadataList() { + return null; + } + + @Nullable + @Override + public MultiColumnTextMetadata getMultiColumnTextMetadata() { + return null; + } + + @Override + public Map<String, String> getCustomMap() { + return Collections.emptyMap(); + } + + @Override + public String getStartOffset() { + return ""; + } + + @Override + public String getEndOffset() { + return ""; + } + + @Override + public TreeMap<String, ColumnMetadata> getColumnMetadataMap() { + return _columnMetadataMap; + } + + @Override + public void removeColumn(String column) { + _columnMetadataMap.remove(column); + } + + @Override + public JsonNode toJson(@Nullable Set<String> columnFilter) { + return JsonNodeFactory.instance.objectNode(); + } + } + + private static final class InMemoryColumnMetadata implements ColumnMetadata { + private final FieldSpec _fieldSpec; + private final int _totalDocs; + + private InMemoryColumnMetadata(FieldSpec fieldSpec, int totalDocs) { + 
_fieldSpec = fieldSpec; + _totalDocs = totalDocs; + } + + @Override + public FieldSpec getFieldSpec() { + return _fieldSpec; + } + + @Override + public int getTotalDocs() { + return _totalDocs; + } + + @Override + public int getCardinality() { + return Constants.UNKNOWN_CARDINALITY; + } + + @Override + public boolean isSorted() { + return false; + } + + @Override + public Comparable getMinValue() { + return defaultComparableValue(_fieldSpec.getDataType()); + } + + @Override + public Comparable getMaxValue() { + return defaultComparableValue(_fieldSpec.getDataType()); + } + + @Override + public boolean hasDictionary() { + return false; + } + + @Override + public int getColumnMaxLength() { + return 0; + } + + @Override + public int getBitsPerElement() { + return 0; + } + + @Override + public int getMaxNumberOfMultiValues() { + return 0; + } + + @Override + public int getTotalNumberOfEntries() { + return _totalDocs; + } + + @Nullable + @Override + public PartitionFunction getPartitionFunction() { + return null; + } + + @Nullable + @Override + public Set<Integer> getPartitions() { + return null; + } + + @Override + public boolean isAutoGenerated() { + return false; + } + + @Override + public Map<IndexType<?, ?, ?>, Long> getIndexSizeMap() { + return Collections.emptyMap(); + } + + private static Comparable defaultComparableValue(FieldSpec.DataType dataType) { + switch (dataType) { + case INT: + return 0; + case LONG: + return 0L; + case FLOAT: + return 0.0f; + case DOUBLE: + return 0.0d; + case BIG_DECIMAL: + return BigDecimal.ZERO; + case BOOLEAN: + return false; + case STRING: + return ""; + default: + return 0; + } + } + } + + private static final class FunctionBasedDataSource implements DataSource { + private final DataSourceMetadata _metadata; + private final ColumnIndexContainer _indexContainer; + private final ForwardIndexReader<?> _forwardIndex; + + private FunctionBasedDataSource(FieldSpec fieldSpec, int numDocs, IntFunction<Object> valueProvider) { + 
_metadata = new FunctionBasedDataSourceMetadata(fieldSpec, numDocs); + _indexContainer = ColumnIndexContainer.Empty.INSTANCE; + _forwardIndex = new FunctionBasedForwardIndexReader(fieldSpec.getDataType(), valueProvider); + } + + @Override + public DataSourceMetadata getDataSourceMetadata() { + return _metadata; + } + + @Override + public ColumnIndexContainer getIndexContainer() { + return _indexContainer; + } + + @Override + public <R extends IndexReader> R getIndex(IndexType<?, R, ?> type) { + if (type == null) { + return null; + } + if (StandardIndexes.FORWARD_ID.equals(type.getId())) { + return (R) _forwardIndex; + } + return null; + } + + @Override + public ForwardIndexReader<?> getForwardIndex() { + return _forwardIndex; + } + + @Nullable + @Override + public Dictionary getDictionary() { + return null; + } + + @Nullable + @Override + public InvertedIndexReader<?> getInvertedIndex() { + return null; + } + + @Nullable + @Override + public RangeIndexReader<?> getRangeIndex() { + return null; + } + + @Nullable + @Override + public TextIndexReader getTextIndex() { + return null; + } + + @Nullable + @Override + public TextIndexReader getFSTIndex() { + return null; + } + + @Nullable + @Override + public TextIndexReader getIFSTIndex() { + return null; + } + + @Nullable + @Override + public JsonIndexReader getJsonIndex() { + return null; + } + + @Nullable + @Override + public H3IndexReader getH3Index() { + return null; + } + + @Nullable + @Override + public BloomFilterReader getBloomFilter() { + return null; + } + + @Nullable + @Override + public NullValueVectorReader getNullValueVector() { + return null; + } + + @Nullable + @Override + public VectorIndexReader getVectorIndex() { + return null; + } + } + + private static final class FunctionBasedDataSourceMetadata implements DataSourceMetadata { + private final FieldSpec _fieldSpec; + private final int _numDocs; + + private FunctionBasedDataSourceMetadata(FieldSpec fieldSpec, int numDocs) { + _fieldSpec = fieldSpec; + 
_numDocs = numDocs; + } + + @Override + public FieldSpec getFieldSpec() { + return _fieldSpec; + } + + @Override + public boolean isSorted() { + return false; + } + + @Override + public int getNumDocs() { + return _numDocs; + } + + @Override + public int getNumValues() { + return _numDocs; + } + + @Override + public int getMaxNumValuesPerMVEntry() { + return -1; + } + + @Nullable + @Override + public Comparable getMinValue() { + return null; + } + + @Nullable + @Override + public Comparable getMaxValue() { + return null; + } + + @Nullable + @Override + public PartitionFunction getPartitionFunction() { + return null; + } + + @Nullable + @Override + public Set<Integer> getPartitions() { + return null; + } + + @Override + public int getCardinality() { + return -1; + } + } + + private static final class FunctionBasedForwardIndexReader implements ForwardIndexReader<ForwardIndexReaderContext> { + private final FieldSpec.DataType _storedType; + private final IntFunction<Object> _valueProvider; + + private FunctionBasedForwardIndexReader(FieldSpec.DataType storedType, IntFunction<Object> valueProvider) { + _storedType = storedType; + _valueProvider = valueProvider; + } + + private static <T> T coerceNumber(Object value, Function<Number, T> numberConverter, Function<String, T> parser) { + return value instanceof Number ? numberConverter.apply((Number) value) : parser.apply(String.valueOf(value)); + } Review Comment: `FunctionBasedForwardIndexReader.coerceNumber()` doesn’t handle null values from the value provider. If a provider returns null for a numeric column, this will try to parse the string "null" and throw a `NumberFormatException` during query execution. Consider treating null as the column default (or rejecting nulls earlier) so providers can safely omit numeric values. 
########## pinot-plugins/pinot-system-table/src/main/java/org/apache/pinot/systemtable/provider/InstancesSystemTableProvider.java: ########## @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.systemtable.provider; + +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.client.admin.PinotAdminClient; +import org.apache.pinot.client.admin.PinotAdminTransport; +import org.apache.pinot.common.systemtable.SystemTable; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.SystemTableProviderContext; +import org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import 
org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing Pinot instance metadata. + */ +@SystemTable +public final class InstancesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(InstancesSystemTableProvider.class); + public static final String TABLE_NAME = "system.instances"; + + private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = "pinot.systemtable.instances.controllerTimeoutMs"; + private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = Duration.ofSeconds(5).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("instanceId", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("host", FieldSpec.DataType.STRING) + .addSingleValueDimension("port", FieldSpec.DataType.INT) + .addSingleValueDimension("state", FieldSpec.DataType.STRING) + .addSingleValueDimension("tags", FieldSpec.DataType.STRING) + .build(); + + private @Nullable HelixAdmin _helixAdmin; + private @Nullable String _clusterName; + private List<String> _configuredControllerUrls = List.of(); + private long _controllerTimeoutMs = DEFAULT_CONTROLLER_TIMEOUT_MS; + private final Map<String, PinotAdminClient> _adminClientCache = new ConcurrentHashMap<>(); + + public InstancesSystemTableProvider() { + } + + @Override + public void init(SystemTableProviderContext context) { + _helixAdmin = context.getHelixAdmin(); + _clusterName = context.getClusterName(); + if (context.getConfig() != null) { + _controllerTimeoutMs = + context.getConfig().getProperty(CONTROLLER_TIMEOUT_MS_PROPERTY, DEFAULT_CONTROLLER_TIMEOUT_MS); + } 
+ } + + /** + * Package-private constructor for testing with overrides. + */ + InstancesSystemTableProvider(@Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable List<String> controllerUrls) { + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _configuredControllerUrls = controllerUrls != null ? new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public TableConfig getTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + } + + @Override + public void close() + throws Exception { + for (Map.Entry<String, PinotAdminClient> entry : _adminClientCache.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + LOGGER.debug("Failed to close admin client for {}: {}", entry.getKey(), e.toString()); + } + } + _adminClientCache.clear(); + } + + @Override + public IndexSegment getDataSource() { + List<String> controllerBaseUrls = getControllerBaseUrls(); + if (controllerBaseUrls.isEmpty()) { + return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, Collections.emptyMap()); + } + + List<String> instances = fetchInstances(controllerBaseUrls); + if (instances.isEmpty()) { + return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, Collections.emptyMap()); + } + + Set<String> liveInstances = fetchLiveInstances(controllerBaseUrls); + + List<InstanceRow> rows = new ArrayList<>(instances.size()); + for (String instanceId : instances) { + rows.add(fetchInstanceRow(instanceId, liveInstances, controllerBaseUrls)); + } Review Comment: `getDataSource()` calls `fetchInstanceRow()` for every instance, and `fetchInstanceRow()` hits the controller `getInstance()` endpoint. This creates an N+1 request pattern (one list call + one call per instance) on every system.instances query, which can be slow and load the controller on large clusters. 
Consider using Helix (via the provided `HelixAdmin`/cluster name) to read instance configs/live instances in bulk, or add caching/batching to avoid per-instance controller requests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
