liuchang0520 commented on a change in pull request #6778:
URL: https://github.com/apache/pinot/pull/6778#discussion_r698067226
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1228,4 +1301,146 @@ private int
getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
return (numPartitions + numInstancesPerReplicaGroup - 1) /
numInstancesPerReplicaGroup;
}
}
+
+ // Pre-fetch the LLC segments without deep store copy.
+ public void prefetchLLCSegmentsWithoutDeepStoreCopy(String
tableNameWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != TableType.REALTIME) {
+ return;
+ }
+
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}",
tableNameWithType);
+ return;
+ }
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ return;
+ }
+
+ long currentTimeMs = getCurrentTimeMs();
+ List<String> segmentNames =
ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, tableNameWithType);
+ for (String segmentName : segmentNames) {
+ try {
+ // Only fetch recently created LLC segment to alleviate ZK access.
Validate segment creation time from segment name.
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ if (currentTimeMs - llcSegmentName.getCreationTimeMs() >
_validationRangeForLLCSegmentsDeepStoreCopyMs) {
+ continue;
+ }
+
+ LLCRealtimeSegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, segmentName, new Stat());
+ // Cache the committed LLC segments without segment store download
url
+ if (segmentZKMetadata.getStatus() == Status.DONE &&
+
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
+ cacheLLCSegmentNameForUpload(tableNameWithType, segmentName);
Review comment:
Yes, it waits for the server response which has the download url, and
update the ZK metadata. This is based on the logic in
`ControllerPeriodicTask.processTables`. Then in the
`RealtimeSegmentValidationManager.processTable`, each task is performed sync
and sequential.
For deep store upload fix, do you think which one is better: **async** vs
**sync**? Most of the time, the missing deep store copy is rare.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]