This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 61c9b8e580 Allow segment upload to deepstore only when server segment
store uri is configured (#10216)
61c9b8e580 is described below
commit 61c9b8e5800677c9aa82baef7101604935d7ec32
Author: Navina Ramesh <[email protected]>
AuthorDate: Mon Feb 6 15:09:27 2023 -0800
Allow segment upload to deepstore only when server segment store uri is
configured (#10216)
---
.../core/data/manager/offline/TableDataManagerProvider.java | 13 +++++++++++++
.../data/manager/realtime/LLRealtimeSegmentDataManager.java | 2 ++
.../core/data/manager/realtime/PinotFSSegmentUploader.java | 2 ++
.../core/data/manager/realtime/SegmentCommitterFactory.java | 6 ++++--
.../manager/realtime/LLRealtimeSegmentDataManagerTest.java | 1 +
.../data/manager/realtime/SegmentCommitterFactoryTest.java | 6 ++++--
.../pinot/tools/admin/command/StartControllerCommand.java | 1 +
.../pinot/tools/admin/command/StartServerCommand.java | 1 +
8 files changed, 28 insertions(+), 4 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 2a2671047a..6b83d33dce 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -19,8 +19,10 @@
package org.apache.pinot.core.data.manager.offline;
import com.google.common.cache.LoadingCache;
+import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -32,6 +34,9 @@ import
org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
/**
@@ -72,6 +77,14 @@ public class TableDataManagerProvider {
}
break;
case REALTIME:
+ Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMap(
+ tableDataManagerConfig.getTableConfig());
+ if
(Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
+ &&
StringUtils.isEmpty(tableDataManagerConfig.getInstanceDataManagerConfig().getSegmentStoreUri()))
{
+ throw new IllegalStateException(String.format("Table has enabled %s
config. But the server has not "
+ + "configured the segmentstore uri. Configure the server config
%s",
+ StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
+ }
tableDataManager = new
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
break;
default:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 7fe3239dcd..6e43a77202 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1000,6 +1000,8 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
SegmentCompletionProtocol.Response commitResponse =
commit(controllerVipUrl, isSplitCommit);
if
(!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS))
{
+ _segmentLogger.warn("Controller response was {} and not {}",
commitResponse.getStatus(),
+ SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
return false;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index e71cd468f1..2325a44e19 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -54,6 +54,8 @@ public class PinotFSSegmentUploader implements
SegmentUploader {
public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) {
+ LOGGER.error("Missing segment store uri. Failed to upload segment file
{} for {}.", segmentFile.getName(),
+ segmentName.getSegmentName());
return null;
}
Callable<URI> uploadTask = () -> {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 5a52103efc..37ff2e5a82 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -69,9 +69,11 @@ public class SegmentCommitterFactory {
boolean uploadToFs = _streamConfig.isServerUploadToDeepStore();
String peerSegmentDownloadScheme =
_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
- // TODO: exists for backwards compatibility. remove peerDownloadScheme
non-null check once users have migrated
+ String segmentStoreUri = _indexLoadingConfig.getSegmentStoreURI();
+
if (uploadToFs || peerSegmentDownloadScheme != null) {
- segmentUploader = new
PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(),
+ // TODO: peer scheme non-null check exists for backwards compatibility.
remove check once users have migrated
+ segmentUploader = new PinotFSSegmentUploader(segmentStoreUri,
PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
} else {
segmentUploader = new Server2ControllerSegmentUploader(_logger,
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index fd75a9a0f0..95f19b419e 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -841,6 +841,7 @@ public class LLRealtimeSegmentDataManagerTest {
when(tableDataManagerConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
when(tableDataManagerConfig.getTableType()).thenReturn(TableType.REALTIME);
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+ when(tableDataManagerConfig.getTableConfig()).thenReturn(tableConfig);
InstanceDataManagerConfig instanceDataManagerConfig =
mock(InstanceDataManagerConfig.class);
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
index c815d48a94..8e60b1e414 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
@@ -99,9 +99,11 @@ public class SegmentCommitterFactoryTest {
Map<String, String> streamConfigMap = new
HashMap<>(getMinimumStreamConfigMap());
streamConfigMap.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
"true");
TableConfig config = createRealtimeTableConfig("testDeepStoreConfig",
streamConfigMap).build();
+ IndexLoadingConfig indexLoadingConfig =
Mockito.mock(IndexLoadingConfig.class);
+
Mockito.when(indexLoadingConfig.getSegmentStoreURI()).thenReturn("file:///path/to/segment/store.txt");
SegmentCommitterFactory factory = new
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
- Mockito.mock(IndexLoadingConfig.class),
Mockito.mock(ServerMetrics.class));
+ indexLoadingConfig, Mockito.mock(ServerMetrics.class));
SegmentCommitter committer = factory.createSegmentCommitter(true,
requestParams, controllerVipUrl);
Assert.assertNotNull(committer);
Assert.assertTrue(committer instanceof SplitSegmentCommitter);
@@ -115,7 +117,7 @@ public class SegmentCommitterFactoryTest {
.build();
factory = new SegmentCommitterFactory(Mockito.mock(Logger.class),
protocolHandler, config1,
- Mockito.mock(IndexLoadingConfig.class),
Mockito.mock(ServerMetrics.class));
+ indexLoadingConfig, Mockito.mock(ServerMetrics.class));
committer = factory.createSegmentCommitter(true, requestParams,
controllerVipUrl);
Assert.assertNotNull(committer);
Assert.assertTrue(committer instanceof SplitSegmentCommitter);
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
index 660ab1bfc4..1de1ca80f1 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
@@ -73,6 +73,7 @@ public class StartControllerCommand extends
AbstractBaseAdminCommand implements
// This can be set via the set method, or via config file input.
private boolean _tenantIsolation = true;
+ @CommandLine.Option(names = {"-configOverride"}, required = false, split =
",")
private Map<String, Object> _configOverrides = new HashMap<>();
@Override
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
index 80b2cd70bd..e515d13497 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
@@ -86,6 +86,7 @@ public class StartServerCommand extends
AbstractBaseAdminCommand implements Comm
// TODO support forbids = {"-serverHost", "-serverPort", "-dataDir",
"-segmentDir"}
private String _configFileName;
+ @CommandLine.Option(names = {"-configOverride"}, required = false, split =
",")
private Map<String, Object> _configOverrides = new HashMap<>();
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]