This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9b39d9484a fixing the corner of missing segment file to download when
refresh segment with metadata then segment push (#11720)
9b39d9484a is described below
commit 9b39d9484a0cd21a69f6207c5e5a576d7444dd0c
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Oct 2 12:35:12 2023 -0700
fixing the corner of missing segment file to download when refresh segment
with metadata then segment push (#11720)
---
.../pinot/controller/api/upload/ZKOperator.java | 13 +-
.../pinot/integration/tests/ClusterTest.java | 10 +-
.../tests/OfflineClusterIntegrationTest.java | 224 ++++++---------------
3 files changed, 85 insertions(+), 162 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 38cf2919e7..b0aee83d0d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -212,9 +212,20 @@ public class ZKOperator {
segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
}
if (!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr))
{
+ // For offline ingestion, it is quite common that the download.uri
would change but the crc would be the same.
+ // E.g. a user re-runs the job which processes the same data and
segments are stored/pushed from a different
+ // path from the Deepstore. Read more:
https://github.com/apache/pinot/issues/11535
LOGGER.info("Updating segment download url from: {} to: {} even
though crc is the same",
- segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr);
+ segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr);
segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr);
+ // When the download URI changes, we also need to copy the segment to
the final location if it exists.
+ // This typically means users changed the push type from METADATA to
SEGMENT or SEGMENT to METADATA.
+ // Note that switching push type from SEGMENT to METADATA may lead to
orphan segments in the controller
+ // managed directory. Read more:
https://github.com/apache/pinot/pull/11720
+ if (finalSegmentLocationURI != null) {
+ copySegmentToDeepStore(tableNameWithType, segmentName, uploadType,
segmentFile, sourceDownloadURIStr,
+ finalSegmentLocationURI);
+ }
}
if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType,
segmentZKMetadata, expectedVersion)) {
throw new RuntimeException(
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 9b41232f9b..d04b9efc6c 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -323,6 +323,14 @@ public abstract class ClusterTest extends ControllerTest {
uploadSegments(tableName, TableType.OFFLINE, tarDir);
}
+ /**
+ * Upload all segments inside the given directories to the cluster.
+ */
+ protected void uploadSegments(String tableName, List<File> tarDirs)
+ throws Exception {
+ uploadSegments(tableName, TableType.OFFLINE, tarDirs);
+ }
+
/**
* Upload all segments inside the given directory to the cluster.
*/
@@ -545,7 +553,7 @@ public abstract class ClusterTest extends ControllerTest {
@DataProvider(name = "systemColumns")
public Object[][] systemColumns() {
- return new Object[][] {
+ return new Object[][]{
{"$docId"},
{"$hostName"},
{"$segmentName"}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index d9d645c065..c29fbe59f1 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -203,11 +203,14 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
} catch (Exception e) {
// If enableParallelPushProtection is enabled and the same segment is
uploaded concurrently, we could get one
- // of the two exception - 409 conflict of the second call enters
ProcessExistingSegment ; segmentZkMetadata
- // creation failure if both calls entered ProcessNewSegment. In/such
cases ensure that we upload all the
- // segments again/to ensure that the data is setup correctly.
+ // of the three exceptions:
+ // - 409 conflict of the second call enters ProcessExistingSegment ;
+ // - segmentZkMetadata creation failure if both calls entered
ProcessNewSegment.
+ // - Failed to copy segment tar file to final location due to the same
segment pushed twice concurrently.
+ // In such cases we upload all the segments again to ensure that the
data is setup correctly.
assertTrue(e.getMessage().contains("Another segment upload is in
progress for segment") || e.getMessage()
- .contains("Failed to create ZK metadata for segment"),
e.getMessage());
+ .contains("Failed to update ZK metadata for segment") ||
e.getMessage()
+ .contains("java.nio.file.FileAlreadyExistsException"),
e.getMessage());
uploadSegments(getTableName(), _tarDir);
}
@@ -224,6 +227,47 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
+
+ // Try to reload all the segments with force download from the controller
URI.
+ reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true,
getCountStarResult());
+
+ // Try to upload all the segments again with force download from the
controller URI.
+ try {
+ uploadSegments(getTableName(), tarDirs);
+ } catch (Exception e) {
+ // If enableParallelPushProtection is enabled and the same segment is
uploaded concurrently, we could get one
+ // of the three exceptions:
+ // - 409 conflict of the second call enters ProcessExistingSegment ;
+ // - segmentZkMetadata creation failure if both calls entered
ProcessNewSegment.
+ // - Failed to copy segment tar file to final location due to the same
segment pushed twice concurrently.
+ // In such cases we upload all the segments again to ensure that the
data is setup correctly.
+ assertTrue(e.getMessage().contains("Another segment upload is in
progress for segment") || e.getMessage()
+ .contains("Failed to update ZK metadata for segment") ||
e.getMessage()
+ .contains("java.nio.file.FileAlreadyExistsException"),
e.getMessage());
+ uploadSegments(getTableName(), _tarDir);
+ }
+
+ // Try to reload all the segments with force download from the controller
URI.
+ reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true,
getCountStarResult());
+ }
+
+ private void reloadAllSegments(String testQuery, boolean forceDownload, long
numTotalDocs)
+ throws IOException {
+ // Reload all the segments, with force download from the controller URI
if requested.
+ String reloadJob = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, forceDownload);
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode queryResponse = postQuery(testQuery);
+ if (!queryResponse.get("exceptions").isEmpty()) {
+ return false;
+ }
+ // Total docs should not change during reload
+ assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+ return isReloadJobCompleted(reloadJob);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 600_000L, "Failed to reload table with force download");
}
@BeforeMethod
@@ -482,16 +526,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
updateTableConfig(tableConfig);
- String removeInvertedIndexJobId =
reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Total docs should not change during reload
-
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(),
numTotalDocs);
- return isReloadJobCompleted(removeInvertedIndexJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to cleanup obsolete index");
+ reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs);
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
numTotalDocs);
assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
@@ -544,16 +579,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
updateTableConfig(tableConfig);
- String forceDownloadJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, true);
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Total docs should not change during reload
-
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(),
numTotalDocs);
- return isReloadJobCompleted(forceDownloadJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to cleanup obsolete index in table");
+ reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs);
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
numTotalDocs);
// With force download, the table size gets back to the initial value.
assertEquals(getTableSize(getTableName()), DISK_SIZE_IN_BYTES);
@@ -566,22 +592,12 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
updateTableConfig(tableConfig);
- String reloadJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
// It takes a while to reload multiple segments, thus we retry the query
for some time.
// After all segments are reloaded, the inverted index is added on
DivActualElapsedTime.
// It's expected to have numEntriesScannedInFilter equal to 0, i.e. no
docs is scanned
// at filtering stage when inverted index can answer the predicate
directly.
- long numTotalDocs = getCountStarResult();
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Total docs should not change during reload
-
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(),
numTotalDocs);
- return isReloadJobCompleted(reloadJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to generate inverted index");
+ reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, false,
getCountStarResult());
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
0L);
}
@@ -1117,33 +1133,14 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Update table config and trigger reload
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS);
- updateTableConfig(tableConfig);
- String addIndexJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Total docs should not change during reload
-
assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("totalDocs").asLong(),
numTotalDocs);
- return isReloadJobCompleted(addIndexJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to generate range index");
+ reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs);
assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
0L);
// Update table config to remove the new range index, and check if the new
range index is removed
tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setRangeIndexColumns(getRangeIndexColumns());
updateTableConfig(tableConfig);
- String removeIndexJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Total docs should not change during reload
-
assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("totalDocs").asLong(),
numTotalDocs);
- return isReloadJobCompleted(removeIndexJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to cleanup obsolete index");
+ reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs);
assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
numTotalDocs);
assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
}
@@ -1158,16 +1155,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setBloomFilterColumns(UPDATED_BLOOM_FILTER_COLUMNS);
updateTableConfig(tableConfig);
- String addIndexJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Total docs should not change during reload
-
assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("totalDocs").asLong(),
numTotalDocs);
- return isReloadJobCompleted(addIndexJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to generate bloom filter");
+ reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, false, numTotalDocs);
assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(),
0L);
// Update table config to remove the new bloom filter, and
@@ -1175,16 +1163,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setBloomFilterColumns(getBloomFilterColumns());
updateTableConfig(tableConfig);
- String removeIndexJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Total docs should not change during reload
-
assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("totalDocs").asLong(),
numTotalDocs);
- return isReloadJobCompleted(removeIndexJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to cleanup obsolete index");
+ reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, false, numTotalDocs);
assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(),
NUM_SEGMENTS);
assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
}
@@ -1223,24 +1202,12 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
indexingConfig.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_INDEX_CONFIG_1));
indexingConfig.setEnableDynamicStarTreeCreation(true);
updateTableConfig(tableConfig);
- String addIndexJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
- // Result should not change during reload
-
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(),
firstQueryResult);
- // Total docs should not change during reload
- assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- return isReloadJobCompleted(addIndexJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to add first star-tree index");
+ reloadAllSegments(TEST_STAR_TREE_QUERY_1, false, numTotalDocs);
// With star-tree, 'numDocsScanned' should be the same as number of
segments (1 per segment)
assertEquals(postQuery(TEST_STAR_TREE_QUERY_1).get("numDocsScanned").asLong(),
NUM_SEGMENTS);
// Reload again should have no effect
- reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
+ reloadAllSegments(TEST_STAR_TREE_QUERY_1, false, numTotalDocs);
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(),
firstQueryResult);
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
@@ -1268,19 +1235,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Update table config with a different star-tree index config and trigger
reload
indexingConfig.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_INDEX_CONFIG_2));
updateTableConfig(tableConfig);
- String changeIndexJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
- // Result should not change during reload
-
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(),
secondQueryResult);
- // Total docs should not change during reload
- assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- return isReloadJobCompleted(changeIndexJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to change to second star-tree index");
+ reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
// With star-tree, 'numDocsScanned' should be the same as number of
segments (1 per segment)
assertEquals(postQuery(TEST_STAR_TREE_QUERY_2).get("numDocsScanned").asLong(),
NUM_SEGMENTS);
@@ -1289,7 +1244,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(),
firstQueryResult);
// Reload again should have no effect
- reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
+ reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(),
firstQueryResult);
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
@@ -1314,19 +1269,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Remove the star-tree index config and trigger reload
indexingConfig.setStarTreeIndexConfigs(null);
updateTableConfig(tableConfig);
- String removeIndexJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
- // Result should not change during reload
-
assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(),
secondQueryResult);
- // Total docs should not change during reload
- assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- return isReloadJobCompleted(removeIndexJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to remove star-tree index");
+ reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
// Without star-tree, 'numDocsScanned' should be the same as the
'COUNT(*)' result
assertEquals(postQuery(TEST_STAR_TREE_QUERY_2).get("numDocsScanned").asLong(),
secondQueryResult);
assertEquals(getTableSize(getTableName()), tableSizeWithDefaultIndex);
@@ -1336,7 +1279,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(firstQueryResponse.get("numDocsScanned").asInt(),
firstQueryResult);
// Reload again should have no effect
- reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
+ reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(),
firstQueryResult);
assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
@@ -1475,29 +1418,13 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
updateTableConfig(tableConfig);
// Trigger reload
- String reloadJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- JsonNode queryResponse = postQuery(TEST_EXTRA_COLUMNS_QUERY);
- if (!queryResponse.get("exceptions").isEmpty()) {
- // Schema is not refreshed on the broker side yet
- return false;
- }
- // Total docs should not change during reload
- assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- return isReloadJobCompleted(reloadJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to add default columns");
+ reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs);
assertEquals(postQuery(TEST_EXTRA_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(),
numTotalDocs);
}
private void reloadWithMissingColumns()
throws Exception {
- long numTotalDocs = getCountStarResult();
-
// Remove columns from the table config first to pass the validation of
the table config
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setIngestionConfig(null);
@@ -1513,16 +1440,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
addSchema(schema);
// Trigger reload
- String reloadJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Total docs should not change during reload
- assertEquals(postQuery(SELECT_STAR_QUERY).get("totalDocs").asLong(),
numTotalDocs);
- return isReloadJobCompleted(reloadJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to skip missing columns");
+ reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult());
JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(),
"*")));
assertEquals(segmentsMetadata.size(), 12);
@@ -1539,21 +1457,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
addSchema(createSchema());
// Trigger reload
- String reloadJobId = reloadTableAndValidateResponse(getTableName(),
TableType.OFFLINE, false);
- TestUtils.waitForCondition(aVoid -> {
- try {
- JsonNode queryResponse = postQuery(TEST_REGULAR_COLUMNS_QUERY);
- if (!queryResponse.get("exceptions").isEmpty()) {
- // Schema is not refreshed on the broker side yet
- return false;
- }
- // Total docs should not change during reload
- assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- return isReloadJobCompleted(reloadJobId);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to reload regular columns");
+ reloadAllSegments(SELECT_STAR_QUERY, true, numTotalDocs);
assertEquals(postQuery(TEST_REGULAR_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(),
numTotalDocs);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]