This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a5f3dc5 Move segment download and segment reload into TableManager (#7319)
a5f3dc5 is described below
commit a5f3dc507e6441baca35dae5bbfad122356683b6
Author: Xiaobing <[email protected]>
AuthorDate: Wed Sep 15 09:38:14 2021 -0700
Move segment download and segment reload into TableManager (#7319)
Deletes SegmentFetcherAndLoader and moves its functionality into
BaseTableDataManager, preparing to combine the download logic with
RealtimeTableDataManager. Also adds unit tests.
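For illustration only, a minimal sketch (not part of this commit) of how the two new
TableDataManager entry points are meant to be driven; the wrapper class and method are
hypothetical, while the reloadSegment/addOrReplaceSegment signatures match the ones added here:

import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.data.Schema;

// Hypothetical helper, not part of this patch.
public class SegmentRefreshSketch {
  public static void refresh(TableDataManager tableDataManager, String segmentName,
      IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata,
      @Nullable SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload)
      throws Exception {
    if (localMetadata != null) {
      // Reload an already loaded segment; downloads when forced or when the local CRC
      // differs from the CRC in ZK metadata.
      tableDataManager.reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata,
          schema, forceDownload);
    } else {
      // First-time add or recovery; downloads when no usable local copy exists.
      tableDataManager.addOrReplaceSegment(segmentName, indexLoadingConfig, zkMetadata, null);
    }
  }
}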
---
.../core/data/manager/BaseTableDataManager.java | 230 +++++++
.../core/data/manager/InstanceDataManager.java | 21 +-
.../manager/realtime/RealtimeTableDataManager.java | 25 +-
...=> BaseTableDataManagerAcquireSegmentTest.java} | 37 +-
.../data/manager/BaseTableDataManagerTest.java | 664 ++++++++++-----------
.../realtime/RealtimeTableDataManagerTest.java | 56 ++
.../local/data/manager/TableDataManager.java | 24 +
.../server/starter/helix/BaseServerStarter.java | 28 +-
.../starter/helix/HelixInstanceDataManager.java | 96 ++-
.../starter/helix/SegmentFetcherAndLoader.java | 257 --------
.../helix/SegmentMessageHandlerFactory.java | 20 +-
.../SegmentOnlineOfflineStateModelFactory.java | 9 +-
.../apache/pinot/spi/utils/CommonConstants.java | 6 +-
13 files changed, 751 insertions(+), 722 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 2933aa8..99bb521 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -18,32 +18,46 @@
*/
package org.apache.pinot.core.data.manager;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.LoadingCache;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.Pair;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -256,4 +270,220 @@ public abstract class BaseTableDataManager implements TableDataManager {
.collect(Collectors.toMap(map -> map.getKey().getSecond(),
Map.Entry::getValue));
}
}
+
+ @Override
+ public void reloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
+ SegmentMetadata localMetadata, @Nullable Schema schema, boolean
forceDownload)
+ throws Exception {
+ File indexDir = localMetadata.getIndexDir();
+ Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is
not a directory", indexDir);
+
+ File parentFile = indexDir.getParentFile();
+ File segmentBackupDir =
+ new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+
+ try {
+ // First rename the index directory to the segment backup directory so that the original segment's open file
+ // descriptors still point to the backup directory, ensuring the original segment keeps serving queries properly.
+
+ // Rename index directory to segment backup directory (atomic)
+ Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
+ "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
+
+ // Download from remote or copy from local backup directory into index
directory,
+ // and then continue to load the segment from index directory.
+ boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata,
localMetadata);
+ if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+ if (forceDownload) {
+ LOGGER.info("Segment: {} of table: {} is forced to download",
segmentName, _tableNameWithType);
+ } else {
+ LOGGER.info("Download segment:{} of table: {} as local crc: {}
mismatches remote crc: {}", segmentName,
+ _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+ }
+ indexDir = downloadSegment(segmentName, zkMetadata);
+ } else {
+ LOGGER.info("Reload the local copy of segment: {} of table: {}",
segmentName, _tableNameWithType);
+ FileUtils.copyDirectory(segmentBackupDir, indexDir);
+ }
+
+ // Load from index directory and replace the old segment in memory.
+ addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
schema));
+
+ // Rename the segment backup directory to the segment temporary directory (atomic).
+ // We rename first and delete afterwards because renaming is atomic while deleting is not. Once the backup
+ // directory has been renamed to the temporary directory, the reload is known to have succeeded, so the
+ // temporary directory can be deleted safely.
+ File segmentTempDir = new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
+ Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
+ "Failed to rename segment backup directory: %s to segment temporary
directory: %s", segmentBackupDir,
+ segmentTempDir);
+ FileUtils.deleteDirectory(segmentTempDir);
+ } catch (Exception reloadFailureException) {
+ try {
+ LoaderUtils.reloadFailureRecovery(indexDir);
+ } catch (Exception recoveryFailureException) {
+ LOGGER.error("Failed to recover after reload failure",
recoveryFailureException);
+ reloadFailureException.addSuppressed(recoveryFailureException);
+ }
+ throw reloadFailureException;
+ }
+ }
+
+ @Override
+ public void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
+ SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+ throws Exception {
+ if (!isNewSegment(zkMetadata, localMetadata)) {
+ LOGGER.info("Segment: {} of table: {} has crc: {} same as before,
already loaded, do nothing", segmentName,
+ _tableNameWithType, localMetadata.getCrc());
+ return;
+ }
+
+ // Try to recover if no local metadata is provided.
+ if (localMetadata == null) {
+ LOGGER.info("Segment: {} of table: {} is not loaded, checking disk",
segmentName, _tableNameWithType);
+ localMetadata = recoverSegmentQuietly(segmentName);
+ if (!isNewSegment(zkMetadata, localMetadata)) {
+ LOGGER.info("Segment: {} of table {} has crc: {} same as before,
loading", segmentName, _tableNameWithType,
+ localMetadata.getCrc());
+ if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
+ return;
+ }
+ // Set local metadata to null to indicate that the local segment failed to load,
+ // even though it exists and has the same CRC as the remote one.
+ localMetadata = null;
+ }
+ }
+
+ Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment:
%s of table: %s does not allow download",
+ segmentName, _tableNameWithType);
+
+ // Download the segment and replace the local one, either because the local segment failed to recover
+ // or because the segment data was updated and now has a new CRC.
+ if (localMetadata == null) {
+ LOGGER.info("Download segment: {} of table: {} as no good one exists
locally", segmentName, _tableNameWithType);
+ } else {
+ LOGGER.info("Download segment: {} of table: {} as local crc: {}
mismatches remote crc: {}.", segmentName,
+ _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+ }
+ File indexDir = downloadSegment(segmentName, zkMetadata);
+ addSegment(indexDir, indexLoadingConfig);
+ LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}",
segmentName, _tableNameWithType,
+ zkMetadata.getCrc());
+ }
+
+ protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
+ return true;
+ }
+
+ protected File downloadSegment(String segmentName, SegmentZKMetadata
zkMetadata)
+ throws Exception {
+ // TODO: may support download from peer servers for RealTime table.
+ return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+ }
+
+ /**
+ * A server restart during segment reload might leave the segment directory in an inconsistent state, e.g. the
+ * index directory missing while the segment backup directory still exists. This method tries to recover from such
+ * a reload failure before checking for the index directory and loading segment metadata from it.
+ */
+ private SegmentMetadata recoverSegmentQuietly(String segmentName) {
+ File indexDir = getSegmentDataDir(segmentName);
+ try {
+ LoaderUtils.reloadFailureRecovery(indexDir);
+ if (!indexDir.exists()) {
+ LOGGER.info("Segment: {} of table: {} is not found on disk",
segmentName, _tableNameWithType);
+ return null;
+ }
+ SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
+ LOGGER.info("Segment: {} of table: {} with crc: {} from disk is ready
for loading", segmentName,
+ _tableNameWithType, localMetadata.getCrc());
+ return localMetadata;
+ } catch (Exception e) {
+ LOGGER.error("Failed to recover segment: {} of table: {} from disk",
segmentName, _tableNameWithType, e);
+ FileUtils.deleteQuietly(indexDir);
+ return null;
+ }
+ }
+
+ private boolean loadSegmentQuietly(String segmentName, IndexLoadingConfig
indexLoadingConfig) {
+ File indexDir = getSegmentDataDir(segmentName);
+ try {
+ addSegment(indexDir, indexLoadingConfig);
+ LOGGER.info("Loaded segment: {} of table: {} from disk", segmentName,
_tableNameWithType);
+ return true;
+ } catch (Exception e) {
+ FileUtils.deleteQuietly(indexDir);
+ LOGGER.error("Failed to load segment: {} of table: {} from disk",
segmentName, _tableNameWithType, e);
+ return false;
+ }
+ }
+
+ private File downloadSegmentFromDeepStore(String segmentName,
SegmentZKMetadata zkMetadata)
+ throws Exception {
+ File tempRootDir = getSegmentDataDir("tmp-" + segmentName + "-" +
UUID.randomUUID());
+ FileUtils.forceMkdir(tempRootDir);
+ try {
+ File tarFile = downloadAndDecrypt(segmentName, zkMetadata, tempRootDir);
+ return untarAndMoveSegment(segmentName, tarFile, tempRootDir);
+ } finally {
+ FileUtils.deleteQuietly(tempRootDir);
+ }
+ }
+
+ @VisibleForTesting
+ File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata,
File tempRootDir)
+ throws Exception {
+ File tarFile = new File(tempRootDir, segmentName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ String uri = zkMetadata.getDownloadUrl();
+ try {
+ SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile,
zkMetadata.getCrypterName());
+ LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to:
{}, file length: {}", segmentName,
+ _tableNameWithType, uri, tarFile, tarFile.length());
+ return tarFile;
+ } catch (AttemptsExceededException e) {
+ LOGGER.error("Attempts exceeded when downloading segment: {} for table:
{} from: {} to: {}", segmentName,
+ _tableNameWithType, uri, tarFile);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
+ throw e;
+ }
+ }
+
+ @VisibleForTesting
+ File untarAndMoveSegment(String segmentName, File tarFile, File tempRootDir)
+ throws IOException {
+ File untarDir = new File(tempRootDir, segmentName);
+ try {
+ // If an exception is thrown when untarring, it means the tar file is
broken
+ // or not found after the retry. Thus, there's no need to retry again.
+ File untaredSegDir = TarGzCompressionUtils.untar(tarFile,
untarDir).get(0);
+ LOGGER.info("Uncompressed tar file: {} into target dir: {}", tarFile,
untarDir);
+ // Replace the existing index directory.
+ File indexDir = getSegmentDataDir(segmentName);
+ FileUtils.deleteDirectory(indexDir);
+ FileUtils.moveDirectory(untaredSegDir, indexDir);
+ LOGGER.info("Successfully downloaded segment: {} of table: {} to index
dir: {}", segmentName, _tableNameWithType,
+ indexDir);
+ return indexDir;
+ } catch (Exception e) {
+ LOGGER.error("Failed to untar segment: {} of table: {} from: {} to: {}",
segmentName, _tableNameWithType, tarFile,
+ untarDir);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.UNTAR_FAILURES, 1L);
+ throw e;
+ }
+ }
+
+ @VisibleForTesting
+ File getSegmentDataDir(String segmentName) {
+ return new File(_indexDir, segmentName);
+ }
+
+ @VisibleForTesting
+ static boolean isNewSegment(SegmentZKMetadata zkMetadata, @Nullable
SegmentMetadata localMetadata) {
+ return localMetadata == null || !hasSameCRC(zkMetadata, localMetadata);
+ }
+
+ private static boolean hasSameCRC(SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata) {
+ return zkMetadata.getCrc() == Long.parseLong(localMetadata.getCrc());
+ }
}
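Side note on the reload flow above: its crash safety comes from a rename-then-delete
convention. A standalone sketch of that convention, assuming placeholder ".bak"/".tmp"
suffixes in place of the real CommonConstants.Segment suffix constants:

import java.io.File;
import org.apache.commons.io.FileUtils;

// Standalone sketch (not Pinot code) of the crash-safe reload convention used in
// BaseTableDataManager.reloadSegment above. Directory suffixes are placeholders.
public class ReloadConventionSketch {
  public static void reload(File indexDir) throws Exception {
    File parent = indexDir.getParentFile();
    File backupDir = new File(parent, indexDir.getName() + ".bak");
    File tempDir = new File(parent, indexDir.getName() + ".tmp");
    // 1. Atomic rename: readers holding open file descriptors keep serving from the backup copy.
    if (!indexDir.renameTo(backupDir)) {
      throw new IllegalStateException("Failed to back up " + indexDir);
    }
    // 2. Rebuild the index directory (download from deep store, or copy the backup back).
    FileUtils.copyDirectory(backupDir, indexDir);
    // 3. Atomic rename marks the reload as done; only then is the non-atomic delete safe,
    //    because a crash before this point still leaves a recoverable backup directory.
    if (!backupDir.renameTo(tempDir)) {
      throw new IllegalStateException("Failed to retire backup " + backupDir);
    }
    FileUtils.deleteDirectory(tempDir);
  }
}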
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 4cb2da2..66074a8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -80,15 +80,26 @@ public interface InstanceDataManager {
throws Exception;
/**
- * Reloads a segment in a table.
+ * Reloads a segment in a table. This method can download a new segment to replace the local one before
+ * loading. The download happens when the local segment's CRC mismatches the remote segment's CRC, but it
+ * can also be forced regardless of CRC.
*/
- void reloadSegment(String tableNameWithType, String segmentName)
+ void reloadSegment(String tableNameWithType, String segmentName, boolean
forceDownload)
throws Exception;
/**
- * Reloads all segment in a table.
+ * Reloads all segments in a table.
*/
- void reloadAllSegments(String tableNameWithType)
+ void reloadAllSegments(String tableNameWithType, boolean forceDownload)
+ throws Exception;
+
+ /**
+ * Adds or replaces a segment in a table. Unlike segment reloading, this method does not assume the
+ * TableDataManager object already exists and can initialize the TableDataManager for the segment. A new
+ * segment is downloaded if the local one is not usable or has a different CRC from the remote one.
+ */
+ void addOrReplaceSegment(String tableNameWithType, String segmentName)
throws Exception;
/**
@@ -116,7 +127,7 @@ public interface InstanceDataManager {
/**
* Returns the directory for un-tarred segment data.
*/
- String getSegmentDataDirectory();
+ File getSegmentDataDirectory(String tableNameWithType, String segmentName);
/**
* Returns the directory for tarred segment files.
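A hedged usage sketch of the revised InstanceDataManager API (the handler class and its
variables are hypothetical; only the reloadSegment/reloadAllSegments signatures come from
this patch):

import javax.annotation.Nullable;
import org.apache.pinot.core.data.manager.InstanceDataManager;

// Hypothetical message-handler snippet, not part of this commit.
public class ReloadMessageSketch {
  public static void handleReload(InstanceDataManager instanceDataManager, String tableNameWithType,
      @Nullable String segmentName, boolean forceDownload)
      throws Exception {
    if (segmentName != null) {
      // Reload a single segment; forceDownload bypasses the CRC comparison.
      instanceDataManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
    } else {
      // Reload every segment of the table.
      instanceDataManager.reloadAllSegments(tableNameWithType, forceDownload);
    }
  }
}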
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 931498a..f8d57ff 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -251,15 +251,12 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
* This call comes in one of two ways:
* For HL Segments:
* - We are being directed by helix to own up all the segments that we
committed and are still in retention. In
- * this case
- * we treat it exactly like how OfflineTableDataManager would -- wrap it
into an OfflineSegmentDataManager, and
- * put it
- * in the map.
+ * this case we treat it exactly like how OfflineTableDataManager would --
wrap it into an
+ * OfflineSegmentDataManager, and put it in the map.
* - We are being asked to own up a new realtime segment. In this case, we
wrap the segment with a
- * RealTimeSegmentDataManager
- * (that kicks off consumption). When the segment is committed we get
notified via the notifySegmentCommitted
- * call, at
- * which time we replace the segment with the OfflineSegmentDataManager
+ * RealTimeSegmentDataManager (that kicks off consumption). When the segment
is committed we get notified via the
+ * notifySegmentCommitted call, at which time we replace the segment with
the OfflineSegmentDataManager
+ *
* For LL Segments:
* - We are being asked to start consuming from a partition.
* - We did not know about the segment and are being asked to download and
own the segment (re-balancing, or
@@ -412,7 +409,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
partitionUpsertMetadataManager.addSegment(immutableSegment,
recordInfoIterator);
}
- public void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata
segmentZKMetadata,
+ @Override
+ protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
+ // Only immutable LLC segments allow download.
+ if (SegmentName.isHighLevelConsumerSegmentName(segmentName) ||
zkMetadata.getStatus() == Status.IN_PROGRESS) {
+ return false;
+ }
+ // TODO: may support download from peer servers as well.
+ return !METADATA_URI_FOR_PEER_DOWNLOAD.equals(zkMetadata.getDownloadUrl());
+ }
+
+ void downloadAndReplaceSegment(String segmentName, SegmentZKMetadata
segmentZKMetadata,
IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
String uri = segmentZKMetadata.getDownloadUrl();
if (!METADATA_URI_FOR_PEER_DOWNLOAD.equals(uri)) {
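For quick reference, the download-permission rule introduced by the override above boils
down to the predicate sketched here (import paths for SegmentName and Status are assumed;
METADATA_URI_FOR_PEER_DOWNLOAD is the placeholder download URL referenced in the diff):

import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;

// Sketch: only completed LLC segments with a real deep-store URL may be re-downloaded.
final class AllowDownloadSketch {
  static boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata,
      String metadataUriForPeerDownload) {
    if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
      return false; // HLC segments are never downloaded.
    }
    if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
      return false; // A still-consuming segment has no completed copy in deep store.
    }
    // The peer-download placeholder URL means there is no deep-store copy to fetch.
    return !metadataUriForPeerDownload.equals(zkMetadata.getDownloadUrl());
  }
}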
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
similarity index 94%
copy from
pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
copy to
pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 32edb2e..a34b569 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -49,14 +49,13 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
-public class BaseTableDataManagerTest {
-
- private static final Random RANDOM = new Random();
+public class BaseTableDataManagerAcquireSegmentTest {
private static final String TABLE_NAME = "testTable";
private static final String SEGMENT_PREFIX = "segment";
// Set once for the suite
private File _tmpDir;
+ private Random _random;
// Set once for every test
private volatile int _nDestroys;
@@ -82,6 +81,10 @@ public class BaseTableDataManagerTest {
throws Exception {
_tmpDir = File.createTempFile("OfflineTableDataManagerTest", null);
_tmpDir.deleteOnExit();
+
+ long seed = System.currentTimeMillis();
+ _random = new Random(seed);
+ System.out.printf("Record random seed: %d to reproduce test results upon
failure\n", seed);
}
@AfterSuite
@@ -229,18 +232,15 @@ public class BaseTableDataManagerTest {
// With the current parameters, 3k ops take about 15 seconds, create about
90 segments and drop about half of them
// Running with coverage, it provides complete coverage of the (relevant)
lines in OfflineTableDataManager
- Random random = new Random();
TableDataManager tableDataManager = makeTestableManager();
for (int i = _lo; i <= _hi; i++) {
final String segName = SEGMENT_PREFIX + i;
- tableDataManager.addSegment(makeImmutableSegment(segName,
random.nextInt()));
+ tableDataManager.addSegment(makeImmutableSegment(segName,
_random.nextInt()));
_allSegManagers.add(_internalSegMap.get(segName));
}
runStorageServer(numQueryThreads, runTimeSec, tableDataManager); //
replaces segments while online
-
-// System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys +
",nCreates=" + _allSegments.size());
tableDataManager.shutDown();
}
@@ -249,14 +249,15 @@ public class BaseTableDataManagerTest {
// Start 1 helix worker thread and as many query threads as configured.
List<Thread> queryThreads = new ArrayList<>(numQueryThreads);
for (int i = 0; i < numQueryThreads; i++) {
- BaseTableDataManagerTest.TestSegmentUser segUser = new
BaseTableDataManagerTest.TestSegmentUser(tableDataManager);
+ BaseTableDataManagerAcquireSegmentTest.TestSegmentUser segUser =
+ new
BaseTableDataManagerAcquireSegmentTest.TestSegmentUser(tableDataManager);
Thread segUserThread = new Thread(segUser);
queryThreads.add(segUserThread);
segUserThread.start();
}
- BaseTableDataManagerTest.TestHelixWorker helixWorker =
- new BaseTableDataManagerTest.TestHelixWorker(tableDataManager);
+ BaseTableDataManagerAcquireSegmentTest.TestHelixWorker helixWorker =
+ new
BaseTableDataManagerAcquireSegmentTest.TestHelixWorker(tableDataManager);
Thread helixWorkerThread = new Thread(helixWorker);
helixWorkerThread.start();
_masterThread = Thread.currentThread();
@@ -325,7 +326,7 @@ public class BaseTableDataManagerTest {
while (!_closing) {
try {
List<SegmentDataManager> segmentDataManagers = null;
- double probability = RANDOM.nextDouble();
+ double probability = _random.nextDouble();
if (probability <= ACQUIRE_ALL_PROBABILITY) {
segmentDataManagers = _tableDataManager.acquireAllSegments();
} else {
@@ -344,7 +345,7 @@ public class BaseTableDataManagerTest {
}
// To simulate real use case, may be we can add a small percent that
is returned right away after pruning?
try {
- int sleepTime = RANDOM.nextInt(_maxUseTimeMs - _minUseTimeMs + 1)
+ _minUseTimeMs;
+ int sleepTime = _random.nextInt(_maxUseTimeMs - _minUseTimeMs + 1)
+ _minUseTimeMs;
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
_closing = true;
@@ -366,7 +367,7 @@ public class BaseTableDataManagerTest {
Set<Integer> segmentIds = new HashSet<>(totalSegs);
final int nSegments = totalSegs * _nSegsPercent / 100;
while (segmentIds.size() != nSegments) {
- segmentIds.add(RANDOM.nextInt(totalSegs) + lo);
+ segmentIds.add(_random.nextInt(totalSegs) + lo);
}
return segmentIds;
}
@@ -394,7 +395,7 @@ public class BaseTableDataManagerTest {
public void run() {
while (!_closing) {
try {
- int nextInt = RANDOM.nextInt(100);
+ int nextInt = _random.nextInt(100);
if (nextInt < _removePercent) {
removeSegment();
} else if (nextInt < _removePercent + _replacePercent) {
@@ -403,7 +404,7 @@ public class BaseTableDataManagerTest {
addSegment();
}
try {
- int sleepTime = RANDOM.nextInt(_maxSleepMs - _minSleepMs + 1) +
_minSleepMs;
+ int sleepTime = _random.nextInt(_maxSleepMs - _minSleepMs + 1) +
_minSleepMs;
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
_closing = true;
@@ -419,16 +420,16 @@ public class BaseTableDataManagerTest {
private void addSegment() {
final int segmentToAdd = _hi + 1;
final String segName = SEGMENT_PREFIX + segmentToAdd;
- _tableDataManager.addSegment(makeImmutableSegment(segName,
RANDOM.nextInt()));
+ _tableDataManager.addSegment(makeImmutableSegment(segName,
_random.nextInt()));
_allSegManagers.add(_internalSegMap.get(segName));
_hi = segmentToAdd;
}
// Replace a segment between _lo and _hi
private void replaceSegment() {
- int segToReplace = RANDOM.nextInt(_hi - _lo + 1) + _lo;
+ int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo;
final String segName = SEGMENT_PREFIX + segToReplace;
- _tableDataManager.addSegment(makeImmutableSegment(segName,
RANDOM.nextInt()));
+ _tableDataManager.addSegment(makeImmutableSegment(segName,
_random.nextInt()));
_allSegManagers.add(_internalSegMap.get(segName));
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 32edb2e..1a5a3d5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -19,428 +19,382 @@
package org.apache.pinot.core.data.manager;
import java.io.File;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
+import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
-import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.testng.Assert;
-import org.testng.annotations.AfterSuite;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.spi.crypt.PinotCrypter;
+import org.apache.pinot.spi.crypt.PinotCrypterFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.*;
+import static
org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY;
+import static
org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY;
+import static
org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class BaseTableDataManagerTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"OfflineTableDataManagerTest");
- private static final Random RANDOM = new Random();
- private static final String TABLE_NAME = "testTable";
- private static final String SEGMENT_PREFIX = "segment";
-
- // Set once for the suite
- private File _tmpDir;
-
- // Set once for every test
- private volatile int _nDestroys;
- private volatile boolean _closing;
- private Set<ImmutableSegment> _allSegments = new HashSet<>();
- private Set<SegmentDataManager> _accessedSegManagers =
- Collections.newSetFromMap(new ConcurrentHashMap<SegmentDataManager,
Boolean>());
- private Set<SegmentDataManager> _allSegManagers =
- Collections.newSetFromMap(new ConcurrentHashMap<SegmentDataManager,
Boolean>());
- private AtomicInteger _numQueries = new AtomicInteger(0);
- private Map<String, ImmutableSegmentDataManager> _internalSegMap;
- private Throwable _exception;
- private Thread _masterThread;
- // Segment numbers in place.
- // When we add a segment, we add hi+1, and bump _hi.
- // When we remove a segment, we remove _lo and bump _lo
- // When we replace a segment, we pick a number between _hi and _lo
(inclusive)
- private volatile int _lo;
- private volatile int _hi;
-
- @BeforeSuite
+ private static final String TABLE_NAME = "__table01__";
+
+ @BeforeMethod
public void setUp()
throws Exception {
- _tmpDir = File.createTempFile("OfflineTableDataManagerTest", null);
- _tmpDir.deleteOnExit();
+ TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+ initSegmentFetcher();
}
- @AfterSuite
- public void tearDown() {
- if (_tmpDir != null) {
- org.apache.commons.io.FileUtils.deleteQuietly(_tmpDir);
- }
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
}
- @BeforeMethod
- public void beforeMethod() {
- _nDestroys = 0;
- _closing = false;
- _allSegments.clear();
- _accessedSegManagers.clear();
- _allSegManagers.clear();
- _numQueries.set(0);
- _exception = null;
- _masterThread = null;
- }
+ private BaseTableDataManager makeTestableManager() {
+ TableDataManagerConfig config = mock(TableDataManagerConfig.class);
+ when(config.getTableName()).thenReturn(TABLE_NAME);
+ when(config.getDataDir()).thenReturn(new File(TEMP_DIR,
TABLE_NAME).getAbsolutePath());
- private TableDataManager makeTestableManager()
- throws Exception {
- TableDataManager tableDataManager = new OfflineTableDataManager();
- TableDataManagerConfig config;
- {
- config = mock(TableDataManagerConfig.class);
- when(config.getTableName()).thenReturn(TABLE_NAME);
- when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
- }
+ OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(config, "dummyInstance",
mock(ZkHelixPropertyStore.class),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null);
tableDataManager.start();
- Field segsMapField =
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
- segsMapField.setAccessible(true);
- _internalSegMap = (Map<String, ImmutableSegmentDataManager>)
segsMapField.get(tableDataManager);
return tableDataManager;
}
- private ImmutableSegment makeImmutableSegment(String segmentName, int
totalDocs) {
- ImmutableSegment immutableSegment = mock(ImmutableSegment.class);
- SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
- when(immutableSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
- when(immutableSegment.getSegmentName()).thenReturn(segmentName);
-
when(immutableSegment.getSegmentMetadata().getTotalDocs()).thenReturn(totalDocs);
- doAnswer(invocation -> {
- _nDestroys++;
- return null;
- }).when(immutableSegment).destroy();
- _allSegments.add(immutableSegment);
- return immutableSegment;
+ @Test
+ public void testReloadSegmentNewData()
+ throws Exception {
+ BaseTableDataManager tmgr = makeTestableManager();
+ File tempRootDir = tmgr.getSegmentDataDir("test-new-data");
+
+ // Create an empty segment and compress it to tar.gz as the one in deep
store.
+ // All input and intermediate files are put in the tempRootDir.
+ File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tempInputDir = new File(tempRootDir, "seg01_input");
+ FileUtils
+ .write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=remove");
+ TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+ FileUtils.deleteQuietly(tempInputDir);
+
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+ File indexDir = tmgr.getSegmentDataDir("seg01");
+ FileUtils.write(new File(indexDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=local");
+
+ // Different CRCs leading to segment download.
+ SegmentMetadata llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn("10240");
+ when(llmd.getIndexDir()).thenReturn(indexDir);
+
+ tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd,
null, false);
+ assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+ assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+ .contains("k=remove"));
}
@Test
- public void basicTest()
+ public void testReloadSegmentLocalCopy()
throws Exception {
- TableDataManager tableDataManager = makeTestableManager();
- Assert.assertEquals(tableDataManager.getNumSegments(), 0);
- final String segmentName = "TestSegment";
- final int totalDocs = 23456;
- // Add the segment, get it for use, remove the segment, and then return it.
- // Make sure that the segment is not destroyed before return.
- ImmutableSegment immutableSegment = makeImmutableSegment(segmentName,
totalDocs);
- tableDataManager.addSegment(immutableSegment);
- Assert.assertEquals(tableDataManager.getNumSegments(), 1);
- SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segmentName);
- Assert.assertEquals(segmentDataManager.getReferenceCount(), 2);
- tableDataManager.removeSegment(segmentName);
- Assert.assertEquals(tableDataManager.getNumSegments(), 0);
- Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
- Assert.assertEquals(_nDestroys, 0);
- tableDataManager.releaseSegment(segmentDataManager);
- Assert.assertEquals(segmentDataManager.getReferenceCount(), 0);
- Assert.assertEquals(_nDestroys, 1);
-
- // Now the segment should not be available for use.Also, returning a null
reader is fine
- segmentDataManager = tableDataManager.acquireSegment(segmentName);
- Assert.assertNull(segmentDataManager);
- List<SegmentDataManager> segmentDataManagers =
tableDataManager.acquireAllSegments();
- Assert.assertEquals(segmentDataManagers.size(), 0);
-
- // Removing the segment again is fine.
- tableDataManager.removeSegment(segmentName);
- Assert.assertEquals(tableDataManager.getNumSegments(), 0);
-
- // Add a new segment and remove it in order this time.
- final String anotherSeg = "AnotherSegment";
- ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
- tableDataManager.addSegment(ix1);
- Assert.assertEquals(tableDataManager.getNumSegments(), 1);
- SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg);
- Assert.assertNotNull(sdm1);
- Assert.assertEquals(sdm1.getReferenceCount(), 2);
- // acquire all segments
- List<SegmentDataManager> segmentDataManagersList =
tableDataManager.acquireAllSegments();
- Assert.assertEquals(segmentDataManagersList.size(), 1);
- Assert.assertEquals(sdm1.getReferenceCount(), 3);
- for (SegmentDataManager dataManager : segmentDataManagersList) {
- tableDataManager.releaseSegment(dataManager);
- }
- // count is back to original
- Assert.assertEquals(sdm1.getReferenceCount(), 2);
- tableDataManager.releaseSegment(sdm1);
- Assert.assertEquals(sdm1.getReferenceCount(), 1);
- // Now replace the segment with another one.
- ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1);
- tableDataManager.addSegment(ix2);
- Assert.assertEquals(tableDataManager.getNumSegments(), 1);
- // Now the previous one should have been destroyed, and
- Assert.assertEquals(sdm1.getReferenceCount(), 0);
- verify(ix1, times(1)).destroy();
- // Delete ix2 without accessing it.
- SegmentDataManager sdm2 = _internalSegMap.get(anotherSeg);
- Assert.assertEquals(sdm2.getReferenceCount(), 1);
- tableDataManager.removeSegment(anotherSeg);
- Assert.assertEquals(tableDataManager.getNumSegments(), 0);
- Assert.assertEquals(sdm2.getReferenceCount(), 0);
- verify(ix2, times(1)).destroy();
- tableDataManager.shutDown();
+ BaseTableDataManager tmgr = makeTestableManager();
+ File tempRootDir = tmgr.getSegmentDataDir("test-local-copy");
+
+ // Create an empty segment and compress it to tar.gz as the one in deep
store.
+ // All input and intermediate files are put in the tempRootDir.
+ File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tempInputDir = new File(tempRootDir, "seg01_input");
+ FileUtils
+ .write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
+ TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+ FileUtils.deleteQuietly(tempInputDir);
+
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+ File indexDir = tmgr.getSegmentDataDir("seg01");
+ FileUtils.write(new File(indexDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=local");
+
+ // Same CRCs so load the local copy.
+ SegmentMetadata llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn("1024");
+ when(llmd.getIndexDir()).thenReturn(indexDir);
+
+ tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd,
null, false);
+ assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+ assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+ .contains("k=local"));
+ }
+
+ @Test
+ public void testReloadSegmentForceDownload()
+ throws Exception {
+ BaseTableDataManager tmgr = makeTestableManager();
+ File tempRootDir = tmgr.getSegmentDataDir("test-force-download");
+
+ // Create an empty segment and compress it to tar.gz as the one in deep
store.
+ // All input and intermediate files are put in the tempRootDir.
+ File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tempInputDir = new File(tempRootDir, "seg01_input");
+ FileUtils
+ .write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
+ TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+ FileUtils.deleteQuietly(tempInputDir);
+
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+ File indexDir = tmgr.getSegmentDataDir("seg01");
+ FileUtils.write(new File(indexDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=local");
+
+ // Same CRC but force to download
+ SegmentMetadata llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn("1024");
+ when(llmd.getIndexDir()).thenReturn(indexDir);
+
+ tmgr.reloadSegment("seg01", newDummyIndexLoadingConfig(), zkmd, llmd,
null, true);
+ assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+ assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+ .contains("k=remote"));
}
- /*
- * These tests simulate the access of segments via OfflineTableDataManager.
- *
- * It creates 31 segments (0..30) to start with and adds them to the
tableDataManager (hi = 30, lo = 0)
- * It spawns 10 "query" threads, and one "helix" thread.
- *
- * The query threads pick up a random of 70% the segments and 'get' them,
wait a random period of time (5 to 80ms)
- * and then 'release' the segments back, and does this in a continuous loop.
- *
- * The helix thread decides to do one of the following:
- * - Add a segment (hi+1), and bumps hi by 1 (does this 20% of the time)
- * - Remove a segment (lo) and bumps up lo by 1 (does this 20% of the time)
- * - Replaces a segment (a randomm one between (lo,hi), 60% of the time)
- * and then waits for a random of 50-300ms before attempting one of the ops
again.
- */
+ @Test
+ public void testAddOrReplaceSegmentNewData()
+ throws Exception {
+ BaseTableDataManager tmgr = makeTestableManager();
+ File tempRootDir = tmgr.getSegmentDataDir("test-new-data");
+
+ // Create an empty segment and compress it to tar.gz as the one in deep
store.
+ // All input and intermediate files are put in the tempRootDir.
+ File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tempInputDir = new File(tempRootDir, "seg01_input");
+ FileUtils.write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01");
+ TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+ FileUtils.deleteQuietly(tempInputDir);
+
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+ // Different CRCs leading to segment download.
+ SegmentMetadata llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn("10240");
+
+ assertFalse(tmgr.getSegmentDataDir("seg01").exists());
+ tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd,
llmd);
+ assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+ assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+ .contains("docs=0"));
+ }
@Test
- public void testReplace()
+ public void testAddOrReplaceSegmentNoop()
throws Exception {
- _lo = 0;
- _hi = 30; // Total number of segments we have in the server.
- final int numQueryThreads = 10;
- final int runTimeSec = 20;
- // With the current parameters, 3k ops take about 15 seconds, create about
90 segments and drop about half of them
- // Running with coverage, it provides complete coverage of the (relevant)
lines in OfflineTableDataManager
-
- Random random = new Random();
- TableDataManager tableDataManager = makeTestableManager();
-
- for (int i = _lo; i <= _hi; i++) {
- final String segName = SEGMENT_PREFIX + i;
- tableDataManager.addSegment(makeImmutableSegment(segName,
random.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
- }
+ BaseTableDataManager tmgr = makeTestableManager();
+
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
- runStorageServer(numQueryThreads, runTimeSec, tableDataManager); //
replaces segments while online
+ SegmentMetadata llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn("1024");
-// System.out.println("Nops = " + _numQueries + ",nDrops=" + _nDestroys +
",nCreates=" + _allSegments.size());
- tableDataManager.shutDown();
+ assertFalse(tmgr.getSegmentDataDir("seg01").exists());
+ tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd,
llmd);
+ // As the CRC is the same, the index dir is left as is, so it does not get created by the test.
+ assertFalse(tmgr.getSegmentDataDir("seg01").exists());
}
- private void runStorageServer(int numQueryThreads, int runTimeSec,
TableDataManager tableDataManager)
+ @Test
+ public void testAddOrReplaceSegmentRecovered()
throws Exception {
- // Start 1 helix worker thread and as many query threads as configured.
- List<Thread> queryThreads = new ArrayList<>(numQueryThreads);
- for (int i = 0; i < numQueryThreads; i++) {
- BaseTableDataManagerTest.TestSegmentUser segUser = new
BaseTableDataManagerTest.TestSegmentUser(tableDataManager);
- Thread segUserThread = new Thread(segUser);
- queryThreads.add(segUserThread);
- segUserThread.start();
- }
+ BaseTableDataManager tmgr = makeTestableManager();
- BaseTableDataManagerTest.TestHelixWorker helixWorker =
- new BaseTableDataManagerTest.TestHelixWorker(tableDataManager);
- Thread helixWorkerThread = new Thread(helixWorker);
- helixWorkerThread.start();
- _masterThread = Thread.currentThread();
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ // Make this equal to the default crc value, so no need to make a dummy
creation.meta file.
+ when(zkmd.getCrc()).thenReturn(Long.MIN_VALUE);
- try {
- Thread.sleep(runTimeSec * 1000);
- } catch (InterruptedException e) {
+ File backup = tmgr.getSegmentDataDir("seg01" +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ FileUtils.write(new File(backup, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01");
- }
- _closing = true;
+ assertFalse(tmgr.getSegmentDataDir("seg01").exists());
+ tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd,
null);
+ assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+ assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+ .contains("docs=0"));
+ }
- helixWorkerThread.join();
- for (Thread t : queryThreads) {
- t.join();
- }
+ @Test
+ public void testAddOrReplaceSegmentNotRecovered()
+ throws Exception {
+ BaseTableDataManager tmgr = makeTestableManager();
+ File tempRootDir = tmgr.getSegmentDataDir("test-force-download");
+
+ // Create an empty segment and compress it to tar.gz as the one in deep
store.
+ // All input and intermediate files are put in the tempRootDir.
+ File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tempInputDir = new File(tempRootDir, "seg01_input");
+ FileUtils
+ .write(new File(tempInputDir, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=remote");
+ TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+ FileUtils.deleteQuietly(tempInputDir);
+
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempTar.getAbsolutePath());
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+
+ // Although the segment can be recovered from the backup, the CRC is different. The local CRC is Long.MIN_VALUE.
+ File backup = tmgr.getSegmentDataDir("seg01" +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ FileUtils.write(new File(backup, "metadata.properties"),
"segment.total.docs=0\nsegment.name=seg01\nk=local");
+
+ assertFalse(tmgr.getSegmentDataDir("seg01").exists());
+ tmgr.addOrReplaceSegment("seg01", newDummyIndexLoadingConfig(), zkmd,
null);
+ assertTrue(tmgr.getSegmentDataDir("seg01").exists());
+ assertTrue(FileUtils.readFileToString(new
File(tmgr.getSegmentDataDir("seg01"), "metadata.properties"))
+ .contains("k=remote"));
+ }
- if (_exception != null) {
- Assert.fail("One of the threads failed", _exception);
- }
+ @Test
+ public void testDownloadAndDecrypt()
+ throws Exception {
+ File tempInput = new File(TEMP_DIR, "tmp.txt");
+ FileUtils.write(tempInput, "this is from somewhere remote");
- // tableDataManager should be quiescent now.
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getDownloadUrl()).thenReturn("file://" +
tempInput.getAbsolutePath());
- // All segments we ever created must have a corresponding segment manager.
- Assert.assertEquals(_allSegManagers.size(), _allSegments.size());
+ BaseTableDataManager tmgr = makeTestableManager();
+ File tempRootDir = tmgr.getSegmentDataDir("test-download-decrypt");
- final int nSegsAcccessed = _accessedSegManagers.size();
- for (SegmentDataManager segmentDataManager : _internalSegMap.values()) {
- Assert.assertEquals(segmentDataManager.getReferenceCount(), 1);
- // We should never have called destroy on these segments. Remove it from
the list of accessed segments.
- verify(segmentDataManager.getSegment(), never()).destroy();
- _allSegManagers.remove(segmentDataManager);
- _accessedSegManagers.remove(segmentDataManager);
- }
+ File tarFile = tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
+ assertEquals(FileUtils.readFileToString(tarFile), "this is from somewhere
remote");
- // For the remaining segments in accessed list, destroy must have been
called exactly once.
- for (SegmentDataManager segmentDataManager : _allSegManagers) {
- verify(segmentDataManager.getSegment(), times(1)).destroy();
- // Also their count should be 0
- Assert.assertEquals(segmentDataManager.getReferenceCount(), 0);
- }
+ when(zkmd.getCrypterName()).thenReturn("fakePinotCrypter");
+ tarFile = tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
+ assertEquals(FileUtils.readFileToString(tarFile), "this is from somewhere
remote");
- // The number of segments we accessed must be <= total segments created.
- Assert.assertTrue(nSegsAcccessed <= _allSegments.size(),
- "Accessed=" + nSegsAcccessed + ",created=" + _allSegments.size());
- // The number of segments we have seen and that are not there anymore,
must be <= number destroyed.
- Assert.assertTrue(_accessedSegManagers.size() <= _nDestroys,
- "SeenButUnavailableNow=" + _accessedSegManagers.size() + ",Destroys="
+ _nDestroys);
+ FakePinotCrypter fakeCrypter = (FakePinotCrypter)
PinotCrypterFactory.create("fakePinotCrypter");
+
assertTrue(fakeCrypter._origFile.getAbsolutePath().endsWith("__table01__/test-download-decrypt/seg01.tar.gz.enc"));
+
assertTrue(fakeCrypter._decFile.getAbsolutePath().endsWith("__table01__/test-download-decrypt/seg01.tar.gz"));
- // The current number of segments must be the as expected (hi-lo+1)
- Assert.assertEquals(_internalSegMap.size(), _hi - _lo + 1);
+ try {
+ // Set maxRetry to 0 to cause retry failure immediately.
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(RETRY_COUNT_CONFIG_KEY, 0);
+ SegmentFetcherFactory.init(new PinotConfiguration(properties));
+ tmgr.downloadAndDecrypt("seg01", zkmd, tempRootDir);
+ fail();
+ } catch (AttemptsExceededException e) {
+ assertEquals(e.getMessage(), "Operation failed after 0 attempts");
+ }
}
- private class TestSegmentUser implements Runnable {
- private static final double ACQUIRE_ALL_PROBABILITY = 0.20;
- private final int _minUseTimeMs = 5;
- private final int _maxUseTimeMs = 80;
- private final int _nSegsPercent = 70; // We use 70% of the segments for
any query.
- private final TableDataManager _tableDataManager;
-
- private TestSegmentUser(TableDataManager tableDataManager) {
- _tableDataManager = tableDataManager;
- }
+ @Test
+ public void testUntarAndMoveSegment()
+ throws IOException {
+ BaseTableDataManager tmgr = makeTestableManager();
+ File tempRootDir = tmgr.getSegmentDataDir("test-untar-move");
+
+ // All input and intermediate files are put in the tempRootDir.
+ File tempTar = new File(tempRootDir, "seg01" +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+ File tempInputDir = new File(tempRootDir, "seg01_input");
+ FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment
dir");
+ TarGzCompressionUtils.createTarGzFile(tempInputDir, tempTar);
+ FileUtils.deleteQuietly(tempInputDir);
+
+ // The destination is the segment directory at the same level as tempRootDir.
+ File indexDir = tmgr.untarAndMoveSegment("seg01", tempTar, tempRootDir);
+ assertEquals(indexDir, tmgr.getSegmentDataDir("seg01"));
+ assertEquals(FileUtils.readFileToString(new File(indexDir, "tmp.txt")),
"this is in segment dir");
- @Override
- public void run() {
- while (!_closing) {
- try {
- List<SegmentDataManager> segmentDataManagers = null;
- double probability = RANDOM.nextDouble();
- if (probability <= ACQUIRE_ALL_PROBABILITY) {
- segmentDataManagers = _tableDataManager.acquireAllSegments();
- } else {
- Set<Integer> segmentIds = pickSegments();
- List<String> segmentList = new ArrayList<>(segmentIds.size());
- for (Integer segmentId : segmentIds) {
- segmentList.add(SEGMENT_PREFIX + segmentId);
- }
- segmentDataManagers =
_tableDataManager.acquireSegments(segmentList);
- }
- // Some of them may be rejected, but that is OK.
-
- // Keep track of all segment data managers we ever accessed.
- for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- _accessedSegManagers.add(segmentDataManager);
- }
- // To simulate real use case, may be we can add a small percent that
is returned right away after pruning?
- try {
- int sleepTime = RANDOM.nextInt(_maxUseTimeMs - _minUseTimeMs + 1)
+ _minUseTimeMs;
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- _closing = true;
- }
- for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- _tableDataManager.releaseSegment(segmentDataManager);
- }
- } catch (Throwable t) {
- _masterThread.interrupt();
- _exception = t;
- }
- }
+ try {
+ tmgr.untarAndMoveSegment("seg01", new File(tempRootDir, "unknown.txt"),
TEMP_DIR);
+ fail();
+ } catch (Exception e) {
+ // expected.
}
+ }
- private Set<Integer> pickSegments() {
- int hi = _hi;
- int lo = _lo;
- int totalSegs = hi - lo + 1;
- Set<Integer> segmentIds = new HashSet<>(totalSegs);
- final int nSegments = totalSegs * _nSegsPercent / 100;
- while (segmentIds.size() != nSegments) {
- segmentIds.add(RANDOM.nextInt(totalSegs) + lo);
- }
- return segmentIds;
- }
+ @Test
+ public void testIsNewSegmentMetadata()
+ throws IOException {
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getCrc()).thenReturn(Long.valueOf(1024));
+ assertTrue(BaseTableDataManager.isNewSegment(zkmd, null));
+
+ SegmentMetadata llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn("1024");
+ assertFalse(BaseTableDataManager.isNewSegment(zkmd, llmd));
+
+ llmd = mock(SegmentMetadata.class);
+ when(llmd.getCrc()).thenReturn("10245");
+ assertTrue(BaseTableDataManager.isNewSegment(zkmd, llmd));
}
- private class TestHelixWorker implements Runnable {
- private final int _removePercent;
- private final int _replacePercent;
- private final int _addPercent;
- private final int _minSleepMs;
- private final int _maxSleepMs;
- private final TableDataManager _tableDataManager;
-
- private TestHelixWorker(TableDataManager tableDataManager) {
- _tableDataManager = tableDataManager;
-
- _removePercent = 20;
- _addPercent = 20;
- _replacePercent = 60;
- _minSleepMs = 50;
- _maxSleepMs = 300;
- }
+ // Has to be a public class for the class loader to work.
+ public static class FakePinotCrypter implements PinotCrypter {
+ private File _origFile;
+ private File _decFile;
@Override
- public void run() {
- while (!_closing) {
- try {
- int nextInt = RANDOM.nextInt(100);
- if (nextInt < _removePercent) {
- removeSegment();
- } else if (nextInt < _removePercent + _replacePercent) {
- replaceSegment();
- } else {
- addSegment();
- }
- try {
- int sleepTime = RANDOM.nextInt(_maxSleepMs - _minSleepMs + 1) +
_minSleepMs;
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- _closing = true;
- }
- } catch (Throwable t) {
- _masterThread.interrupt();
- _exception = t;
- }
- }
+ public void init(PinotConfiguration config) {
}
- // Add segment _hi + 1,bump hi.
- private void addSegment() {
- final int segmentToAdd = _hi + 1;
- final String segName = SEGMENT_PREFIX + segmentToAdd;
- _tableDataManager.addSegment(makeImmutableSegment(segName,
RANDOM.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
- _hi = segmentToAdd;
+ @Override
+ public void encrypt(File origFile, File encFile) {
}
- // Replace a segment between _lo and _hi
- private void replaceSegment() {
- int segToReplace = RANDOM.nextInt(_hi - _lo + 1) + _lo;
- final String segName = SEGMENT_PREFIX + segToReplace;
- _tableDataManager.addSegment(makeImmutableSegment(segName,
RANDOM.nextInt()));
- _allSegManagers.add(_internalSegMap.get(segName));
+ @Override
+ public void decrypt(File origFile, File decFile) {
+ _origFile = origFile;
+ _decFile = decFile;
}
+ }
- // Remove the segment _lo and then bump _lo
- private void removeSegment() {
- // Keep at least one segment in place.
- if (_hi > _lo) {
- _tableDataManager.removeSegment(SEGMENT_PREFIX + _lo);
- _lo++;
- } else {
- addSegment();
- }
- }
+ private static void initSegmentFetcher()
+ throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(RETRY_COUNT_CONFIG_KEY, 3);
+ properties.put(RETRY_WAIT_MS_CONFIG_KEY, 100);
+ properties.put(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, 5);
+ SegmentFetcherFactory.init(new PinotConfiguration(properties));
+
+ // Setup crypter
+ properties.put("class.fakePinotCrypter", FakePinotCrypter.class.getName());
+ PinotCrypterFactory.init(new PinotConfiguration(properties));
+ }
+
+ private static IndexLoadingConfig newDummyIndexLoadingConfig() {
+ IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
+ when(indexLoadingConfig.getReadMode()).thenReturn(ReadMode.mmap);
+ when(indexLoadingConfig.getSegmentVersion()).thenReturn(SegmentVersion.v3);
+ return indexLoadingConfig;
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
new file mode 100644
index 0000000..e2afd3c
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.HLCSegmentName;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.testng.annotations.Test;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class RealtimeTableDataManagerTest {
+ @Test
+ public void testAllowDownload() {
+ RealtimeTableDataManager mgr = new RealtimeTableDataManager(null);
+
+ String groupId = "myTable_REALTIME_1234567_0";
+ String partitionRange = "ALL";
+ String sequenceNumber = "1234567";
+ HLCSegmentName hlc = new HLCSegmentName(groupId, partitionRange,
sequenceNumber);
+ assertFalse(mgr.allowDownload(hlc.getSegmentName(), null));
+
+ LLCSegmentName llc = new LLCSegmentName("tbl01", 0, 1000000,
System.currentTimeMillis());
+ SegmentZKMetadata zkmd = mock(SegmentZKMetadata.class);
+ when(zkmd.getStatus()).thenReturn(Status.IN_PROGRESS);
+ assertFalse(mgr.allowDownload(llc.getSegmentName(), zkmd));
+
+ when(zkmd.getStatus()).thenReturn(Status.DONE);
+ when(zkmd.getDownloadUrl()).thenReturn("");
+ assertFalse(mgr.allowDownload(llc.getSegmentName(), zkmd));
+
+ when(zkmd.getDownloadUrl()).thenReturn("remote");
+ assertTrue(mgr.allowDownload(llc.getSegmentName(), zkmd));
+ }
+}
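
The rule exercised by testAllowDownload can be summarized as: only completed low-level-consumer segments
with a non-empty download URL may be fetched from deep store. A hedged sketch of that check (method names
not shown in this diff, such as isLowLevelConsumerSegmentName, are assumptions, not the committed
implementation):

    boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
      if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
        return false;  // HLC segments are never downloaded from deep store
      }
      if (zkMetadata.getStatus() == Status.IN_PROGRESS) {
        return false;  // the segment is still consuming; nothing has been committed to deep store yet
      }
      String downloadUrl = zkMetadata.getDownloadUrl();
      return downloadUrl != null && !downloadUrl.isEmpty();
    }
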
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 4078bda..36634db 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -27,11 +27,14 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.Pair;
@@ -79,6 +82,27 @@ public interface TableDataManager {
throws Exception;
/**
+   * Reloads an existing immutable segment for the table, which can be an OFFLINE or REALTIME table.
+   * A new copy of the segment may be downloaded if the local one has a different CRC, or the download
+   * can be forced by setting the forceDownload flag to true. This operation is conducted within a
+   * failure handling framework and is transparent to ongoing queries, because the segment is already
+   * in online serving state.
+   */
+ void reloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
+ SegmentMetadata localMetadata, @Nullable Schema schema, boolean
forceDownload)
+ throws Exception;
+
+  /**
+   * Adds or replaces an immutable segment for the table, which can be an OFFLINE or REALTIME table.
+   * A new copy of the segment may be downloaded if the local one has a different CRC or does not work
+   * as expected. This operation is conducted outside the failure handling framework used in segment
+   * reloading, because the segment is not yet serving online queries, e.g. this method is used to add
+   * a new segment, or to transition a segment to the online serving state.
+   */
+ void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
+ @Nullable SegmentMetadata localMetadata)
+ throws Exception;
+
+ /**
* Removes a segment from the table.
*/
void removeSegment(String segmentName);
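
A hedged sketch of how callers are expected to choose between the two new methods (illustrative only;
the real call sites are in HelixInstanceDataManager further down in this diff, and alreadyServing is a
made-up placeholder):

    // reloadSegment is for a segment already online and serving queries; addOrReplaceSegment is for a
    // segment not serving yet (a brand new segment, or a transition to the online serving state).
    if (alreadyServing) {
      tableDataManager.reloadSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata, schema,
          forceDownload);
    } else {
      tableDataManager.addOrReplaceSegment(segmentName, indexLoadingConfig, zkMetadata, localMetadata);
    }
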
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index d83ed50..e58d020 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -54,6 +54,7 @@ import
org.apache.pinot.common.restlet.resources.SystemResourceInfo;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.ServiceStatus.Status;
import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -70,6 +71,7 @@ import
org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.server.starter.ServerQueriesDisabledTracker;
+import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.environmentprovider.PinotEnvironmentProvider;
import
org.apache.pinot.spi.environmentprovider.PinotEnvironmentProviderFactory;
@@ -375,10 +377,9 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
- SegmentFetcherAndLoader fetcherAndLoader =
- new SegmentFetcherAndLoader(_serverConf, instanceDataManager,
serverMetrics);
+ initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
- new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager, fetcherAndLoader);
+ new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager);
_helixManager.getStateMachineEngine()
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
stateModelFactory);
// Start the server instance as a pre-connect callback so that it starts
after connecting to the ZK in order to
@@ -448,7 +449,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
// Register message handler factory
SegmentMessageHandlerFactory messageHandlerFactory =
- new SegmentMessageHandlerFactory(fetcherAndLoader,
instanceDataManager, serverMetrics);
+ new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
_helixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
messageHandlerFactory);
@@ -708,4 +709,23 @@ public abstract class BaseServerStarter implements
ServiceStartable {
instanceConfig.getRecord().setMapField(Helix.Instance.SYSTEM_RESOURCE_INFO_KEY,
systemResourceMap);
helixAdmin.setInstanceConfig(helixClusterName, instanceId, instanceConfig);
}
+
+  /**
+   * Initializes the components used to download segments from deep store. They used to be initialized
+   * in SegmentFetcherAndLoader, which has been removed to consolidate the segment download
+   * functionality for both OFFLINE and REALTIME tables, so those components are now initialized here,
+   * where SegmentFetcherAndLoader used to be created.
+   */
+ private void initSegmentFetcher(PinotConfiguration config)
+ throws Exception {
+ PinotConfiguration pinotFSConfig =
config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
+ PinotFSFactory.init(pinotFSConfig);
+
+ PinotConfiguration segmentFetcherFactoryConfig =
+
config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
+ SegmentFetcherFactory.init(segmentFetcherFactoryConfig);
+
+ PinotConfiguration pinotCrypterConfig =
config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
+ PinotCrypterFactory.init(pinotCrypterConfig);
+ }
}
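
To make the consolidation concrete, here is a hedged sketch of the download path these factories serve
once initialized. It mirrors the fetch / optional-decrypt / untar steps of the removed
SegmentFetcherAndLoader.downloadSegmentToLocal and is not the literal code now living in
BaseTableDataManager; the helper name and temp-file layout are illustrative:

    // Illustrative only: fetch a tarred segment from deep store, decrypt it if needed, and untar it.
    static File downloadAndUntarSegment(String downloadUrl, String crypterName, File tempDir,
        String segmentName)
        throws Exception {
      File downloadedFile = new File(tempDir, segmentName + ".enc");
      File tarFile = new File(tempDir, segmentName + ".tar.gz");
      SegmentFetcherFactory.fetchSegmentToLocal(downloadUrl, downloadedFile);  // retries internally
      if (crypterName != null) {
        PinotCrypterFactory.create(crypterName).decrypt(downloadedFile, tarFile);
      } else {
        tarFile = downloadedFile;  // the segment was not encrypted
      }
      // The first entry of the untarred content is the segment index directory
      return TarGzCompressionUtils.untar(tarFile, new File(tempDir, segmentName)).get(0);
    }
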
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 78aee36..3cf0bbb 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -32,11 +32,11 @@ import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -47,11 +47,8 @@ import
org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
-import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -64,8 +61,6 @@ import org.slf4j.LoggerFactory;
/**
* The class <code>HelixInstanceDataManager</code> is the instance data
manager based on Helix.
- *
- * TODO: move SegmentFetcherAndLoader into this class to make this the top
level manager
*/
@ThreadSafe
public class HelixInstanceDataManager implements InstanceDataManager {
@@ -192,7 +187,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
}
@Override
- public void reloadSegment(String tableNameWithType, String segmentName)
+ public void reloadSegment(String tableNameWithType, String segmentName,
boolean forceDownload)
throws Exception {
LOGGER.info("Reloading single segment: {} in table: {}", segmentName,
tableNameWithType);
SegmentMetadata segmentMetadata = getSegmentMetadata(tableNameWithType,
segmentName);
@@ -206,13 +201,13 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableNameWithType);
- reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema);
+ reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema,
forceDownload);
LOGGER.info("Reloaded single segment: {} in table: {}", segmentName,
tableNameWithType);
}
@Override
- public void reloadAllSegments(String tableNameWithType)
+ public void reloadAllSegments(String tableNameWithType, boolean
forceDownload)
throws Exception {
LOGGER.info("Reloading all segments in table: {}", tableNameWithType);
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
@@ -221,17 +216,18 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableNameWithType);
for (SegmentMetadata segmentMetadata :
getAllSegmentsMetadata(tableNameWithType)) {
- reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema);
+ reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema,
forceDownload);
}
LOGGER.info("Reloaded all segments in table: {}", tableNameWithType);
}
private void reloadSegment(String tableNameWithType, SegmentMetadata
segmentMetadata, TableConfig tableConfig,
- @Nullable Schema schema)
+ @Nullable Schema schema, boolean forceDownload)
throws Exception {
String segmentName = segmentMetadata.getName();
- LOGGER.info("Reloading segment: {} in table: {}", segmentName,
tableNameWithType);
+ LOGGER.info("Reloading segment: {} in table: {} with forceDownload: {}",
segmentName, tableNameWithType,
+ forceDownload);
TableDataManager tableDataManager =
_tableDataManagerMap.get(tableNameWithType);
if (tableDataManager == null) {
@@ -259,53 +255,49 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
return;
}
- Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is
not a directory", indexDir);
- File parentFile = indexDir.getParentFile();
- File segmentBackupDir =
- new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
+ SegmentZKMetadata zkMetadata =
+ ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentName);
+ Preconditions.checkNotNull(zkMetadata);
// This method might modify the file on disk. Use segment lock to prevent
race condition
Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType,
segmentName);
try {
segmentLock.lock();
- // First rename index directory to segment backup directory so that
original segment have all file descriptors
- // point to the segment backup directory to ensure original segment
serves queries properly
-
- // Rename index directory to segment backup directory (atomic)
- Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
- "Failed to rename index directory: %s to segment backup directory:
%s", indexDir, segmentBackupDir);
+      // Reload an existing segment; the local segment metadata is guaranteed to exist, as asserted above.
+ tableDataManager.reloadSegment(segmentName, new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig),
+ zkMetadata, segmentMetadata, schema, forceDownload);
+ LOGGER.info("Reloaded segment: {} of table: {}", segmentName,
tableNameWithType);
+ } finally {
+ segmentLock.unlock();
+ }
+ }
- // Copy from segment backup directory back to index directory
- FileUtils.copyDirectory(segmentBackupDir, indexDir);
+ @Override
+ public void addOrReplaceSegment(String tableNameWithType, String segmentName)
+ throws Exception {
+ LOGGER.info("Adding or replacing segment: {} for table: {}", segmentName,
tableNameWithType);
- // Load from index directory
- ImmutableSegment immutableSegment = ImmutableSegmentLoader
- .load(indexDir, new IndexLoadingConfig(_instanceDataManagerConfig,
tableConfig), schema);
+ // Get updated table config and segment metadata from Zookeeper.
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Preconditions.checkNotNull(tableConfig);
+ SegmentZKMetadata zkMetadata =
+ ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentName);
+ Preconditions.checkNotNull(zkMetadata);
- // Replace the old segment in memory
- tableDataManager.addSegment(immutableSegment);
+ // This method might modify the file on disk. Use segment lock to prevent
race condition
+ Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType,
segmentName);
+ try {
+ segmentLock.lock();
- // Rename segment backup directory to segment temporary directory
(atomic)
- // The reason to first rename then delete is that, renaming is an atomic
operation, but deleting is not. When we
- // rename the segment backup directory to segment temporary directory,
we know the reload already succeeded, so
- // that we can safely delete the segment temporary directory
- File segmentTempDir = new File(parentFile, indexDir.getName() +
CommonConstants.Segment.SEGMENT_TEMP_DIR_SUFFIX);
- Preconditions.checkState(segmentBackupDir.renameTo(segmentTempDir),
- "Failed to rename segment backup directory: %s to segment temporary
directory: %s", segmentBackupDir,
- segmentTempDir);
- LOGGER.info("Reloaded segment: {} in table: {}", segmentName,
tableNameWithType);
+      // If the table data manager is not created yet, or the segment is not loaded yet, localMetadata
+      // is null, and addOrReplaceSegment will then load the segment accordingly.
+ SegmentMetadata localMetadata = getSegmentMetadata(tableNameWithType,
segmentName);
- // Delete segment temporary directory
- FileUtils.deleteDirectory(segmentTempDir);
- } catch (Exception reloadFailureException) {
- try {
- LoaderUtils.reloadFailureRecovery(indexDir);
- } catch (Exception recoveryFailureException) {
- LOGGER.error("Failed to recover after reload failure",
recoveryFailureException);
- reloadFailureException.addSuppressed(recoveryFailureException);
- }
- throw reloadFailureException;
+ _tableDataManagerMap.computeIfAbsent(tableNameWithType, k ->
createTableDataManager(k, tableConfig))
+ .addOrReplaceSegment(segmentName, new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig),
+ zkMetadata, localMetadata);
+ LOGGER.info("Added or replaced segment: {} of table: {}", segmentName,
tableNameWithType);
} finally {
segmentLock.unlock();
}
@@ -361,9 +353,13 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
}
}
+  /**
+   * Assembles the path to the segment directory directly, so it can be resolved even when the table
+   * data manager object has not been created for the given table yet.
+   */
@Override
- public String getSegmentDataDirectory() {
- return _instanceDataManagerConfig.getInstanceDataDir();
+ public File getSegmentDataDirectory(String tableNameWithType, String
segmentName) {
+ return new File(new File(_instanceDataManagerConfig.getInstanceDataDir(),
tableNameWithType), segmentName);
}
@Override
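
A quick illustration of the new per-segment accessor (the instance data dir below is made up; only the
table/segment nesting comes from the code above):

    // Assuming instanceDataDir = /var/pinot/server/index:
    File segmentDir = instanceDataManager.getSegmentDataDirectory("myTable_OFFLINE", "myTable_OFFLINE_0");
    // -> /var/pinot/server/index/myTable_OFFLINE/myTable_OFFLINE_0
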
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
deleted file mode 100644
index 9a4ede4..0000000
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.starter.helix;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
-import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.crypt.PinotCrypter;
-import org.apache.pinot.spi.crypt.PinotCrypterFactory;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class SegmentFetcherAndLoader {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentFetcherAndLoader.class);
-
- private static final String TAR_GZ_SUFFIX = ".tar.gz";
- private static final String ENCODED_SUFFIX = ".enc";
-
- private final InstanceDataManager _instanceDataManager;
- private final ServerMetrics _serverMetrics;
-
- public SegmentFetcherAndLoader(PinotConfiguration config,
InstanceDataManager instanceDataManager,
- ServerMetrics serverMetrics)
- throws Exception {
- _instanceDataManager = instanceDataManager;
- _serverMetrics = serverMetrics;
-
- PinotConfiguration pinotFSConfig =
config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
- PinotConfiguration segmentFetcherFactoryConfig =
-
config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
- PinotConfiguration pinotCrypterConfig =
config.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
-
- PinotFSFactory.init(pinotFSConfig);
- SegmentFetcherFactory.init(segmentFetcherFactoryConfig);
- PinotCrypterFactory.init(pinotCrypterConfig);
- }
-
- public void replaceAllOfflineSegments(String tableNameWithType) {
- LOGGER.info("Replacing all segments in table: {}", tableNameWithType);
- List<SegmentMetadata> segMds =
_instanceDataManager.getAllSegmentsMetadata(tableNameWithType);
- for (SegmentMetadata segMd : segMds) {
- addOrReplaceOfflineSegment(tableNameWithType, segMd.getName(), true);
- }
- LOGGER.info("Replaced all segments in table: {}", tableNameWithType);
- }
-
- public void addOrReplaceOfflineSegment(String tableNameWithType, String
segmentName) {
- addOrReplaceOfflineSegment(tableNameWithType, segmentName, false);
- }
-
- /**
- * Add a new segment or replace an existing segment for offline table. The
method checks
- * the local segment CRC with the one set in ZK by controller. If both
equal, the method
- * simply loads the local segment, otherwise it downloads the new segment
then load. When
- * forceDownload is set to true, the server always downloads the segment.
- */
- public void addOrReplaceOfflineSegment(String tableNameWithType, String
segmentName, boolean forceDownload) {
- SegmentZKMetadata newSegmentZKMetadata = ZKMetadataProvider
- .getSegmentZKMetadata(_instanceDataManager.getPropertyStore(),
tableNameWithType, segmentName);
- Preconditions.checkNotNull(newSegmentZKMetadata);
-
- LOGGER.info("Adding or replacing segment {} for table {}, metadata {},
downloadIsForced {}", segmentName,
- tableNameWithType, newSegmentZKMetadata, forceDownload);
-
- // This method might modify the file on disk. Use segment lock to prevent
race condition
- Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType,
segmentName);
- try {
- segmentLock.lock();
-
- // We lock the segment in order to get its metadata, and then release
the lock, so it is possible
- // that the segment is dropped after we get its metadata.
- SegmentMetadata localSegmentMetadata =
_instanceDataManager.getSegmentMetadata(tableNameWithType, segmentName);
-
- if (localSegmentMetadata == null) {
- LOGGER.info("Segment {} of table {} is not loaded in memory, checking
disk", segmentName, tableNameWithType);
- File indexDir = new File(getSegmentLocalDirectory(tableNameWithType,
segmentName));
- // Restart during segment reload might leave segment in inconsistent
state (index directory might not exist but
- // segment backup directory existed), need to first try to recover
from reload failure before checking the
- // existence of the index directory and loading segment metadata from
it
- LoaderUtils.reloadFailureRecovery(indexDir);
- if (indexDir.exists()) {
- LOGGER.info("Segment {} of table {} found on disk, attempting to
load it", segmentName, tableNameWithType);
- try {
- localSegmentMetadata = new SegmentMetadataImpl(indexDir);
- LOGGER.info("Found segment {} of table {} with crc {} on disk",
segmentName, tableNameWithType,
- localSegmentMetadata.getCrc());
- } catch (Exception e) {
- // The localSegmentDir should help us get the table name,
- LOGGER.error("Failed to load segment metadata from {}. Deleting
it.", indexDir, e);
- FileUtils.deleteQuietly(indexDir);
- localSegmentMetadata = null;
- }
- try {
- if (!forceDownload && !isNewSegmentMetadata(tableNameWithType,
newSegmentZKMetadata,
- localSegmentMetadata)) {
- LOGGER.info("Segment metadata same as before, loading {} of
table {} (crc {}) from disk", segmentName,
- tableNameWithType, localSegmentMetadata.getCrc());
- _instanceDataManager.addOfflineSegment(tableNameWithType,
segmentName, indexDir);
- // TODO Update zk metadata with CRC for this instance
- return;
- }
- } catch (Exception e) {
- LOGGER
- .error("Failed to load {} of table {} from local, will try to
reload it from controller!", segmentName,
- tableNameWithType, e);
- FileUtils.deleteQuietly(indexDir);
- localSegmentMetadata = null;
- }
- }
- }
- // There is a very unlikely race condition that we may have gotten the
metadata of a
- // segment that was not dropped when we checked, but was dropped after
the check above.
- // That is possible only if we get two helix transitions (to drop, and
then to add back) the
- // segment at the same, or very close to each other.If the race
condition triggers, and the
- // two segments are same in metadata, then we may end up NOT adding back
the segment
- // that is in the process of being dropped.
-
- // If we get here, then either it is the case that we have the segment
loaded in memory (and therefore present
- // in disk) or, we need to load from the server. In the former case, we
still need to check if the metadata
- // that we have is different from that in zookeeper.
- if (forceDownload || isNewSegmentMetadata(tableNameWithType,
newSegmentZKMetadata, localSegmentMetadata)) {
- if (forceDownload) {
- LOGGER.info("Force to download segment {} of table {} from
controller.", segmentName, tableNameWithType);
- } else if (localSegmentMetadata == null) {
- LOGGER.info("Loading new segment {} of table {} from controller",
segmentName, tableNameWithType);
- } else {
- LOGGER.info("Trying to refresh segment {} of table {} with new
data.", segmentName, tableNameWithType);
- }
- String uri = newSegmentZKMetadata.getDownloadUrl();
- String crypterName = newSegmentZKMetadata.getCrypterName();
- PinotCrypter crypter = (crypterName != null) ?
PinotCrypterFactory.create(crypterName) : null;
-
- // Retry will be done here.
- String localSegmentDir = downloadSegmentToLocal(uri, crypter,
tableNameWithType, segmentName);
- SegmentMetadata segmentMetadata = new SegmentMetadataImpl(new
File(localSegmentDir));
- _instanceDataManager.addOfflineSegment(tableNameWithType, segmentName,
new File(localSegmentDir));
- LOGGER.info("Downloaded segment {} of table {} crc {} from
controller", segmentName, tableNameWithType,
- segmentMetadata.getCrc());
- } else {
- LOGGER.info("Got already loaded segment {} of table {} crc {} again,
will do nothing.", segmentName,
- tableNameWithType, localSegmentMetadata.getCrc());
- }
- } catch (final Exception e) {
- LOGGER.error("Cannot load segment : " + segmentName + " for table " +
tableNameWithType, e);
- Utils.rethrowException(e);
- throw new AssertionError("Should not reach this");
- } finally {
- segmentLock.unlock();
- }
- }
-
- private boolean isNewSegmentMetadata(String tableNameWithType,
SegmentZKMetadata newSegmentZKMetadata,
- @Nullable SegmentMetadata existedSegmentMetadata) {
- String segmentName = newSegmentZKMetadata.getSegmentName();
-
- if (existedSegmentMetadata == null) {
- LOGGER.info("Existed segment metadata is null for segment: {} in table:
{}", segmentName, tableNameWithType);
- return true;
- }
-
- long newCrc = newSegmentZKMetadata.getCrc();
- long existedCrc = Long.valueOf(existedSegmentMetadata.getCrc());
- LOGGER.info("New segment CRC: {}, existed segment CRC: {} for segment: {}
in table: {}", newCrc, existedCrc,
- segmentName, tableNameWithType);
- return newCrc != existedCrc;
- }
-
- private String downloadSegmentToLocal(String uri, PinotCrypter crypter,
String tableName, String segmentName)
- throws Exception {
- File tempDir = new File(new
File(_instanceDataManager.getSegmentFileDirectory(), tableName),
- "tmp-" + segmentName + "-" + UUID.randomUUID());
- FileUtils.forceMkdir(tempDir);
- File tempDownloadFile = new File(tempDir, segmentName + ENCODED_SUFFIX);
- File tempTarFile = new File(tempDir, segmentName + TAR_GZ_SUFFIX);
- File tempSegmentDir = new File(tempDir, segmentName);
- try {
- try {
- SegmentFetcherFactory.fetchSegmentToLocal(uri, tempDownloadFile);
- if (crypter != null) {
- crypter.decrypt(tempDownloadFile, tempTarFile);
- } else {
- tempTarFile = tempDownloadFile;
- }
- LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to:
{}, file length: {}", segmentName,
- tableName, uri, tempTarFile, tempTarFile.length());
- } catch (AttemptsExceededException e) {
- LOGGER.error("Attempts exceeded when downloading segment: {} for
table: {} from: {} to: {}", segmentName,
- tableName, uri, tempTarFile);
- _serverMetrics.addMeteredTableValue(tableName,
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
- Utils.rethrowException(e);
- return null;
- }
-
- try {
- // If an exception is thrown when untarring, it means the tar file is
broken OR not found after the retry.
- // Thus, there's no need to retry again.
- File tempIndexDir = TarGzCompressionUtils.untar(tempTarFile,
tempSegmentDir).get(0);
- File indexDir = new File(new
File(_instanceDataManager.getSegmentDataDirectory(), tableName), segmentName);
- if (indexDir.exists()) {
- LOGGER.info("Deleting existing index directory for segment: {} for
table: {}", segmentName, tableName);
- FileUtils.deleteDirectory(indexDir);
- }
- FileUtils.moveDirectory(tempIndexDir, indexDir);
- LOGGER.info("Successfully downloaded segment: {} for table: {} to:
{}", segmentName, tableName, indexDir);
- return indexDir.getAbsolutePath();
- } catch (Exception e) {
- LOGGER.error("Exception when untarring segment: {} for table: {} from
{} to {}", segmentName, tableName,
- tempTarFile, tempSegmentDir);
- _serverMetrics.addMeteredTableValue(tableName,
ServerMeter.UNTAR_FAILURES, 1L);
- Utils.rethrowException(e);
- return null;
- }
- } finally {
- FileUtils.deleteQuietly(tempDir);
- }
- }
-
- public String getSegmentLocalDirectory(String tableName, String segmentId) {
- return _instanceDataManager.getSegmentDataDirectory() + "/" + tableName +
"/" + segmentId;
- }
-}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 97f1d23..ad4a386 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -40,14 +40,10 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
// We only allow limited number of segments refresh/reload happen at the
same time
// The reason for that is segment refresh/reload will temporarily use
double-sized memory
private final Semaphore _refreshThreadSemaphore;
-
- private final SegmentFetcherAndLoader _fetcherAndLoader;
private final InstanceDataManager _instanceDataManager;
private final ServerMetrics _metrics;
- public SegmentMessageHandlerFactory(SegmentFetcherAndLoader
fetcherAndLoader, InstanceDataManager instanceDataManager,
- ServerMetrics metrics) {
- _fetcherAndLoader = fetcherAndLoader;
+ public SegmentMessageHandlerFactory(InstanceDataManager instanceDataManager,
ServerMetrics metrics) {
_instanceDataManager = instanceDataManager;
_metrics = metrics;
int maxParallelRefreshThreads =
instanceDataManager.getMaxParallelRefreshThreads();
@@ -119,7 +115,7 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
try {
acquireSema(_segmentName, _logger);
// The number of retry times depends on the retry count in Constants.
- _fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType,
_segmentName);
+ _instanceDataManager.addOrReplaceSegment(_tableNameWithType,
_segmentName);
result.setSuccess(true);
} catch (Exception e) {
_metrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REFRESH_FAILURES, 1);
@@ -150,19 +146,11 @@ public class SegmentMessageHandlerFactory implements
MessageHandlerFactory {
acquireSema("ALL", _logger);
// NOTE: the method aborts if any segment reload encounters an
unhandled exception,
// and can lead to inconsistent state across segments
- if (_forceDownload) {
- _fetcherAndLoader.replaceAllOfflineSegments(_tableNameWithType);
- } else {
- _instanceDataManager.reloadAllSegments(_tableNameWithType);
- }
+ _instanceDataManager.reloadAllSegments(_tableNameWithType,
_forceDownload);
} else {
// Reload one segment
acquireSema(_segmentName, _logger);
- if (_forceDownload) {
- _fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType,
_segmentName, true);
- } else {
- _instanceDataManager.reloadSegment(_tableNameWithType,
_segmentName);
- }
+ _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName,
_forceDownload);
}
helixTaskResult.setSuccess(true);
} catch (Throwable e) {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 1a5efe5..d2e4c30 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -53,13 +53,10 @@ import org.slf4j.LoggerFactory;
public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<StateModel> {
private final String _instanceId;
private final InstanceDataManager _instanceDataManager;
- private final SegmentFetcherAndLoader _fetcherAndLoader;
- public SegmentOnlineOfflineStateModelFactory(String instanceId,
InstanceDataManager instanceDataManager,
- SegmentFetcherAndLoader fetcherAndLoader) {
+ public SegmentOnlineOfflineStateModelFactory(String instanceId,
InstanceDataManager instanceDataManager) {
_instanceId = instanceId;
_instanceDataManager = instanceDataManager;
- _fetcherAndLoader = fetcherAndLoader;
}
public static String getStateModelName() {
@@ -162,7 +159,7 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(message.getResourceName());
Preconditions.checkNotNull(tableType);
if (tableType == TableType.OFFLINE) {
- _fetcherAndLoader.addOrReplaceOfflineSegment(tableNameWithType,
segmentName);
+ _instanceDataManager.addOrReplaceSegment(tableNameWithType,
segmentName);
} else {
_instanceDataManager.addRealtimeSegment(tableNameWithType,
segmentName);
}
@@ -208,7 +205,7 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
try {
segmentLock.lock();
- final File segmentDir = new
File(_fetcherAndLoader.getSegmentLocalDirectory(tableNameWithType,
segmentName));
+ File segmentDir =
_instanceDataManager.getSegmentDataDirectory(tableNameWithType, segmentName);
if (segmentDir.exists()) {
FileUtils.deleteQuietly(segmentDir);
_logger.info("Deleted segment directory {}", segmentDir);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2bc0de8..642fa04 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -453,9 +453,11 @@ public class CommonConstants {
public static class Realtime {
public enum Status {
// Means the segment is in CONSUMING state.
- IN_PROGRESS, // Means the segment is in ONLINE state (segment
completed consuming and has been saved in
+ IN_PROGRESS,
+ // Means the segment is in ONLINE state (segment completed consuming
and has been saved in
// segment store).
- DONE, // Means the segment is uploaded to a Pinot controller by an
external party.
+ DONE,
+ // Means the segment is uploaded to a Pinot controller by an external
party.
UPLOADED
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]