This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7a585c9ed90 Fix Helix message exception when EV doesn't exist (#16133)
7a585c9ed90 is described below
commit 7a585c9ed908b6e6c32e89db8b0049e3daf52667
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Jun 17 17:20:46 2025 -0600
Fix Helix message exception when EV doesn't exist (#16133)
---
.../helix/core/PinotHelixResourceManager.java | 226 ++++++---------------
.../realtime/PinotLLCRealtimeSegmentManager.java | 29 +--
.../helix/core/relocation/SegmentRelocator.java | 19 +-
.../helix/core/util/MessagingServiceUtils.java | 74 +++++++
.../core/relocation/SegmentRelocatorTest.java | 4 +-
5 files changed, 147 insertions(+), 205 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1d01e23525b..8b474d2e987 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -155,6 +155,7 @@ import
org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.controller.workload.QueryWorkloadManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -199,10 +200,8 @@ public class PinotHelixResourceManager {
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 5;
public static final String APPEND = "APPEND";
- private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500;
private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500;
private static final String API_REQUEST_ID_PREFIX = "api-";
- private static final int INFINITE_TIMEOUT = -1;
private enum LineageUpdateType {
START, END, REVERT
@@ -211,8 +210,6 @@ public class PinotHelixResourceManager {
// TODO: make this configurable
public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 *
60_000L; // 10 minutes
public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1
second
- public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20
minutes
- public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1
second
private static final DateTimeFormatter SIMPLE_DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);
@@ -2759,24 +2756,10 @@ public class PinotHelixResourceManager {
* Delete the table on servers by sending table deletion messages.
*/
private void deleteTableOnServers(String tableNameWithType) {
- // External view can be null for newly created table, skip sending messages
- if
(_helixDataAccessor.getProperty(_keyBuilder.externalView(tableNameWithType)) ==
null) {
- LOGGER.warn("No delete table message sent for newly created table: {}
without external view", tableNameWithType);
- return;
- }
-
LOGGER.info("Sending delete table messages for table: {}",
tableNameWithType);
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
- TableDeletionMessage tableDeletionMessage = new
TableDeletionMessage(tableNameWithType);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
-
- // Infinite timeout on the recipient
- int timeoutMs = -1;
- int numMessagesSent = messagingService.send(recipientCriteria,
tableDeletionMessage, null, timeoutMs);
+ TableDeletionMessage message = new TableDeletionMessage(tableNameWithType);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} delete table messages for table: {}",
numMessagesSent, tableNameWithType);
} else {
@@ -2821,27 +2804,21 @@ public class PinotHelixResourceManager {
Preconditions.checkArgument(tt == TableType.OFFLINE,
"Table: %s is not an OFFLINE table, which is required to force to
download segments", tableNameWithType);
}
- // Infinite timeout on the recipient
- int timeoutMs = -1;
+
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
for (Map.Entry<String, List<String>> entry :
instanceToSegmentsMap.entrySet()) {
String targetInstance = entry.getKey();
- List<String> segments = entry.getValue();
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(targetInstance);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
- SegmentReloadMessage segmentReloadMessage = new
SegmentReloadMessage(tableNameWithType, segments, forceDownload);
- ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, timeoutMs);
+ SegmentReloadMessage message = new
SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload);
+ int numMessagesSent =
+ MessagingServiceUtils.send(messagingService, message,
tableNameWithType, null, targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages to instance: {} for table: {}",
numMessagesSent, targetInstance,
tableNameWithType);
} else {
LOGGER.warn("No reload message sent to instance: {} for table: {}",
targetInstance, tableNameWithType);
}
- instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent,
segmentReloadMessage.getMsgId()));
+ instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent,
message.getMsgId()));
}
return instanceMsgInfoMap;
}
@@ -2858,24 +2835,17 @@ public class PinotHelixResourceManager {
"Table: %s is not an OFFLINE table, which is required to force to
download segments", tableNameWithType);
}
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(targetInstance == null ? "%" :
targetInstance);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
- SegmentReloadMessage segmentReloadMessage = new
SegmentReloadMessage(tableNameWithType, forceDownload);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
-
- // Infinite timeout on the recipient
- int timeoutMs = -1;
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, timeoutMs);
+ SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType,
forceDownload);
+ int numMessagesSent =
+ MessagingServiceUtils.send(messagingService, message,
tableNameWithType, null, targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages for table: {}", numMessagesSent,
tableNameWithType);
} else {
LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
}
- return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
+ return Pair.of(numMessagesSent, message.getMsgId());
}
public Pair<Integer, String> reloadSegment(String tableNameWithType, String
segmentName, boolean forceDownload,
@@ -2891,26 +2861,18 @@ public class PinotHelixResourceManager {
segmentName);
}
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(targetInstance == null ? "%" :
targetInstance);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setPartition(segmentName);
- recipientCriteria.setSessionSpecific(true);
- SegmentReloadMessage segmentReloadMessage =
- new SegmentReloadMessage(tableNameWithType,
Collections.singletonList(segmentName), forceDownload);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
-
- // Infinite timeout on the recipient
- int timeoutMs = -1;
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, timeoutMs);
+ SegmentReloadMessage message = new SegmentReloadMessage(tableNameWithType,
List.of(segmentName), forceDownload);
+ int numMessagesSent =
+ MessagingServiceUtils.send(messagingService, message,
tableNameWithType, segmentName, targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages for segment: {} in table: {}",
numMessagesSent, segmentName,
tableNameWithType);
} else {
LOGGER.warn("No reload message sent for segment: {} in table: {}",
segmentName, tableNameWithType);
}
- return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
+
+ return Pair.of(numMessagesSent, message.getMsgId());
}
/**
@@ -3023,8 +2985,8 @@ public class PinotHelixResourceManager {
*/
@VisibleForTesting
void resetPartitionAllState(String instanceName, String resourceName,
Set<String> resetPartitionNames) {
- LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster
{}.",
- resetPartitionNames == null ? "NULL" : resetPartitionNames,
resourceName, instanceName, _helixClusterName);
+ LOGGER.info("Resetting partitions: {} for resource: {} on instance: {}",
resetPartitionNames, resourceName,
+ instanceName);
HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -3060,7 +3022,7 @@ public class PinotHelixResourceManager {
+ message.getResourceName());
}
- String adminName = null;
+ String adminName;
try {
adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
} catch (UnknownHostException e) {
@@ -3112,21 +3074,12 @@ public class PinotHelixResourceManager {
*/
public void sendSegmentRefreshMessage(String tableNameWithType, String
segmentName, boolean refreshServerSegment,
boolean refreshBrokerRouting) {
- SegmentRefreshMessage segmentRefreshMessage = new
SegmentRefreshMessage(tableNameWithType, segmentName);
-
- // Send segment refresh message to servers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setSessionSpecific(true);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ SegmentRefreshMessage message = new
SegmentRefreshMessage(tableNameWithType, segmentName);
+ // Send segment refresh message to servers
if (refreshServerSegment) {
- // Send segment refresh message to servers
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setPartition(segmentName);
- // Send message with no callback and infinite timeout on the recipient
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentRefreshMessage, null, -1);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType, segmentName, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to
which messages were sent
LOGGER.info("Sent {} segment refresh messages to servers for segment:
{} of table: {}", numMessagesSent,
@@ -3137,11 +3090,11 @@ public class PinotHelixResourceManager {
}
}
+ // Send segment refresh message to brokers
if (refreshBrokerRouting) {
- // Send segment refresh message to brokers
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setPartition(tableNameWithType);
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentRefreshMessage, null, -1);
+ int numMessagesSent =
+ MessagingServiceUtils.send(messagingService, message,
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType,
+ null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to
which messages were sent
LOGGER.info("Sent {} segment refresh messages to brokers for segment:
{} of table: {}", numMessagesSent,
@@ -3155,18 +3108,10 @@ public class PinotHelixResourceManager {
/// Sends table config refresh message to brokers.
private void sendTableConfigRefreshMessage(String tableNameWithType) {
- TableConfigRefreshMessage tableConfigRefreshMessage = new
TableConfigRefreshMessage(tableNameWithType);
-
- // Send table config refresh message to brokers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setSessionSpecific(true);
- recipientCriteria.setPartition(tableNameWithType);
- // Send message with no callback and infinite timeout on the recipient
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ TableConfigRefreshMessage message = new
TableConfigRefreshMessage(tableNameWithType);
int numMessagesSent =
- _helixZkManager.getMessagingService().send(recipientCriteria,
tableConfigRefreshMessage, null, -1);
+ MessagingServiceUtils.send(messagingService, message,
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which
messages were sent
LOGGER.info("Sent {} table config refresh messages to brokers for table:
{}", numMessagesSent, tableNameWithType);
@@ -3177,43 +3122,21 @@ public class PinotHelixResourceManager {
/// Sends table config and schema refresh message to servers.
private void sendTableConfigSchemaRefreshMessage(String tableNameWithType) {
- TableConfigSchemaRefreshMessage refreshMessage = new
TableConfigSchemaRefreshMessage(tableNameWithType);
-
- // Send table config and schema refresh message to servers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
-
- // Send message with no callback and infinite timeout on the recipient
- try {
- int numMessagesSent =
_helixZkManager.getMessagingService().send(recipientCriteria, refreshMessage,
null, -1);
- if (numMessagesSent > 0) {
- LOGGER.info("Sent {} table config and schema refresh messages for
table: {}", numMessagesSent,
- tableNameWithType);
- } else {
- LOGGER.warn("No table config and schema refresh message sent for
table: {}", tableNameWithType);
- }
- } catch (Exception e) {
- LOGGER.warn("Caught exception while sending table config and schema
refresh message for table: {}",
- tableNameWithType, e);
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ TableConfigSchemaRefreshMessage message = new
TableConfigSchemaRefreshMessage(tableNameWithType);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} table config and schema refresh messages for table:
{}", numMessagesSent, tableNameWithType);
+ } else {
+ LOGGER.warn("No table config and schema refresh message sent for table:
{}", tableNameWithType);
}
}
private void sendLogicalTableConfigRefreshMessage(String logicalTableName) {
- LogicalTableConfigRefreshMessage refreshMessage = new
LogicalTableConfigRefreshMessage(logicalTableName);
-
- // Send logical table config refresh message to brokers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setSessionSpecific(true);
- recipientCriteria.setPartition(logicalTableName);
- // Send message with no callback and infinite timeout on the recipient
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ LogicalTableConfigRefreshMessage message = new
LogicalTableConfigRefreshMessage(logicalTableName);
int numMessagesSent =
- _helixZkManager.getMessagingService().send(recipientCriteria,
refreshMessage, null, -1);
+ MessagingServiceUtils.send(messagingService, message,
Helix.BROKER_RESOURCE_INSTANCE, logicalTableName, null);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} logical table config refresh messages to brokers
for table: {}", numMessagesSent,
logicalTableName);
@@ -3223,18 +3146,11 @@ public class PinotHelixResourceManager {
}
private void sendApplicationQpsQuotaRefreshMessage(String appName) {
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
ApplicationQpsQuotaRefreshMessage message = new
ApplicationQpsQuotaRefreshMessage(appName);
-
- // Send database config refresh message to brokers
- Criteria criteria = new Criteria();
- criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- criteria.setInstanceName("%");
- criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- criteria.setSessionSpecific(true);
-
- int numMessagesSent = _helixZkManager.getMessagingService().send(criteria,
message, null, INFINITE_TIMEOUT);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, Helix.BROKER_RESOURCE_INSTANCE);
if (numMessagesSent > 0) {
- LOGGER.info("Sent {} applcation qps quota refresh messages to brokers
for application: {}", numMessagesSent,
+ LOGGER.info("Sent {} application qps quota refresh messages to brokers
for application: {}", numMessagesSent,
appName);
} else {
LOGGER.warn("No application qps quota refresh message sent to brokers
for application: {}", appName);
@@ -3242,17 +3158,9 @@ public class PinotHelixResourceManager {
}
private void sendDatabaseConfigRefreshMessage(String databaseName) {
- DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new
DatabaseConfigRefreshMessage(databaseName);
-
- // Send database config refresh message to brokers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setSessionSpecific(true);
- // Send message with no callback and infinite timeout on the recipient
- int numMessagesSent =
- _helixZkManager.getMessagingService().send(recipientCriteria,
databaseConfigRefreshMessage, null, -1);
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ DatabaseConfigRefreshMessage message = new
DatabaseConfigRefreshMessage(databaseName);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, Helix.BROKER_RESOURCE_INSTANCE);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} database config refresh messages to brokers for
database: {}", numMessagesSent,
databaseName);
@@ -3262,18 +3170,10 @@ public class PinotHelixResourceManager {
}
private void sendRoutingTableRebuildMessage(String tableNameWithType) {
- RoutingTableRebuildMessage routingTableRebuildMessage = new
RoutingTableRebuildMessage(tableNameWithType);
-
- // Send routing table rebuild message to brokers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
- recipientCriteria.setSessionSpecific(true);
- recipientCriteria.setPartition(tableNameWithType);
- // Send message with no callback and infinite timeout on the recipient
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ RoutingTableRebuildMessage message = new
RoutingTableRebuildMessage(tableNameWithType);
int numMessagesSent =
- _helixZkManager.getMessagingService().send(recipientCriteria,
routingTableRebuildMessage, null, -1);
+ MessagingServiceUtils.send(messagingService, message,
Helix.BROKER_RESOURCE_INSTANCE, tableNameWithType, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which
messages were sent
LOGGER.info("Sent {} routing table rebuild messages to brokers for
table: {}", numMessagesSent,
@@ -4758,27 +4658,17 @@ public class PinotHelixResourceManager {
public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String
tableName, String periodicTaskName,
Map<String, String> taskProperties) {
String periodicTaskRequestId = API_REQUEST_ID_PREFIX +
UUID.randomUUID().toString().substring(0, 8);
-
LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all
controllers for running task {} against {},"
+ " with properties {}.\"", periodicTaskRequestId,
periodicTaskName,
tableName != null ? " table '" + tableName + "'" : "all tables",
taskProperties);
-
- // Create and send message to send to all controllers (including this one)
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setSessionSpecific(true);
-
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
- recipientCriteria.setSelfExcluded(false);
- RunPeriodicTaskMessage runPeriodicTaskMessage =
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ RunPeriodicTaskMessage message =
new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName,
tableName, taskProperties);
-
- ClusterMessagingService clusterMessagingService =
getHelixZkManager().getMessagingService();
- int messageCount = clusterMessagingService.send(recipientCriteria,
runPeriodicTaskMessage, null, -1);
-
+ int numMessagesSent =
+ MessagingServiceUtils.sendIncludingSelf(messagingService, message,
Helix.LEAD_CONTROLLER_RESOURCE_NAME);
LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to
{} controllers.", periodicTaskRequestId,
- messageCount);
- return new PeriodicTaskInvocationResponse(periodicTaskRequestId,
messageCount > 0);
+ numMessagesSent);
+ return new PeriodicTaskInvocationResponse(periodicTaskRequestId,
numMessagesSent > 0);
}
/**
@@ -4824,13 +4714,13 @@ public class PinotHelixResourceManager {
}
public void sendQueryWorkloadRefreshMessage(Map<String,
QueryWorkloadRefreshMessage> instanceToRefreshMessageMap) {
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
instanceToRefreshMessageMap.forEach((instance, message) -> {
Criteria criteria = new Criteria();
criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
criteria.setInstanceName(instance);
criteria.setSessionSpecific(true);
-
- int numMessagesSent =
_helixZkManager.getMessagingService().send(criteria, message, null, -1);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, criteria);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} query workload config refresh messages to
instance: {}", numMessagesSent, instance);
} else {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 012ef8d2d5b..f7bcd221d0a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -55,10 +55,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -99,6 +97,7 @@ import
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
import
org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
import
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -1410,16 +1409,10 @@ public class PinotLLCRealtimeSegmentManager {
newConsumingSegment, newInstances, realtimeTableName,
instancesNoLongerServe);
ClusterMessagingService messagingService =
_helixManager.getMessagingService();
+ IngestionMetricsRemoveMessage message = new
IngestionMetricsRemoveMessage();
List<String> instancesSent = new
ArrayList<>(instancesNoLongerServe.size());
for (String instance : instancesNoLongerServe) {
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setInstanceName(instance);
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setResource(realtimeTableName);
- recipientCriteria.setPartition(committedSegment);
- recipientCriteria.setSessionSpecific(true);
- IngestionMetricsRemoveMessage message = new
IngestionMetricsRemoveMessage();
- if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
+ if (MessagingServiceUtils.send(messagingService, message,
realtimeTableName, committedSegment, instance) > 0) {
instancesSent.add(instance);
} else {
LOGGER.warn("Failed to send ingestion metrics remove message for
table: {} segment: {} to instance: {}",
@@ -2358,20 +2351,14 @@ public class PinotLLCRealtimeSegmentManager {
private void sendForceCommitMessageToServers(String tableNameWithType,
Set<String> consumingSegments) {
if (!consumingSegments.isEmpty()) {
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setInstanceName("%");
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
+ LOGGER.info("Sending force commit messages for segments: {} of table:
{}", consumingSegments, tableNameWithType);
+ ClusterMessagingService messagingService =
_helixManager.getMessagingService();
ForceCommitMessage message = new ForceCommitMessage(tableNameWithType,
consumingSegments);
- int numMessagesSent =
_helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType);
if (numMessagesSent > 0) {
- LOGGER.info("Sent {} force commit messages for table: {} segments:
{}", numMessagesSent, tableNameWithType,
- consumingSegments);
+ LOGGER.info("Sent {} force commit messages for table: {}",
numMessagesSent, tableNameWithType);
} else {
- throw new RuntimeException(
- String.format("No force commit message was sent for table: %s
segments: %s", tableNameWithType,
- consumingSegments));
+ throw new IllegalStateException("No force commit message sent for
table: " + tableNameWithType);
}
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index fd10148ce3e..729b0c49c72 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -33,8 +34,6 @@ import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.Criteria;
-import org.apache.helix.InstanceType;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -47,6 +46,7 @@ import
org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -319,18 +319,11 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
Map<String, Set<String>> serverToSegmentsToMigrate,
ClusterMessagingService messagingService) {
for (Map.Entry<String, Set<String>> entry :
serverToSegmentsToMigrate.entrySet()) {
String serverName = entry.getKey();
- Set<String> segmentNames = entry.getValue();
- // One SegmentReloadMessage per server but takes all segment names.
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(serverName);
- recipientCriteria.setResource(tableNameWithType);
- recipientCriteria.setSessionSpecific(true);
- SegmentReloadMessage segmentReloadMessage =
- new SegmentReloadMessage(tableNameWithType, new
ArrayList<>(segmentNames), false);
+ List<String> segments = new ArrayList<>(entry.getValue());
LOGGER.info("Sending SegmentReloadMessage to server: {} to reload
segments: {} of table: {}", serverName,
- segmentNames, tableNameWithType);
- int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, -1);
+ segments, tableNameWithType);
+ SegmentReloadMessage message = new
SegmentReloadMessage(tableNameWithType, segments, false);
+ int numMessagesSent = MessagingServiceUtils.send(messagingService,
message, tableNameWithType, null, serverName);
if (numMessagesSent > 0) {
LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}",
serverName, tableNameWithType);
} else {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
new file mode 100644
index 00000000000..88bf6f43433
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import javax.annotation.Nullable;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.Message;
+
+
+public class MessagingServiceUtils {
+ private MessagingServiceUtils() {
+ }
+
+ /// Sends a message to the recipients specified by the criteria, returns the
number of messages being sent.
+ public static int send(ClusterMessagingService messagingService, Message
message, Criteria criteria) {
+ try {
+ return messagingService.send(criteria, message);
+ } catch (Exception e) {
+ // NOTE:
+ // It can throw exception when the target resource doesn't exist (e.g.
ExternalView has not been created yet). It
+ // is normal case, and we count it as no message being sent.
+ return 0;
+ }
+ }
+
+ public static int send(ClusterMessagingService messagingService, Message
message, String resource,
+ @Nullable String partition, @Nullable String instanceName, boolean
includingSelf) {
+ Criteria criteria = new Criteria();
+ criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ criteria.setSessionSpecific(true);
+ criteria.setResource(resource);
+ if (partition != null) {
+ criteria.setPartition(partition);
+ }
+ if (instanceName != null) {
+ criteria.setInstanceName(instanceName);
+ } else {
+ criteria.setInstanceName("%");
+ }
+ criteria.setSelfExcluded(!includingSelf);
+ return send(messagingService, message, criteria);
+ }
+
+ public static int send(ClusterMessagingService messagingService, Message
message, String resource,
+ @Nullable String partition, @Nullable String instanceName) {
+ return send(messagingService, message, resource, partition, instanceName,
false);
+ }
+
+ public static int send(ClusterMessagingService messagingService, Message
message, String resource) {
+ return send(messagingService, message, resource, null, null, false);
+ }
+
+ public static int sendIncludingSelf(ClusterMessagingService
messagingService, Message message, String resource) {
+ return send(messagingService, message, resource, null, null, true);
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
index e6bbcdd026a..6d058effdb6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
@@ -44,7 +44,6 @@ import org.apache.pinot.util.TestUtils;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -94,8 +93,7 @@ public class SegmentRelocatorTest {
ArgumentCaptor<Criteria> criteriaCapture =
ArgumentCaptor.forClass(Criteria.class);
ArgumentCaptor<SegmentReloadMessage> reloadMessageCapture =
ArgumentCaptor.forClass(SegmentReloadMessage.class);
- verify(messagingService, times(2)).send(criteriaCapture.capture(),
reloadMessageCapture.capture(), eq(null),
- eq(-1));
+ verify(messagingService, times(2)).send(criteriaCapture.capture(),
reloadMessageCapture.capture());
List<Criteria> criteriaList = criteriaCapture.getAllValues();
List<SegmentReloadMessage> msgList = reloadMessageCapture.getAllValues();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]