This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 21f3d283d4 Allow server level configuration for Upsert metadata class (#11851)
21f3d283d4 is described below
commit 21f3d283d42e1b4f5b16f3ef84549eb34b8b7031
Author: Kartik Khare <[email protected]>
AuthorDate: Wed Jan 17 16:40:47 2024 +0530
Allow server level configuration for Upsert metadata class (#11851)
* Allow server level configuration for Upsert metadata class
* Fix null pointer exceptions
* Add tests
* Fix tests
* fix tests
* Fix tests
* Move logic to parse upsert configs from instance data manager to table upsert factory
* Address review comments
---------
Co-authored-by: Kartik Khare <[email protected]>
---
.../manager/realtime/RealtimeTableDataManager.java | 8 +-
.../tests/UpsertTableIntegrationTest.java | 121 ++++++++++++++++-----
.../UpsertTableSegmentPreloadIntegrationTest.java | 11 +-
.../models/DummyTableUpsertMetadataManager.java | 115 ++++++++++++++++++++
.../upsert/BaseTableUpsertMetadataManager.java | 1 +
.../upsert/TableUpsertMetadataManagerFactory.java | 28 ++++-
.../MutableSegmentImplUpsertComparisonColTest.java | 3 +-
.../mutable/MutableSegmentImplUpsertTest.java | 3 +-
.../helix/HelixInstanceDataManagerConfig.java | 7 ++
.../config/instance/InstanceDataManagerConfig.java | 2 +
10 files changed, 265 insertions(+), 34 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 60cd58199f..333be09b0c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -207,7 +207,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
// NOTE: Set _tableUpsertMetadataManager before initializing it because
when preloading is enabled, we need to
// load segments into it
- _tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig);
+ _tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig,
+
_tableDataManagerConfig.getInstanceDataManagerConfig().getUpsertConfigs());
_tableUpsertMetadataManager.init(tableConfig, schema, this,
_helixManager, _segmentPreloadExecutor);
}
@@ -697,6 +698,11 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
return _instanceId;
}
+ @VisibleForTesting
+ public TableUpsertMetadataManager getTableUpsertMetadataManager() {
+ return _tableUpsertMetadataManager;
+ }
+
/**
* Validate a schema against the table config for real-time record
consumption.
* Ideally, we should validate these things when schema is added or table is
created, but either of these
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index e140020a39..634390effc 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -18,20 +18,36 @@
*/
package org.apache.pinot.integration.tests;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import
org.apache.pinot.integration.tests.models.DummyTableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
@@ -82,8 +98,9 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
addSchema(schema);
Map<String, String> csvDecoderProperties =
getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
- TableConfig tableConfig = createCSVUpsertTableConfig(getTableName(),
getKafkaTopic(),
- getNumKafkaPartitions(), csvDecoderProperties, null, PRIMARY_KEY_COL);
+ TableConfig tableConfig =
+ createCSVUpsertTableConfig(getTableName(), getKafkaTopic(),
getNumKafkaPartitions(), csvDecoderProperties, null,
+ PRIMARY_KEY_COL);
addTableConfig(tableConfig);
// Wait for all documents loaded
@@ -136,8 +153,7 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
private Schema createSchema(String schemaFileName)
throws IOException {
- InputStream inputStream =
-
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
+ InputStream inputStream =
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
Assert.assertNotNull(inputStream);
return Schema.fromInputStream(inputStream);
}
@@ -181,8 +197,9 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
Schema upsertSchema = createSchema();
upsertSchema.setSchemaName(tableName);
addSchema(upsertSchema);
- TableConfig tableConfig = createCSVUpsertTableConfig(tableName,
kafkaTopicName,
- getNumKafkaPartitions(), csvDecoderProperties, upsertConfig,
PRIMARY_KEY_COL);
+ TableConfig tableConfig =
+ createCSVUpsertTableConfig(tableName, kafkaTopicName,
getNumKafkaPartitions(), csvDecoderProperties,
+ upsertConfig, PRIMARY_KEY_COL);
addTableConfig(tableConfig);
// Push initial 10 upsert records - 3 pks 100, 101 and 102
@@ -222,9 +239,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101");
// Validate deleted records
- rs = getPinotConnection()
- .execute("SELECT playerId FROM " + tableName
- + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
+ rs = getPinotConnection().execute(
+ "SELECT playerId FROM " + tableName + " WHERE deleted = true
OPTION(skipUpsert=true)").getResultSet(0);
Assert.assertEquals(rs.getRowCount(), 2);
for (int i = 0; i < rs.getRowCount(); i++) {
String playerId = rs.getString(i, 0);
@@ -245,17 +261,16 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
}, 100L, 600_000L, "Failed to load all upsert records for
testDeleteWithFullUpsert");
// Validate: pk is queryable and all columns are overwritten with new value
- rs = getPinotConnection()
- .execute("SELECT playerId, name, game FROM " + tableName + " WHERE
playerId = 100").getResultSet(0);
+ rs = getPinotConnection().execute("SELECT playerId, name, game FROM " +
tableName + " WHERE playerId = 100")
+ .getResultSet(0);
Assert.assertEquals(rs.getRowCount(), 1);
Assert.assertEquals(rs.getInt(0, 0), 100);
Assert.assertEquals(rs.getString(0, 1), "Zook-New");
Assert.assertEquals(rs.getString(0, 2), "null");
// Validate: pk lineage still exists
- rs = getPinotConnection()
- .execute("SELECT playerId, name FROM " + tableName
- + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0);
+ rs = getPinotConnection().execute(
+ "SELECT playerId, name FROM " + tableName + " WHERE playerId = 100
OPTION(skipUpsert=true)").getResultSet(0);
Assert.assertTrue(rs.getRowCount() > 1);
@@ -269,8 +284,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
final UpsertConfig upsertConfig = new
UpsertConfig(UpsertConfig.Mode.PARTIAL);
upsertConfig.setDeleteRecordColumn(DELETE_COL);
- testDeleteWithPartialUpsert(getKafkaTopic() +
"-partial-upsert-with-deletes",
- "gameScoresPartialUpsertWithDelete", upsertConfig);
+ testDeleteWithPartialUpsert(getKafkaTopic() +
"-partial-upsert-with-deletes", "gameScoresPartialUpsertWithDelete",
+ upsertConfig);
}
protected void testDeleteWithPartialUpsert(String kafkaTopicName, String
tableName, UpsertConfig upsertConfig)
@@ -288,8 +303,9 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
Schema partialUpsertSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA);
partialUpsertSchema.setSchemaName(tableName);
addSchema(partialUpsertSchema);
- TableConfig tableConfig = createCSVUpsertTableConfig(tableName,
kafkaTopicName,
- getNumKafkaPartitions(), csvDecoderProperties, upsertConfig,
PRIMARY_KEY_COL);
+ TableConfig tableConfig =
+ createCSVUpsertTableConfig(tableName, kafkaTopicName,
getNumKafkaPartitions(), csvDecoderProperties,
+ upsertConfig, PRIMARY_KEY_COL);
addTableConfig(tableConfig);
// Push initial 10 upsert records - 3 pks 100, 101 and 102
@@ -329,9 +345,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101");
// Validate deleted records
- rs = getPinotConnection()
- .execute("SELECT playerId FROM " + tableName
- + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
+ rs = getPinotConnection().execute(
+ "SELECT playerId FROM " + tableName + " WHERE deleted = true
OPTION(skipUpsert=true)").getResultSet(0);
Assert.assertEquals(rs.getRowCount(), 2);
for (int i = 0; i < rs.getRowCount(); i++) {
String playerId = rs.getString(i, 0);
@@ -352,22 +367,74 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
}, 100L, 600_000L, "Failed to load all upsert records for
testDeleteWithFullUpsert");
// Validate: pk is queryable and all columns are overwritten with new value
- rs = getPinotConnection()
- .execute("SELECT playerId, name, game FROM " + tableName
- + " WHERE playerId = 100").getResultSet(0);
+ rs = getPinotConnection().execute("SELECT playerId, name, game FROM " +
tableName + " WHERE playerId = 100")
+ .getResultSet(0);
Assert.assertEquals(rs.getRowCount(), 1);
Assert.assertEquals(rs.getInt(0, 0), 100);
Assert.assertEquals(rs.getString(0, 1), "Zook");
Assert.assertEquals(rs.getString(0, 2), "[\"null\"]");
// Validate: pk lineage still exists
- rs = getPinotConnection()
- .execute("SELECT playerId, name FROM " + tableName
- + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0);
+ rs = getPinotConnection().execute(
+ "SELECT playerId, name FROM " + tableName + " WHERE playerId = 100
OPTION(skipUpsert=true)").getResultSet(0);
Assert.assertTrue(rs.getRowCount() > 1);
// TEARDOWN
dropRealtimeTable(tableName);
}
+
+ @Test
+ public void testDefaultMetadataManagerClass()
+ throws Exception {
+ PinotConfiguration config = getServerConf(12345);
+
config.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
+ HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+
TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_METADATA_MANAGER_CLASS),
+ DummyTableUpsertMetadataManager.class.getName());
+
+ BaseServerStarter serverStarter = null;
+ try {
+ serverStarter = startOneServer(config);
+ HelixManager helixManager =
serverStarter.getServerInstance().getHelixManager();
+ InstanceConfig instanceConfig =
HelixHelper.getInstanceConfig(helixManager, serverStarter.getInstanceId());
+ // updateInstanceTags
+ String tagsString = "DummyTag_REALTIME,DummyTag_OFFLINE";
+ List<String> newTags = Arrays.asList(StringUtils.split(tagsString, ','));
+
instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(),
newTags);
+
+ if
(!_helixDataAccessor.setProperty(_helixDataAccessor.keyBuilder().instanceConfig(serverStarter.getInstanceId()),
+ instanceConfig)) {
+ throw new RuntimeException("Failed to set instance config for
instance: " + serverStarter.getInstanceId());
+ }
+
+ String dummyTableName = "dummyTable123";
+ Map<String, String> csvDecoderProperties =
getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+
+ TableConfig tableConfig =
+ createCSVUpsertTableConfig(dummyTableName, getKafkaTopic(),
getNumKafkaPartitions(), csvDecoderProperties,
+ null, PRIMARY_KEY_COL);
+
+ TenantConfig tenantConfig = new
TenantConfig(TagNameUtils.DEFAULT_TENANT_NAME, "DummyTag", null);
+
+ tableConfig.setTenantConfig(tenantConfig);
+ Schema schema = createSchema();
+ schema.setSchemaName(dummyTableName);
+ addSchema(schema);
+ addTableConfig(tableConfig);
+
+ Thread.sleep(1000L);
+ RealtimeTableDataManager tableDataManager =
+ (RealtimeTableDataManager)
serverStarter.getServerInstance().getInstanceDataManager()
+
.getTableDataManager(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(dummyTableName));
+ Assert.assertTrue(tableDataManager.getTableUpsertMetadataManager()
instanceof DummyTableUpsertMetadataManager);
+ dropRealtimeTable(dummyTableName);
+ deleteSchema(dummyTableName);
+ waitForEVToDisappear(dummyTableName);
+ } finally {
+ if (serverStarter != null) {
+ serverStarter.stop();
+ }
+ }
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index b88f5fbdab..f10a58bf72 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.integration.tests;
+import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -26,8 +27,10 @@ import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -87,8 +90,6 @@ public class UpsertTableSegmentPreloadIntegrationTest extends
BaseClusterIntegra
TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0),
PRIMARY_KEY_COL, null, getNumKafkaPartitions());
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
assertNotNull(upsertConfig);
- upsertConfig.setEnableSnapshot(true);
- upsertConfig.setEnablePreload(true);
addTableConfig(tableConfig);
// Create and upload segments
@@ -104,6 +105,12 @@ public class UpsertTableSegmentPreloadIntegrationTest
extends BaseClusterIntegra
protected void overrideServerConf(PinotConfiguration serverConf) {
serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX
+ ".max.segment.preload.threads",
"1");
+
serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
+ HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+ TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_SNAPSHOT),
"true");
+
serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX,
+ HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT,
+ TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_PRELOAD),
"true");
}
@AfterClass
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
new file mode 100644
index 0000000000..502834a6b6
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.models;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import
org.apache.pinot.segment.local.upsert.BasePartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.BaseTableUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.RecordInfo;
+import org.apache.pinot.segment.local.upsert.UpsertContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class DummyTableUpsertMetadataManager extends
BaseTableUpsertMetadataManager {
+
+ private TableConfig _tableConfig;
+ private Schema _schema;
+
+ public DummyTableUpsertMetadataManager() {
+ super();
+ }
+
+ @Override
+ public void init(TableConfig tableConfig, Schema schema, TableDataManager
tableDataManager, HelixManager helixManager,
+ @org.jetbrains.annotations.Nullable ExecutorService
segmentPreloadExecutor) {
+ super.init(tableConfig, schema, tableDataManager, helixManager,
segmentPreloadExecutor);
+ _tableConfig = tableConfig;
+ _schema = schema;
+ }
+
+ @Override
+ public PartitionUpsertMetadataManager getOrCreatePartitionManager(int
partitionId) {
+ UpsertContext context = new
UpsertContext.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+ .setPrimaryKeyColumns(_schema.getPrimaryKeyColumns())
+
.setComparisonColumns(Collections.singletonList(_tableConfig.getValidationConfig().getTimeColumnName()))
+ .setHashFunction(HashFunction.NONE).setTableIndexDir(new
File("/tmp/tableIndexDirDummy")).build();
+
+ return new DummyPartitionUpsertMetadataManager("dummy", partitionId,
context);
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ }
+
+ class DummyPartitionUpsertMetadataManager extends
BasePartitionUpsertMetadataManager {
+ public DummyPartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, UpsertContext context) {
+ super(tableNameWithType, partitionId, context);
+ }
+
+ @Override
+ protected long getNumPrimaryKeys() {
+ return 0;
+ }
+
+ @Override
+ protected void addOrReplaceSegment(ImmutableSegmentImpl segment,
ThreadSafeMutableRoaringBitmap validDocIds,
+ @org.jetbrains.annotations.Nullable ThreadSafeMutableRoaringBitmap
queryableDocIds,
+ Iterator<RecordInfo> recordInfoIterator,
@org.jetbrains.annotations.Nullable IndexSegment oldSegment,
+ @org.jetbrains.annotations.Nullable MutableRoaringBitmap
validDocIdsForOldSegment) {
+ }
+
+ @Override
+ protected boolean doAddRecord(MutableSegment segment, RecordInfo
recordInfo) {
+ return false;
+ }
+
+ @Override
+ protected void removeSegment(IndexSegment segment, MutableRoaringBitmap
validDocIds) {
+ }
+
+ @Override
+ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo
recordInfo) {
+ return null;
+ }
+
+ @Override
+ protected void doRemoveExpiredPrimaryKeys() {
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 845930a54c..72f57d4086 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -116,6 +116,7 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
initCustomVariables();
if (enableSnapshot && enablePreload && segmentPreloadExecutor != null) {
+
// Preloading the segments with snapshots for fast upsert metadata
recovery.
// Note that there is an implicit waiting logic between the thread doing
the segment preloading here and the
// other helix threads about to process segment state transitions (e.g.
taking segments from OFFLINE to ONLINE).
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 1750aaefc9..1ee4a834e1 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -19,9 +19,11 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,20 +33,42 @@ public class TableUpsertMetadataManagerFactory {
}
private static final Logger LOGGER =
LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
+ public static final String UPSERT_DEFAULT_METADATA_MANAGER_CLASS =
"default.metadata.manager.class";
+ public static final String UPSERT_DEFAULT_ENABLE_SNAPSHOT =
"default.enable.snapshot";
+ public static final String UPSERT_DEFAULT_ENABLE_PRELOAD =
"default.enable.preload";
- public static TableUpsertMetadataManager create(TableConfig tableConfig) {
+ public static TableUpsertMetadataManager create(TableConfig tableConfig,
+ @Nullable PinotConfiguration instanceUpsertConfigs) {
String tableNameWithType = tableConfig.getTableName();
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
Preconditions.checkArgument(upsertConfig != null, "Must provide upsert
config for table: %s", tableNameWithType);
TableUpsertMetadataManager metadataManager;
String metadataManagerClass = upsertConfig.getMetadataManagerClass();
+
+ if (instanceUpsertConfigs != null) {
+ if (metadataManagerClass == null) {
+ metadataManagerClass =
instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_METADATA_MANAGER_CLASS);
+ }
+ // Server level config honoured only when table level config is not set
to true
+ if (!upsertConfig.isEnableSnapshot()) {
+ upsertConfig.setEnableSnapshot(
+
Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_SNAPSHOT,
"false")));
+ }
+
+ // Server level config honoured only when table level config is not set
to true
+ if (!upsertConfig.isEnablePreload()) {
+ upsertConfig.setEnablePreload(
+
Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD,
"false")));
+ }
+ }
+
if (StringUtils.isNotEmpty(metadataManagerClass)) {
LOGGER.info("Creating TableUpsertMetadataManager with class: {} for
table: {}", metadataManagerClass,
tableNameWithType);
try {
metadataManager =
- (TableUpsertMetadataManager)
Class.forName(metadataManagerClass).getConstructor().newInstance();
+ (TableUpsertMetadataManager)
Class.forName(metadataManagerClass).newInstance();
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while constructing
TableUpsertMetadataManager with class: %s for table: %s",
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 38bc66ade5..d3508b7540 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -86,7 +86,8 @@ public class MutableSegmentImplUpsertComparisonColTest {
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
- TableUpsertMetadataManager tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(_tableConfig);
+ TableUpsertMetadataManager tableUpsertMetadataManager =
+ TableUpsertMetadataManagerFactory.create(_tableConfig, null);
tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager,
mock(HelixManager.class),
mock(ExecutorService.class));
_partitionUpsertMetadataManager =
tableUpsertMetadataManager.getOrCreatePartitionManager(0);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 702bd47f80..4f14bdd408 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -96,7 +96,8 @@ public class MutableSegmentImplUpsertTest {
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
- TableUpsertMetadataManager tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(_tableConfig);
+ TableUpsertMetadataManager tableUpsertMetadataManager =
+ TableUpsertMetadataManagerFactory.create(_tableConfig, null);
tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager,
mock(HelixManager.class),
mock(ExecutorService.class));
_partitionUpsertMetadataManager =
tableUpsertMetadataManager.getOrCreatePartitionManager(0);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 9c69e8afd4..c7e15fe106 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -131,6 +131,8 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS =
"external.view.dropped.max.wait.ms";
private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS =
"external.view.dropped.check.interval.ms";
+ public static final String PREFIX_OF_CONFIG_OF_UPSERT = "upsert";
+
private final static String[] REQUIRED_KEYS = {INSTANCE_ID};
private static final long DEFAULT_ERROR_CACHE_SIZE = 100L;
private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000;
@@ -294,6 +296,11 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS);
}
+ @Override
+ public PinotConfiguration getUpsertConfigs() {
+ return
_instanceDataManagerConfiguration.subset(PREFIX_OF_CONFIG_OF_UPSERT);
+ }
+
@Override
public String toString() {
String configString = "";
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index a97f1639e1..6d035890c9 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -68,4 +68,6 @@ public interface InstanceDataManagerConfig {
long getExternalViewDroppedMaxWaitMs();
long getExternalViewDroppedCheckIntervalMs();
+
+ PinotConfiguration getUpsertConfigs();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]