tibrewalpratik17 commented on code in PR #12317:
URL: https://github.com/apache/pinot/pull/12317#discussion_r1479756813
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -658,11 +657,10 @@ private void downloadSegmentFromPeer(String segmentName, String downloadScheme,
try {
tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis());
File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
- // First find servers hosting the segment in a ONLINE state.
- List<URI> peerSegmentURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager);
- // Next download the segment from a randomly chosen server using configured scheme.
- SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(peerSegmentURIs, segmentTarFile);
- _logger.info("Fetched segment {} from: {} to: {} of size: {}", segmentName, peerSegmentURIs, segmentTarFile,
+ // Next download the segment from a randomly chosen server using configured download scheme (http or https).
+ SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(segmentName,
+ () -> PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager), segmentTarFile);
Review Comment:
We should use the boolean flag returned here and throw an exception if it is false.
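A minimal sketch of that check, assuming the new method keeps its `boolean` return. The `Fetcher` interface and the `downloadFromPeer` helper are hypothetical stand-ins for illustration, not the actual Pinot classes:

```java
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.function.Supplier;

public class PeerDownloadCheck {
  /** Hypothetical stand-in for the fetcher method added in this PR. */
  interface Fetcher {
    boolean fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest)
        throws Exception;
  }

  /** Fails loudly when the fetcher reports that no peer could serve the segment. */
  static void downloadFromPeer(Fetcher fetcher, String segmentName,
      Supplier<List<URI>> uriSupplier, File dest) throws Exception {
    boolean fetched = fetcher.fetchSegmentToLocal(segmentName, uriSupplier, dest);
    if (!fetched) {
      // Surface the failure instead of continuing with a missing tar file.
      throw new RuntimeException("Failed to download segment " + segmentName + " from peers");
    }
  }
}
```

With this shape the caller's existing failure handling still triggers when every retry is exhausted.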
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java:
##########
@@ -109,6 +111,46 @@ public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit,
throw new UnsupportedOperationException();
}
+ /**
+ * @param segmentName
+ * @param uriSupplier the supplier to the list of segment download uris.
+ * @param dest The destination to put the downloaded segment.
+ * @return true if and only if the segment fetch is successful. This method keeps retrying (with exponential backoff)
+ * of the following steps until segment download is successful or the retry limit is reached whichever comes first 1)
+ * Find servers hosting the segment in ONLINE state from the External View of the table. 2) Shuffle the list of
+ * servers. 3) Go through the list of server http download URIs to fetch the segment until success.
+ * @throws Exception
+ */
+ @Override
+ public boolean fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest)
+ throws Exception {
+ try {
+ int attempt =
+ RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> {
+ // First find servers hosting the segment in ONLINE state.
+ List<URI> peerSegmentURIs = uriSupplier.get();
+ // Shuffle the list of URIs.
+ Collections.shuffle(peerSegmentURIs);
+ // Next go through the list of URIs to fetch the segment until success.
+ for (URI uri : peerSegmentURIs) {
+ try {
+ fetchSegmentToLocalWithoutRetry(uri, dest);
+ return true;
+ } catch (Exception e) {
+ _logger.warn("Download segment {} from peer {} failed.", segmentName, uri, e);
+ }
+ }
+ // None of the URI works. Return false for retry.
+ return false;
+ });
+ _logger.info("Download segment {} successfully with {} attempts.", segmentName, attempt + 1);
+ return true;
+ } catch (Exception e) {
Review Comment:
Major: The `AttemptsExceededException` will be caught here and not rethrown.
Currently, `downloadSegmentFromPeer` throws a `RuntimeException` at the end if
the download fails. With this change the exception is silently swallowed and
the caller assumes the segment was downloaded.
PS: We are not using the returned boolean in `RealtimeTableDataManager` in any
way.
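To illustrate the concern, here is a simplified sketch. The `attempt` helper and `AttemptsExhausted` are hypothetical stand-ins for the retry policy and `AttemptsExceededException`, and backoff is omitted for brevity:

```java
import java.util.concurrent.Callable;

public class RetrySwallowSketch {
  /** Hypothetical stand-in for the exception thrown when retries run out. */
  static class AttemptsExhausted extends Exception {
    AttemptsExhausted(String message) {
      super(message);
    }
  }

  /** Runs op up to maxAttempts times; returns the zero-based index of the successful attempt. */
  static int attempt(int maxAttempts, Callable<Boolean> op) throws Exception {
    for (int i = 0; i < maxAttempts; i++) {
      if (Boolean.TRUE.equals(op.call())) {
        return i;
      }
    }
    throw new AttemptsExhausted("Operation failed after " + maxAttempts + " attempts");
  }

  /** Shape under review: the failure is reduced to a boolean the caller may ignore. */
  static boolean fetchSwallowing(int maxAttempts, Callable<Boolean> op) {
    try {
      attempt(maxAttempts, op);
      return true;
    } catch (Exception e) {
      return false;
    }
  }

  /** Alternative: let the exception propagate so the caller cannot miss the failure. */
  static void fetchOrThrow(int maxAttempts, Callable<Boolean> op) throws Exception {
    attempt(maxAttempts, op);
  }
}
```

If the caller discards the result of `fetchSwallowing`, a failed download looks identical to a successful one; `fetchOrThrow` keeps the previous fail-fast behavior.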
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java:
##########
@@ -109,6 +111,46 @@ public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit,
throw new UnsupportedOperationException();
}
+ /**
+ * @param segmentName
+ * @param uriSupplier the supplier to the list of segment download uris.
+ * @param dest The destination to put the downloaded segment.
+ * @return true if and only if the segment fetch is successful. This method keeps retrying (with exponential backoff)
+ * of the following steps until segment download is successful or the retry limit is reached whichever comes first 1)
+ * Find servers hosting the segment in ONLINE state from the External View of the table. 2) Shuffle the list of
+ * servers. 3) Go through the list of server http download URIs to fetch the segment until success.
+ * @throws Exception
+ */
+ @Override
+ public boolean fetchSegmentToLocal(String segmentName, Supplier<List<URI>> uriSupplier, File dest)
+ throws Exception {
Review Comment:
We can remove `throws Exception`, since the method catches all exceptions
internally and returns true / false instead.
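For reference, Java allows an overriding method to declare fewer checked exceptions than the method it overrides, so the clause can be dropped on the implementation even if the interface keeps it. A small sketch with hypothetical names (`Fetcher`, `QuietFetcher`):

```java
public class NarrowThrowsSketch {
  /** Hypothetical interface whose method declares a checked exception. */
  interface Fetcher {
    boolean fetch(String segmentName) throws Exception;
  }

  /** The override omits the throws clause because it handles failures internally. */
  static class QuietFetcher implements Fetcher {
    @Override
    public boolean fetch(String segmentName) {
      // Report failure through the return value instead of an exception.
      return segmentName != null && !segmentName.isEmpty();
    }
  }
}
```

Callers that hold the concrete type no longer need a try/catch; callers going through the interface type still do.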
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]