This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9321eabe78 Add round-robin logic during downloadSegmentFromPeer (#12353)
9321eabe78 is described below
commit 9321eabe784f036102804c6090be5842cb164a69
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Wed Feb 14 02:53:55 2024 +0530
Add round-robin logic during downloadSegmentFromPeer (#12353)
---
.../pinot/common/utils/RoundRobinURIProvider.java | 58 +++++++++++++++-------
.../common/utils/fetcher/BaseSegmentFetcher.java | 6 +--
.../common/utils/fetcher/HttpSegmentFetcher.java | 4 +-
.../common/utils/RoundRobinURIProviderTest.java | 2 +-
.../minion/tasks/SegmentConversionUtils.java | 2 +-
5 files changed, 47 insertions(+), 25 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
index 39fe142ea2..e6ba13c764 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
@@ -23,44 +23,66 @@ import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import org.apache.http.client.utils.URIBuilder;
/**
- * RoundRobinURIProvider accept a URI, try to resolve it into multiple URIs with IP address, and return a IP address URI
- * in a Round Robin way.
+ * RoundRobinURIProvider accepts a list of URIs and a flag for whether to resolve them into IP-address URIs.
+ * If resolveHost = true, it returns an IP-address URI in a round-robin way.
+ * If resolveHost = false, it returns one of the original URIs in a round-robin way.
*/
public class RoundRobinURIProvider {
- private final URI[] _uris;
+ private final List<URI> _uris;
private int _index;
- public RoundRobinURIProvider(URI originalUri)
+ public RoundRobinURIProvider(List<URI> originalUris, boolean resolveHost)
throws UnknownHostException, URISyntaxException {
+ if (resolveHost) {
+ _uris = resolveHostsToIPAddresses(originalUris);
+ } else {
+ _uris = List.copyOf(originalUris);
+ }
+ _index = new Random().nextInt(_uris.size());
+ }
+
+ public int numAddresses() {
+ return _uris.size();
+ }
+
+ public URI next() {
+ URI result = _uris.get(_index);
+ _index = (_index + 1) % _uris.size();
+ return result;
+ }
+
+ private List<URI> resolveHostToIPAddresses(URI originalUri)
+ throws UnknownHostException, URISyntaxException {
+ List<URI> resolvedUris = new ArrayList<>();
String hostName = originalUri.getHost();
if (InetAddresses.isInetAddress(hostName)) {
- _uris = new URI[]{originalUri};
+ resolvedUris.add(originalUri);
} else {
// Resolve host name to IP addresses via DNS
InetAddress[] addresses = InetAddress.getAllByName(hostName);
- _uris = new URI[addresses.length];
URIBuilder uriBuilder = new URIBuilder(originalUri);
- for (int i = 0; i < addresses.length; i++) {
- String ip = addresses[i].getHostAddress();
- _uris[i] = uriBuilder.setHost(ip).build();
+ for (InetAddress address : addresses) {
+ String ip = address.getHostAddress();
+ resolvedUris.add(uriBuilder.setHost(ip).build());
}
}
- _index = new Random().nextInt(_uris.length);
- }
-
- public int numAddresses() {
- return _uris.length;
+ return resolvedUris;
}
- public URI next() {
- URI result = _uris[_index];
- _index = (_index + 1) % _uris.length;
- return result;
+ private List<URI> resolveHostsToIPAddresses(List<URI> originalUri)
+ throws UnknownHostException, URISyntaxException {
+ List<URI> resolvedUris = new ArrayList<>();
+ for (URI uri : originalUri) {
+ resolvedUris.addAll(resolveHostToIPAddresses(uri));
+ }
+ return resolvedUris;
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index 9deca98343..d33c7ead43 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -21,10 +21,10 @@ package org.apache.pinot.common.utils.fetcher;
import java.io.File;
import java.net.URI;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.utils.RoundRobinURIProvider;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -90,9 +90,9 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
if (uris == null || uris.isEmpty()) {
throw new IllegalArgumentException("The input uri list is null or
empty");
}
- Random r = new Random();
+ RoundRobinURIProvider roundRobinURIProvider = new
RoundRobinURIProvider(uris, false);
RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs,
_retryDelayScaleFactor).attempt(() -> {
- URI uri = uris.get(r.nextInt(uris.size()));
+ URI uri = roundRobinURIProvider.next();
try {
fetchSegmentToLocalWithoutRetry(uri, dest);
_logger.info("Fetched segment from: {} to: {} of size: {}", uri, dest,
dest.length());
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 2b986259b5..170327dc5b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -68,7 +68,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
throws Exception {
// Create a RoundRobinURIProvider to round robin IP addresses when retry
uploading. Otherwise may always try to
// download from a same broken host as: 1) DNS may not RR the IP addresses
2) OS cache the DNS resolution result.
- RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI);
+ RoundRobinURIProvider uriProvider = new
RoundRobinURIProvider(List.of(downloadURI), true);
int retryCount = getRetryCount(uriProvider);
@@ -124,7 +124,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
throws Exception {
// Create a RoundRobinURIProvider to round robin IP addresses when retry
uploading. Otherwise, may always try to
// download from a same broken host as: 1) DNS may not RR the IP addresses
2) OS cache the DNS resolution result.
- RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI);
+ RoundRobinURIProvider uriProvider = new
RoundRobinURIProvider(List.of(downloadURI), true);
int retryCount = getRetryCount(uriProvider);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
index 3ef82a8e21..05dfff8a6f 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
@@ -162,7 +162,7 @@ public class RoundRobinURIProviderTest {
for (TestCase testCase : testCases) {
String uri = testCase._originalUri;
- RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new
URI(uri));
+ RoundRobinURIProvider uriProvider = new
RoundRobinURIProvider(List.of(new URI(uri)), true);
int n = testCase._expectedUris.size();
int previousIndex = -1;
int currentIndex;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 7bd2e434c4..76824abe0d 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -117,7 +117,7 @@ public class SegmentConversionUtils {
throws Exception {
// Create a RoundRobinURIProvider to round-robin IP addresses when retry
uploading. Otherwise, it may always try to
// upload to a same broken host as: 1) DNS may not RR the IP addresses 2)
OS cache the DNS resolution result.
- RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new
URI(uploadURL));
+ RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(List.of(new
URI(uploadURL)), true);
// Generate retry policy based on the config
String maxNumAttemptsConfigStr =
configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
int maxNumAttemptsFromConfig =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]