This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ec452a49f3 Refine PeerServerSegmentFinder (#12933)
ec452a49f3 is described below
commit ec452a49f3c885308613bc45dfa44b48a16076ba
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Apr 15 17:02:18 2024 -0700
Refine PeerServerSegmentFinder (#12933)
---
.../common/utils/fetcher/BaseSegmentFetcher.java | 9 +-
.../common/utils/fetcher/HttpSegmentFetcher.java | 28 ++--
.../pinot/core/util/PeerServerSegmentFinder.java | 101 ++++++--------
.../utils/fetcher/HttpSegmentFetcherTest.java | 152 +++++++--------------
.../realtime/PinotLLCRealtimeSegmentManager.java | 3 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 91 +++++-------
.../core/data/manager/BaseTableDataManager.java | 9 +-
.../manager/realtime/RealtimeTableDataManager.java | 28 ++--
.../data/manager/BaseTableDataManagerTest.java | 4 +-
.../core/util/PeerServerSegmentFinderTest.java | 128 ++++++++---------
.../utils/retry/ExponentialBackoffRetryPolicy.java | 6 +-
11 files changed, 220 insertions(+), 339 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index d33c7ead43..5fb82388f2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -42,13 +42,13 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY =
"retry.delay.scale.factor";
public static final int DEFAULT_RETRY_COUNT = 3;
public static final int DEFAULT_RETRY_WAIT_MS = 100;
- public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
+ public static final double DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
protected final Logger _logger =
LoggerFactory.getLogger(getClass().getSimpleName());
protected int _retryCount;
protected int _retryWaitMs;
- protected int _retryDelayScaleFactor;
+ protected double _retryDelayScaleFactor;
protected AuthProvider _authProvider;
@Override
@@ -58,9 +58,8 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
_retryDelayScaleFactor =
config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY,
DEFAULT_RETRY_DELAY_SCALE_FACTOR);
_authProvider = AuthProviderUtils.extractAuthProvider(config,
CommonConstants.KEY_OF_AUTH);
doInit(config);
- _logger
- .info("Initialized with retryCount: {}, retryWaitMs: {},
retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
- _retryDelayScaleFactor);
+ _logger.info("Initialized with retryCount: {}, retryWaitMs: {},
retryDelayScaleFactor: {}", _retryCount,
+ _retryWaitMs, _retryDelayScaleFactor);
}
/**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 170327dc5b..6872ac7714 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -44,23 +44,16 @@ import org.apache.pinot.spi.utils.retry.RetryPolicies;
public class HttpSegmentFetcher extends BaseSegmentFetcher {
protected FileUploadDownloadClient _httpClient;
- @Override
- protected void doInit(PinotConfiguration config) {
- _httpClient = new
FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build());
- }
-
- public HttpSegmentFetcher() {
- }
-
@VisibleForTesting
- protected HttpSegmentFetcher(FileUploadDownloadClient httpClient,
PinotConfiguration config) {
+ void setHttpClient(FileUploadDownloadClient httpClient) {
_httpClient = httpClient;
- _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY,
DEFAULT_RETRY_COUNT);
- _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY,
DEFAULT_RETRY_WAIT_MS);
- _retryDelayScaleFactor =
config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY,
DEFAULT_RETRY_DELAY_SCALE_FACTOR);
- _logger
- .info("Initialized with retryCount: {}, retryWaitMs: {},
retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
- _retryDelayScaleFactor);
+ }
+
+ @Override
+ protected void doInit(PinotConfiguration config) {
+ if (_httpClient == null) {
+ _httpClient = new
FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build());
+ }
}
@Override
@@ -87,9 +80,8 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" +
port));
}
int statusCode = _httpClient.downloadFile(uri, dest, _authProvider,
httpHeaders);
- _logger
- .info("Downloaded segment from: {} to: {} of size: {}; Response
status code: {}", uri, dest, dest.length(),
- statusCode);
+ _logger.info("Downloaded segment from: {} to: {} of size: {}; Response
status code: {}", uri, dest,
+ dest.length(), statusCode);
return true;
} catch (HttpErrorStatusException e) {
int statusCode = e.getStatusCode();
diff --git a/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
index e2c9d509f6..7f26d75935 100644
--- a/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
+++ b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
@@ -19,21 +19,19 @@
package org.apache.pinot.core.util;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.commons.collections.ListUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.StringUtil;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,93 +45,74 @@ public class PeerServerSegmentFinder {
private PeerServerSegmentFinder() {
}
- private static final Logger _logger =
LoggerFactory.getLogger(PeerServerSegmentFinder.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PeerServerSegmentFinder.class);
private static final int MAX_NUM_ATTEMPTS = 5;
private static final int INITIAL_DELAY_MS = 500;
private static final double DELAY_SCALE_FACTOR = 2;
/**
- *
- * @param segmentName
- * @param downloadScheme Can be either http or https.
- * @param helixManager
- * @return a list of uri strings of the form
http(s)://hostname:port/segments/tablenameWithType/segmentName
- * for the servers hosting ONLINE segments; empty list if no such server
found.
+ * Returns a list of URIs of the form
'http(s)://hostname:port/segments/tableNameWithType/segmentName' for the servers
+ * hosting ONLINE segments; empty list if no such server found. The download
scheme can be either 'http' or 'https'.
*/
- public static List<URI> getPeerServerURIs(String segmentName, String
downloadScheme, HelixManager helixManager) {
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- String tableNameWithType =
-
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName());
- return getPeerServerURIs(segmentName, downloadScheme, helixManager,
tableNameWithType);
- }
-
- public static List<URI> getPeerServerURIs(String segmentName, String
downloadScheme,
- HelixManager helixManager, String tableNameWithType) {
+ public static List<URI> getPeerServerURIs(HelixManager helixManager, String
tableNameWithType, String segmentName,
+ String downloadScheme) {
HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
String clusterName = helixManager.getClusterName();
- if (clusterName == null) {
- _logger.error("ClusterName not found");
- return ListUtils.EMPTY_LIST;
- }
- final List<URI> onlineServerURIs = new ArrayList<>();
+ List<URI> onlineServerURIs = new ArrayList<>();
try {
RetryPolicies.exponentialBackoffRetryPolicy(MAX_NUM_ATTEMPTS,
INITIAL_DELAY_MS, DELAY_SCALE_FACTOR)
.attempt(() -> {
- getOnlineServersFromExternalView(segmentName, downloadScheme,
tableNameWithType, helixAdmin, clusterName,
+ getOnlineServersFromExternalView(helixAdmin, clusterName,
tableNameWithType, segmentName, downloadScheme,
onlineServerURIs);
return !onlineServerURIs.isEmpty();
});
+ } catch (AttemptsExceededException e) {
+ LOGGER.error("Failed to find ONLINE servers for segment: {} in table: {}
after {} attempts", segmentName,
+ tableNameWithType, MAX_NUM_ATTEMPTS);
} catch (Exception e) {
- _logger.error("Failure in getting online servers for segment {}",
segmentName, e);
+ LOGGER.error("Caught exception while getting peer server URIs for
segment: {} in table: {}", segmentName,
+ tableNameWithType, e);
}
return onlineServerURIs;
}
- private static void getOnlineServersFromExternalView(String segmentName,
String downloadScheme,
- String tableNameWithType, HelixAdmin helixAdmin, String clusterName,
List<URI> onlineServerURIs) {
- ExternalView externalViewForResource =
- HelixHelper.getExternalViewForResource(helixAdmin, clusterName,
tableNameWithType);
- if (externalViewForResource == null) {
- _logger.warn("External View not found for table {}", tableNameWithType);
+ private static void getOnlineServersFromExternalView(HelixAdmin helixAdmin,
String clusterName,
+ String tableNameWithType, String segmentName, String downloadScheme,
List<URI> onlineServerURIs)
+ throws Exception {
+ ExternalView externalView =
helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
+ if (externalView == null) {
+ LOGGER.warn("Failed to find external view for table: {}",
tableNameWithType);
return;
}
// Find out the ONLINE servers serving the segment.
- Map<String, String> instanceToStateMap =
externalViewForResource.getStateMap(segmentName);
- for (Map.Entry<String, String> instanceState :
instanceToStateMap.entrySet()) {
- if ("ONLINE".equals(instanceState.getValue())) {
+ Map<String, String> instanceStateMap =
externalView.getStateMap(segmentName);
+ if (instanceStateMap == null) {
+ LOGGER.warn("Failed to find segment: {} in table: {}", segmentName,
tableNameWithType);
+ return;
+ }
+ for (Map.Entry<String, String> instanceState :
instanceStateMap.entrySet()) {
+ if (SegmentStateModel.ONLINE.equals(instanceState.getValue())) {
String instanceId = instanceState.getKey();
- _logger.info("Found ONLINE server {} for segment {}.", instanceId,
segmentName);
+ LOGGER.info("Found ONLINE server: {} for segment: {} in table: {}",
instanceId, segmentName, tableNameWithType);
InstanceConfig instanceConfig =
helixAdmin.getInstanceConfig(clusterName, instanceId);
String hostName = instanceConfig.getHostName();
- int port = getServerAdminPort(helixAdmin, clusterName, instanceId,
downloadScheme);
- try {
- onlineServerURIs.add(new URI(StringUtil
- .join("/", downloadScheme + "://" + hostName + ":" + port,
"segments", tableNameWithType, segmentName)));
- } catch (URISyntaxException e) {
- _logger.warn("Error in uri syntax: ", e);
- }
+ String adminPortKey = getAdminPortKey(downloadScheme);
+ int port = instanceConfig.getRecord().getIntField(adminPortKey,
Server.DEFAULT_ADMIN_API_PORT);
+ onlineServerURIs.add(new URI(
+ StringUtil.join("/", downloadScheme + "://" + hostName + ":" +
port, "segments", tableNameWithType,
+ segmentName)));
}
}
}
- private static int getServerAdminPort(HelixAdmin helixAdmin, String
clusterName, String instanceId,
- String downloadScheme) {
- try {
- return Integer.parseInt(HelixHelper.getInstanceConfigsMapFor(instanceId,
clusterName, helixAdmin)
- .get(getServerAdminPortKey(downloadScheme)));
- } catch (Exception e) {
- _logger.warn("Failed to retrieve ADMIN PORT for instanceId {} in the
cluster {} ", instanceId, clusterName, e);
- return CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
- }
- }
-
- private static String getServerAdminPortKey(String downloadScheme) {
+ private static String getAdminPortKey(String downloadScheme) {
switch (downloadScheme) {
- case CommonConstants.HTTPS_PROTOCOL:
- return CommonConstants.Helix.Instance.ADMIN_HTTPS_PORT_KEY;
case CommonConstants.HTTP_PROTOCOL:
+ return Instance.ADMIN_PORT_KEY;
+ case CommonConstants.HTTPS_PROTOCOL:
+ return Instance.ADMIN_HTTPS_PORT_KEY;
default:
- return CommonConstants.Helix.Instance.ADMIN_PORT_KEY;
+ throw new IllegalArgumentException("Unsupported download scheme: " +
downloadScheme);
}
}
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
index 3159168dab..1a567901b9 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
@@ -19,153 +19,97 @@
package org.apache.pinot.common.utils.fetcher;
import java.io.File;
-import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.List;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.exception.HttpErrorStatusException;
+import java.util.function.Supplier;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
-import org.mockito.MockedStatic;
-import org.testng.Assert;
-import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
public class HttpSegmentFetcherTest {
- private MockedStatic<PeerServerSegmentFinder> _peerServerSegmentFinder =
mockStatic(PeerServerSegmentFinder.class);
+ private static final String SEGMENT_NAME = "testSegment";
+ private static final File SEGMENT_FILE = new
File(FileUtils.getTempDirectory(), SEGMENT_NAME);
+
private PinotConfiguration _fetcherConfig;
- @BeforeSuite
- public void initTest() {
+ @BeforeClass
+ public void setUp() {
_fetcherConfig = new PinotConfiguration();
_fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 3);
+ _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY,
10);
+
_fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY,
1.1);
+ }
+
+ private HttpSegmentFetcher getSegmentFetcher(FileUploadDownloadClient
client) {
+ HttpSegmentFetcher segmentFetcher = new HttpSegmentFetcher();
+ segmentFetcher.setHttpClient(client);
+ segmentFetcher.init(_fetcherConfig);
+ return segmentFetcher;
}
@Test
public void testFetchSegmentToLocalSucceedAtFirstAttempt()
- throws URISyntaxException, IOException, HttpErrorStatusException {
+ throws Exception {
FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
when(client.downloadFile(any(), any(), any())).thenReturn(200);
- HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client,
_fetcherConfig);
- HelixManager helixManager = mock(HelixManager.class);
-
- List<URI> uris = new ArrayList<>();
- uris.add(new URI("http://h1:8080"));
- uris.add(new URI("http://h2:8080"));
- _peerServerSegmentFinder.when(() ->
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
- .thenReturn(uris);
- try {
- httpSegmentFetcher.fetchSegmentToLocal("seg",
- () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http",
helixManager), new File("/file"));
- } catch (Exception e) {
- // If we reach here, the download fails.
- Assert.assertTrue(false, "Download segment failed");
- Assert.assertTrue(e instanceof AttemptsExceededException);
- }
- _peerServerSegmentFinder.reset();
+ HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+ List<URI> uris = List.of(new URI("http://h1:8080"), new
URI("http://h2:8080"));
+ segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE);
}
- @Test
+ @Test(expectedExceptions = AttemptsExceededException.class)
public void testFetchSegmentToLocalAllDownloadAttemptsFailed()
- throws URISyntaxException, IOException, HttpErrorStatusException {
+ throws Exception {
FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
- // All three attempts fails.
- when(client.downloadFile(any(), any(),
any())).thenReturn(300).thenReturn(300).thenReturn(300);
- HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client,
_fetcherConfig);
- HelixManager helixManager = mock(HelixManager.class);
- List<URI> uris = new ArrayList<>();
- uris.add(new URI("http://h1:8080"));
- uris.add(new URI("http://h2:8080"));
-
- _peerServerSegmentFinder.when(() ->
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
- .thenReturn(uris);
- try {
- httpSegmentFetcher.fetchSegmentToLocal("seg",
- () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http",
helixManager), new File("/file"));
- // The test should not reach here because the fetch will throw exception.
- Assert.assertTrue(false, "Download segment failed");
- } catch (Exception e) {
- // If we reach here, the download fails.
- Assert.assertTrue(true, "Download segment failed");
- }
+ // All attempts failed
+ when(client.downloadFile(any(), any(), any())).thenReturn(300);
+ HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+ List<URI> uris = List.of(new URI("http://h1:8080"), new
URI("http://h2:8080"));
+ segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE);
}
@Test
public void testFetchSegmentToLocalSuccessAfterRetry()
- throws URISyntaxException, IOException, HttpErrorStatusException {
+ throws Exception {
FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
- // the first two attempts failed until the last attempt succeeds
+ // The first two attempts failed and the last attempt succeeded
when(client.downloadFile(any(), any(),
any())).thenReturn(300).thenReturn(300).thenReturn(200);
- HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client,
_fetcherConfig);
- HelixManager helixManager = mock(HelixManager.class);
- List<URI> uris = new ArrayList<>();
- uris.add(new URI("http://h1:8080"));
- uris.add(new URI("http://h2:8080"));
-
- _peerServerSegmentFinder.when(() ->
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
- .thenReturn(uris);
- try {
- httpSegmentFetcher.fetchSegmentToLocal("seg",
- () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http",
helixManager), new File("/file"));
- } catch (Exception e) {
- // If we reach here, the download fails.
- Assert.assertTrue(false, "Download segment failed");
- }
+ HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+ List<URI> uris = List.of(new URI("http://h1:8080"), new
URI("http://h2:8080"));
+ segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE);
}
@Test
public void
testFetchSegmentToLocalSuccessAfterFirstTwoAttemptsFoundNoPeerServers()
- throws URISyntaxException, IOException, HttpErrorStatusException {
+ throws Exception {
FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
- // The download always succeeds.
+ // The download always succeeds
when(client.downloadFile(any(), any(), any())).thenReturn(200);
- HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client,
_fetcherConfig);
- HelixManager helixManager = mock(HelixManager.class);
- List<URI> uris = new ArrayList<>();
- uris.add(new URI("http://h1:8080"));
- uris.add(new URI("http://h2:8080"));
-
- // The first two attempts find NO peers hosting the segment but the last
one found two servers.
- _peerServerSegmentFinder.when(() ->
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
- .thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris);
- try {
- httpSegmentFetcher.fetchSegmentToLocal("seg",
- () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http",
helixManager), new File("/file"));
- } catch (Exception e) {
- // If we reach here, the download fails.
- Assert.assertTrue(false, "Download segment failed");
- }
+ HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+ List<URI> uris = List.of(new URI("http://h1:8080"), new
URI("http://h2:8080"));
+ // The first two attempts found NO peers hosting the segment, and the last
one found two servers
+ //noinspection unchecked
+ Supplier<List<URI>> uriSupplier = mock(Supplier.class);
+
when(uriSupplier.get()).thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris);
+ segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, uriSupplier,
SEGMENT_FILE);
}
- @Test
+ @Test(expectedExceptions = AttemptsExceededException.class)
public void testFetchSegmentToLocalFailureWithNoPeerServers()
- throws IOException, HttpErrorStatusException {
+ throws Exception {
FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
- // the download always succeeds.
+ // The download always succeeds
when(client.downloadFile(any(), any(), any())).thenReturn(200);
- HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client,
_fetcherConfig);
- HelixManager helixManager = mock(HelixManager.class);
-
- _peerServerSegmentFinder.when(() ->
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
- .thenReturn(List.of()).thenReturn(List.of()).thenReturn(List.of());
- try {
- httpSegmentFetcher.fetchSegmentToLocal("seg",
- () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http",
helixManager), new File("/file"));
- // The test should not reach here because the fetch will throw exception.
- Assert.assertTrue(false, "Download segment failed");
- } catch (Exception e) {
- Assert.assertTrue(true, "Download segment failed");
- Assert.assertTrue(e instanceof AttemptsExceededException);
- }
+ HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+ List<URI> uris = List.of();
+ segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 25e40084ab..838a03a268 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1483,7 +1483,8 @@ public class PinotLLCRealtimeSegmentManager {
LOGGER.info("Fixing LLC segment {} whose deep store copy is
unavailable", segmentName);
// Find servers which have online replica
List<URI> peerSegmentURIs =
- PeerServerSegmentFinder.getPeerServerURIs(segmentName,
CommonConstants.HTTP_PROTOCOL, _helixManager);
+ PeerServerSegmentFinder.getPeerServerURIs(_helixManager,
realtimeTableName, segmentName,
+ CommonConstants.HTTP_PROTOCOL);
if (peerSegmentURIs.isEmpty()) {
throw new IllegalStateException(
String.format("Failed to upload segment %s to deep store
because no online replica is found",
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 60b83ba24a..f0496a8ee7 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -43,7 +43,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -75,10 +74,10 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
@@ -91,8 +90,6 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
import static
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
@@ -101,6 +98,7 @@ import static org.testng.Assert.*;
public class PinotLLCRealtimeSegmentManagerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"PinotLLCRealtimeSegmentManagerTest");
private static final String SCHEME = "file:";
+ private static final String CLUSTER_NAME = "testCluster";
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
@@ -927,13 +925,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
(ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
- when(helixManager.getClusterName()).thenReturn("cluster_name");
+ when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);
// init fake PinotLLCRealtimeSegmentManager
ControllerConf controllerConfig = new ControllerConf();
- controllerConfig.setProperty(
-
ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
true);
+
controllerConfig.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
+ true);
controllerConfig.setDataDir(TEMP_DIR.toString());
FakePinotLLCRealtimeSegmentManager segmentManager =
new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager,
controllerConfig);
@@ -946,19 +944,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentsValidationAndRetentionConfig.setRetentionTimeUnit(TimeUnit.DAYS.toString());
segmentsValidationAndRetentionConfig.setRetentionTimeValue("3");
segmentManager._tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
- List<SegmentZKMetadata> segmentsZKMetadata =
- new ArrayList<>(segmentManager._segmentZKMetadataMap.values());
+ List<SegmentZKMetadata> segmentsZKMetadata = new
ArrayList<>(segmentManager._segmentZKMetadataMap.values());
Assert.assertEquals(segmentsZKMetadata.size(), 5);
// Set up external view for this table
ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
- when(helixAdmin.getResourceExternalView("cluster_name",
REALTIME_TABLE_NAME))
- .thenReturn(externalView);
- when(helixAdmin.getConfigKeys(any(HelixConfigScope.class))).thenReturn(new
ArrayList<>());
- String adminPort = "2077";
- Map<String, String> instanceConfigMap = new HashMap<>();
- instanceConfigMap.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY,
adminPort);
- when(helixAdmin.getConfig(any(HelixConfigScope.class),
any(List.class))).thenReturn(instanceConfigMap);
+ when(helixAdmin.getResourceExternalView(CLUSTER_NAME,
REALTIME_TABLE_NAME)).thenReturn(externalView);
// Change 1st segment status to be DONE, but with default peer download
url.
// Verify later the download url is fixed after upload success.
@@ -966,28 +957,26 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
// set up the external view for 1st segment
String instance0 = "instance0";
+ int adminPort = 2077;
externalView.setState(segmentsZKMetadata.get(0).getSegmentName(),
instance0, "ONLINE");
InstanceConfig instanceConfig0 = new InstanceConfig(instance0);
instanceConfig0.setHostName(instance0);
- when(helixAdmin.getInstanceConfig(any(String.class),
eq(instance0))).thenReturn(instanceConfig0);
+ instanceConfig0.getRecord().setIntField(Instance.ADMIN_PORT_KEY,
adminPort);
+ when(helixAdmin.getInstanceConfig(CLUSTER_NAME,
instance0)).thenReturn(instanceConfig0);
// mock the request/response for 1st segment upload
- String serverUploadRequestUrl0 = StringUtil
- .join("/",
- CommonConstants.HTTP_PROTOCOL + "://" + instance0 + ":" +
adminPort,
- "segments",
- REALTIME_TABLE_NAME,
- segmentsZKMetadata.get(0).getSegmentName(),
- "upload") + "?uploadTimeoutMs=-1";
+ String serverUploadRequestUrl0 =
+ String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1",
instance0, adminPort,
+ REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName());
// tempSegmentFileLocation is the location where the segment uploader will
upload the segment. This usually ends
// with a random UUID
File tempSegmentFileLocation = new File(TEMP_DIR,
segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID());
FileUtils.write(tempSegmentFileLocation, "test");
// After the deep-store retry task gets the segment location returned by
Pinot server, it will move the segment to
// its final location. This is the expected segment location.
- String expectedSegmentLocation =
segmentManager.createSegmentPath(RAW_TABLE_NAME,
- segmentsZKMetadata.get(0).getSegmentName()).toString();
- when(segmentManager._mockedFileUploadDownloadClient
-
.uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(tempSegmentFileLocation.getPath());
+ String expectedSegmentLocation =
+ segmentManager.createSegmentPath(RAW_TABLE_NAME,
segmentsZKMetadata.get(0).getSegmentName()).toString();
+
when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(
+ tempSegmentFileLocation.getPath());
// Change 2nd segment status to be DONE, but with default peer download
url.
// Verify later the download url isn't fixed after upload failure.
@@ -998,25 +987,20 @@ public class PinotLLCRealtimeSegmentManagerTest {
externalView.setState(segmentsZKMetadata.get(1).getSegmentName(),
instance1, "ONLINE");
InstanceConfig instanceConfig1 = new InstanceConfig(instance1);
instanceConfig1.setHostName(instance1);
- when(helixAdmin.getInstanceConfig(any(String.class),
eq(instance1))).thenReturn(instanceConfig1);
+ instanceConfig1.getRecord().setIntField(Instance.ADMIN_PORT_KEY,
adminPort);
+ when(helixAdmin.getInstanceConfig(CLUSTER_NAME,
instance1)).thenReturn(instanceConfig1);
// mock the request/response for 2nd segment upload
- String serverUploadRequestUrl1 = StringUtil
- .join("/",
- CommonConstants.HTTP_PROTOCOL + "://" + instance1 + ":" +
adminPort,
- "segments",
- REALTIME_TABLE_NAME,
- segmentsZKMetadata.get(1).getSegmentName(),
- "upload") + "?uploadTimeoutMs=-1";
- when(segmentManager._mockedFileUploadDownloadClient
- .uploadToSegmentStore(serverUploadRequestUrl1))
- .thenThrow(new HttpErrorStatusException(
- "failed to upload segment",
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()));
+ String serverUploadRequestUrl1 =
+ String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1",
instance1, adminPort,
+ REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName());
+
when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl1)).thenThrow(
+ new HttpErrorStatusException("failed to upload segment",
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()));
// Change 3rd segment status to be DONE, but with default peer download
url.
// Verify later the download url isn't fixed because no ONLINE replica
found in any server.
segmentsZKMetadata.get(2).setStatus(Status.DONE);
- segmentsZKMetadata.get(2).setDownloadUrl(
- METADATA_URI_FOR_PEER_DOWNLOAD);
+ segmentsZKMetadata.get(2).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
// set up the external view for 3rd segment
String instance2 = "instance2";
externalView.setState(segmentsZKMetadata.get(2).getSegmentName(),
instance2, "OFFLINE");
@@ -1029,11 +1013,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Keep 5th segment status as IN_PROGRESS.
- List<String> segmentNames = segmentsZKMetadata.stream()
- .map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
- when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME))
- .thenReturn(segmentManager._tableConfig);
-
+ List<String> segmentNames =
+
segmentsZKMetadata.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
+
when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(segmentManager._tableConfig);
// Verify the result
segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig,
segmentsZKMetadata);
@@ -1042,23 +1024,18 @@ public class PinotLLCRealtimeSegmentManagerTest {
TestUtils.waitForCondition(aVoid ->
segmentManager.deepStoreUploadExecutorPendingSegmentsIsEmpty(), 30_000L,
"Timed out waiting for upload retry tasks to finish");
- assertEquals(
- segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(0), null).getDownloadUrl(),
+ assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(0), null).getDownloadUrl(),
expectedSegmentLocation);
assertFalse(tempSegmentFileLocation.exists(),
"Deep-store retry task should move the file from temp location to
permanent location");
- assertEquals(
- segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(1), null).getDownloadUrl(),
+ assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(1), null).getDownloadUrl(),
METADATA_URI_FOR_PEER_DOWNLOAD);
- assertEquals(
- segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(2), null).getDownloadUrl(),
+ assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(2), null).getDownloadUrl(),
METADATA_URI_FOR_PEER_DOWNLOAD);
- assertEquals(
- segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(3), null).getDownloadUrl(),
+ assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(3), null).getDownloadUrl(),
defaultDownloadUrl);
- assertNull(
- segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(4), null).getDownloadUrl());
+ assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
segmentNames.get(4), null).getDownloadUrl());
}
@Test
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index c46a85690d..1237db547a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -153,6 +153,13 @@ public abstract class BaseTableDataManager implements
TableDataManager {
if (_peerDownloadScheme == null) {
_peerDownloadScheme =
instanceDataManagerConfig.getSegmentPeerDownloadScheme();
}
+ if (_peerDownloadScheme != null) {
+ _peerDownloadScheme = _peerDownloadScheme.toLowerCase();
+ Preconditions.checkState(
+ CommonConstants.HTTP_PROTOCOL.equals(_peerDownloadScheme) ||
CommonConstants.HTTPS_PROTOCOL.equals(
+ _peerDownloadScheme), "Unsupported peer download scheme: %s for
table: %s", _peerDownloadScheme,
+ _tableNameWithType);
+ }
_streamSegmentDownloadUntarRateLimitBytesPerSec =
instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit();
@@ -691,7 +698,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
throws Exception {
Preconditions.checkState(_peerDownloadScheme != null, "Download peers
require non null peer download scheme");
List<URI> peerSegmentURIs =
- PeerServerSegmentFinder.getPeerServerURIs(segmentName,
_peerDownloadScheme, _helixManager, _tableNameWithType);
+ PeerServerSegmentFinder.getPeerServerURIs(_helixManager,
_tableNameWithType, segmentName, _peerDownloadScheme);
if (peerSegmentURIs.isEmpty()) {
String msg = String.format("segment %s doesn't have any peers",
segmentName);
LOGGER.warn(msg);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 8e50049028..b120867d6b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -635,17 +635,15 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
} catch (Exception e) {
_logger.warn("Download segment {} from deepstore uri {} failed.",
segmentName, uri, e);
// Download from deep store failed; try to download from peer if peer
download is setup for the table.
- if (isPeerSegmentDownloadEnabled(tableConfig)) {
- downloadSegmentFromPeer(segmentName,
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(),
- indexLoadingConfig);
+ if (_peerDownloadScheme != null) {
+ downloadSegmentFromPeer(segmentName, indexLoadingConfig);
} else {
throw e;
}
}
} else {
- if (isPeerSegmentDownloadEnabled(tableConfig)) {
- downloadSegmentFromPeer(segmentName,
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(),
- indexLoadingConfig);
+ if (_peerDownloadScheme != null) {
+ downloadSegmentFromPeer(segmentName, indexLoadingConfig);
} else {
throw new RuntimeException("Peer segment download not enabled for
segment " + segmentName);
}
@@ -687,23 +685,16 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
replaceLLSegment(segmentName, indexLoadingConfig);
}
- private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
- return
-
CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
- || CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(
- tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
- }
-
- private void downloadSegmentFromPeer(String segmentName, String
downloadScheme,
- IndexLoadingConfig indexLoadingConfig) {
+ private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig
indexLoadingConfig) {
File tempRootDir = null;
try {
tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." +
System.currentTimeMillis());
File segmentTarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
// Next download the segment from a randomly chosen server using
configured download scheme (http or https).
-
SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(segmentName,
() -> {
+
SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName,
() -> {
List<URI> peerServerURIs =
- PeerServerSegmentFinder.getPeerServerURIs(segmentName,
downloadScheme, _helixManager);
+ PeerServerSegmentFinder.getPeerServerURIs(_helixManager,
_tableNameWithType, segmentName,
+ _peerDownloadScheme);
Collections.shuffle(peerServerURIs);
return peerServerURIs;
}, segmentTarFile);
@@ -711,7 +702,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
segmentTarFile.length());
untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile,
tempRootDir);
} catch (Exception e) {
- _logger.warn("Download and move segment {} from peer with scheme {}
failed.", segmentName, downloadScheme, e);
+ _logger.warn("Download and move segment {} from peer with scheme {}
failed.", segmentName, _peerDownloadScheme,
+ e);
throw new RuntimeException(e);
} finally {
FileUtils.deleteQuietly(tempRootDir);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index d4c5f4fc29..261fe0f238 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -660,8 +660,8 @@ public class BaseTableDataManagerTest {
File destFile = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
try (MockedStatic<PeerServerSegmentFinder> mockPeerSegFinder =
mockStatic(PeerServerSegmentFinder.class)) {
mockPeerSegFinder.when(
- () -> PeerServerSegmentFinder.getPeerServerURIs("seg01", "http",
helixManager, TABLE_NAME_WITH_TYPE))
- .thenReturn(Collections.singletonList(uri));
+ () -> PeerServerSegmentFinder.getPeerServerURIs(helixManager,
TABLE_NAME_WITH_TYPE, "seg01",
+ CommonConstants.HTTP_PROTOCOL)).thenReturn(List.of(uri));
tmgr.downloadFromPeersWithoutStreaming("seg01",
mock(SegmentZKMetadata.class), destFile);
}
assertEquals(FileUtils.readFileToString(destFile), "this is from somewhere
remote");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java
index 4b6c6fb910..2af972695f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java
@@ -19,103 +19,93 @@
package org.apache.pinot.core.util;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.StringUtil;
-import org.testng.Assert;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
public class PeerServerSegmentFinderTest {
- private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
- private static final String SEGMENT_1 = "testTable__0__0__t11";
- private static final String SEGMENT_2 = "testTable__0__1__t11";
- private static final String CLUSTER_NAME = "dummyCluster";
- private static final String INSTANCE_ID1 = "Server_localhost_1000";
- private static final String INSTANCE_ID2 = "Server_localhost_1001";
- private static final String INSTANCE_ID3 = "Server_localhost_1003";
- public static final String ADMIN_PORT = "1008";
- public static final String HOST_1_NAME = "s1";
- public static final String HOST_2_NAME = "s2";
- public static final String HOST_3_NAME = "s3";
+ private static final String CLUSTER_NAME = "testCluster";
+ private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+ private static final String SEGMENT_1 = "testSegment1";
+ private static final String SEGMENT_2 = "testSegment2";
+ private static final String INSTANCE_ID_1 = "Server_s1_1007";
+ private static final String INSTANCE_ID_2 = "Server_s2_1007";
+ private static final String INSTANCE_ID_3 = "Server_s3_1007";
+ private static final String HOSTNAME_1 = "s1";
+ private static final String HOSTNAME_2 = "s2";
+ private static final String HOSTNAME_3 = "s3";
+ private static final int HELIX_PORT = 1007;
+ private static final int HTTP_ADMIN_PORT = 1008;
+ private static final int HTTPS_ADMIN_PORT = 1009;
+
private HelixManager _helixManager;
@BeforeClass
- public void initSegmentFetcherFactoryWithPeerServerSegmentFetcher()
- throws Exception {
- HelixAdmin helixAdmin;
- {
- ExternalView ev = new ExternalView(TABLE_NAME_WITH_TYPE);
- ev.setState(SEGMENT_1, INSTANCE_ID1, "ONLINE");
- ev.setState(SEGMENT_1, INSTANCE_ID2, "OFFLINE");
- ev.setState(SEGMENT_1, INSTANCE_ID3, "ONLINE");
- ev.setState(SEGMENT_2, INSTANCE_ID1, "OFFLINE");
- ev.setState(SEGMENT_2, INSTANCE_ID2, "OFFLINE");
- _helixManager = mock(HelixManager.class);
- helixAdmin = mock(HelixAdmin.class);
- when(_helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
- when(_helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
- when(helixAdmin.getResourceExternalView(CLUSTER_NAME,
TABLE_NAME_WITH_TYPE)).thenReturn(ev);
-
when(helixAdmin.getConfigKeys(any(HelixConfigScope.class))).thenReturn(new
ArrayList<>());
- Map<String, String> instanceConfigMap = new HashMap<>();
- instanceConfigMap.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY,
ADMIN_PORT);
- when(helixAdmin.getConfig(any(HelixConfigScope.class),
any(List.class))).thenReturn(instanceConfigMap);
- InstanceConfig instanceConfig1 = new InstanceConfig(INSTANCE_ID1);
- instanceConfig1.setHostName(HOST_1_NAME);
- instanceConfig1.setPort("1000");
- when(helixAdmin.getInstanceConfig(any(String.class),
eq(INSTANCE_ID1))).thenReturn(instanceConfig1);
+ public void initSegmentFetcherFactoryWithPeerServerSegmentFetcher() {
+ ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
+ externalView.setState(SEGMENT_1, INSTANCE_ID_1, "ONLINE");
+ externalView.setState(SEGMENT_1, INSTANCE_ID_2, "OFFLINE");
+ externalView.setState(SEGMENT_1, INSTANCE_ID_3, "ONLINE");
+ externalView.setState(SEGMENT_2, INSTANCE_ID_1, "OFFLINE");
+ externalView.setState(SEGMENT_2, INSTANCE_ID_2, "OFFLINE");
- InstanceConfig instanceConfig2 = new InstanceConfig(INSTANCE_ID2);
- instanceConfig2.setHostName(HOST_2_NAME);
- instanceConfig2.setPort("1000");
- when(helixAdmin.getInstanceConfig(any(String.class),
eq(INSTANCE_ID2))).thenReturn(instanceConfig2);
+ _helixManager = mock(HelixManager.class);
+ HelixAdmin helixAdmin = mock(HelixAdmin.class);
+ when(_helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+ when(_helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
+ when(helixAdmin.getResourceExternalView(CLUSTER_NAME,
REALTIME_TABLE_NAME)).thenReturn(externalView);
+ when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_1)).thenReturn(
+ getInstanceConfig(INSTANCE_ID_1, HOSTNAME_1));
+ when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_2)).thenReturn(
+ getInstanceConfig(INSTANCE_ID_2, HOSTNAME_2));
+ when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_3)).thenReturn(
+ getInstanceConfig(INSTANCE_ID_3, HOSTNAME_3));
+ }
- InstanceConfig instanceConfig3 = new InstanceConfig(INSTANCE_ID3);
- instanceConfig3.setHostName(HOST_3_NAME);
- instanceConfig3.setPort("1000");
- when(helixAdmin.getInstanceConfig(any(String.class),
eq(INSTANCE_ID3))).thenReturn(instanceConfig3);
- }
+ private static InstanceConfig getInstanceConfig(String instanceId, String
hostName) {
+ InstanceConfig instanceConfig = new InstanceConfig(instanceId);
+ instanceConfig.setHostName(hostName);
+ instanceConfig.setPort(Integer.toString(HELIX_PORT));
+ instanceConfig.getRecord().setIntField(Instance.ADMIN_PORT_KEY,
HTTP_ADMIN_PORT);
+ instanceConfig.getRecord().setIntField(Instance.ADMIN_HTTPS_PORT_KEY,
HTTPS_ADMIN_PORT);
+ return instanceConfig;
}
@Test
public void testSegmentFoundSuccessfully()
throws Exception {
// SEGMENT_1 has only 2 online replicas.
- List<URI> httpServerURIs =
- PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_1,
CommonConstants.HTTP_PROTOCOL, _helixManager);
- assertEquals(2, httpServerURIs.size());
- httpServerURIs.contains(new URI(
- StringUtil.join("/", "http://" + HOST_1_NAME + ":" + ADMIN_PORT,
"segments", TABLE_NAME_WITH_TYPE, SEGMENT_1)));
- httpServerURIs.contains(new URI(
- StringUtil.join("/", "http://" + HOST_3_NAME + ":" + ADMIN_PORT,
"segments", TABLE_NAME_WITH_TYPE, SEGMENT_1)));
- List<URI> httpsServerURIs =
- PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_1,
CommonConstants.HTTPS_PROTOCOL, _helixManager);
- assertEquals(2, httpsServerURIs.size());
- httpServerURIs.contains(new URI(StringUtil
- .join("/", "https://" + HOST_1_NAME + ":" + ADMIN_PORT, "segments",
TABLE_NAME_WITH_TYPE, SEGMENT_1)));
- httpServerURIs.contains(new URI(StringUtil
- .join("/", "https://" + HOST_3_NAME + ":" + ADMIN_PORT, "segments",
TABLE_NAME_WITH_TYPE, SEGMENT_1)));
+ List<URI> httpServerURIs =
PeerServerSegmentFinder.getPeerServerURIs(_helixManager, REALTIME_TABLE_NAME,
SEGMENT_1,
+ CommonConstants.HTTP_PROTOCOL);
+ assertEquals(httpServerURIs.size(), 2);
+ assertTrue(httpServerURIs.contains(new URI(
+ String.format("http://%s:%d/segments/%s/%s", HOSTNAME_1,
HTTP_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1))));
+ assertTrue(httpServerURIs.contains(new URI(
+ String.format("http://%s:%d/segments/%s/%s", HOSTNAME_3,
HTTP_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1))));
+ List<URI> httpsServerURIs =
PeerServerSegmentFinder.getPeerServerURIs(_helixManager, REALTIME_TABLE_NAME,
SEGMENT_1,
+ CommonConstants.HTTPS_PROTOCOL);
+ assertEquals(httpsServerURIs.size(), 2);
+ assertTrue(httpsServerURIs.contains(new URI(
+ String.format("https://%s:%d/segments/%s/%s", HOSTNAME_1,
HTTPS_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1))));
+ assertTrue(httpsServerURIs.contains(new URI(
+ String.format("https://%s:%d/segments/%s/%s", HOSTNAME_3,
HTTPS_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1))));
}
@Test
- public void testSegmentNotFound()
- throws Exception {
- Assert.assertEquals(0,
- PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_2,
CommonConstants.HTTP_PROTOCOL, _helixManager).size());
+ public void testSegmentNotFound() {
+ assertTrue(PeerServerSegmentFinder.getPeerServerURIs(_helixManager,
REALTIME_TABLE_NAME, SEGMENT_2,
+ CommonConstants.HTTP_PROTOCOL).isEmpty());
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java
index 6151aab06f..e5b9b7dc1a 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java
@@ -39,8 +39,8 @@ public class ExponentialBackoffRetryPolicy extends
BaseRetryPolicy {
@Override
protected long getDelayMs(int currentAttempt) {
- double minDelayMs = _initialDelayMs * Math.pow(_delayScaleFactor,
currentAttempt);
- double maxDelayMs = minDelayMs * _delayScaleFactor;
- return _random.nextLong((long) minDelayMs, (long) maxDelayMs);
+ long minDelayMs = (long) (_initialDelayMs * Math.pow(_delayScaleFactor,
currentAttempt));
+ long maxDelayMs = (long) (minDelayMs * _delayScaleFactor);
+ return minDelayMs < maxDelayMs ? _random.nextLong(minDelayMs, maxDelayMs)
: minDelayMs;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]