rajagopr commented on code in PR #13646:
URL: https://github.com/apache/pinot/pull/13646#discussion_r1755531409
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -110,6 +115,61 @@ public void completeSegmentOperations(String
tableNameWithType, SegmentMetadata
}
}
+ // Complete segment operations for a list of segments in batch mode
+ public void completeSegmentsOperations(String tableNameWithType,
FileUploadType uploadType,
+ boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders
headers,
+ List<SegmentUploadMetadata> segmentUploadMetadataList)
+ throws Exception {
+ boolean refreshOnly =
+
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
+ List<SegmentUploadMetadata> newSegmentsList = new ArrayList<>();
+ List<SegmentUploadMetadata> existingSegmentsList = new ArrayList<>();
+ for (SegmentUploadMetadata segmentUploadMetadata:
segmentUploadMetadataList) {
+ SegmentMetadata segmentMetadata =
segmentUploadMetadata.getSegmentMetadata();
+ String segmentName = segmentMetadata.getName();
+
+ ZNRecord existingSegmentMetadataZNRecord =
+
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
+ if (existingSegmentMetadataZNRecord != null &&
shouldProcessAsNewSegment(tableNameWithType, segmentName,
+ existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
+ LOGGER.warn("Removing segment ZK metadata (recovering from previous
upload failure) for table: {}, segment: {}",
+ tableNameWithType, segmentName);
+
Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType,
segmentName),
+ "Failed to remove segment ZK metadata for table: %s, segment: %s",
tableNameWithType, segmentName);
+ existingSegmentMetadataZNRecord = null;
+ }
+
+ if (existingSegmentMetadataZNRecord == null) {
+ // Add a new segment
+ if (refreshOnly) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Cannot refresh non-existing segment: %s for
table: %s", segmentName, tableNameWithType),
+ Response.Status.GONE);
+ }
+ LOGGER.info("Adding new segment: {} to table: {}", segmentName,
tableNameWithType);
+ newSegmentsList.add(segmentUploadMetadata);
+ } else {
+ // Refresh an existing segment
+ if (!allowRefresh) {
+ // We cannot perform this check up-front in UploadSegment API call.
If a segment doesn't exist during the
+ // check done up-front but ends up getting created before the check
here, we could incorrectly refresh an
+ // existing segment.
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Segment: %s already exists in table: %s. Refresh
not permitted.", segmentName,
+ tableNameWithType), Response.Status.CONFLICT);
+ }
+ LOGGER.info("Segment: {} already exists in table: {}, refreshing it",
segmentName, tableNameWithType);
+
segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord);
+ existingSegmentsList.add(segmentUploadMetadata);
+ }
+ }
+ // process new segments
+ processNewSegments(tableNameWithType, uploadType,
enableParallelPushProtection, headers, newSegmentsList);
+
+ // process existing segments
+ processExistingSegments(tableNameWithType, uploadType,
enableParallelPushProtection, headers, existingSegmentsList);
Review Comment:
Yes. If the batch call fails after processing only a subset of the segments,
it is safe to call completeSegmentsOperations again with the same set of
inputs: the segments will automatically be re-classified as new or existing.
If a segment already exists, it gets refreshed; if nothing has changed, the
segment's refresh timestamp is updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]