This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 76d0eb25db Reduce Heap Usage of OnHeapStringDictionary (#12223)
76d0eb25db is described below
commit 76d0eb25db180f1bf88933146bc5e2e25f66f8df
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Thu Jan 25 12:51:45 2024 -0800
Reduce Heap Usage of OnHeapStringDictionary (#12223)
* Add Interning capability for OnHeapStringDictionary
* Adding tests and some fixes
* Remove support for older indexingConfig
* Address review comments
---
.../pinot/common/utils/FALFInternerTest.java | 168 +++++++++++++++++++++
.../index/dictionary/DictionaryIndexType.java | 55 ++++++-
.../index/dictionary/DictionaryInternerHolder.java | 58 +++++++
.../index/readers/OnHeapStringDictionary.java | 17 ++-
.../index/dictionary/DictionaryIndexTypeTest.java | 70 ++++++++-
.../index/readers/ImmutableDictionaryTest.java | 2 +-
.../ImmutableDictionaryTypeConversionTest.java | 25 ++-
.../segment/spi/index/DictionaryIndexConfig.java | 37 ++++-
.../org/apache/pinot/spi/config/table/Intern.java | 79 ++++++++++
.../org/apache/pinot/spi/utils/FALFInterner.java | 148 ++++++++++++++++++
.../pinot/spi/config/table/IndexingConfigTest.java | 7 +-
11 files changed, 642 insertions(+), 24 deletions(-)
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/FALFInternerTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/FALFInternerTest.java
new file mode 100644
index 0000000000..7c36acb902
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/FALFInternerTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import java.util.Objects;
+import java.util.Random;
+import org.apache.pinot.spi.utils.FALFInterner;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FALFInternerTest {
+  /**
+   * Interns String values with an exact (Guava strong) interner and with two {@link FALFInterner}s: one using the
+   * default {@link String#hashCode()} and one using the custom {@link #hashCode(String)} below. Checks that the exact
+   * interner hits on every repeated value and that the fixed-size FALF interners still hit often enough.
+   */
+  @Test
+  public void testInterningByteBuffers() {
+    Random random = new Random(1);
+
+    int nUniqueObjs = 1024;
+    int nTotalObjs = 8 * nUniqueObjs;
+
+    // Draw nTotalObjs values from [0, nUniqueObjs), so each value appears ~8 times. Count the distinct values that
+    // were actually drawn: some may never come up, so the expectations below must not assume all nUniqueObjs are
+    // present.
+    String[] allObjs = new String[nTotalObjs];
+    boolean[] seen = new boolean[nUniqueObjs];
+    int nDistinctObjs = 0;
+    for (int i = 0; i < nTotalObjs; i++) {
+      int next = random.nextInt(nUniqueObjs);
+      if (!seen[next]) {
+        seen[next] = true;
+        nDistinctObjs++;
+      }
+      allObjs[i] = Integer.toString(next);
+    }
+    int nExpectedHits = nTotalObjs - nDistinctObjs;
+
+    Interner<String> exactInterner = Interners.newStrongInterner();
+    Interner<String> falfInterner = new FALFInterner<>(nUniqueObjs);
+    Interner<String> falfInternerCustomHash = new FALFInterner<>(nUniqueObjs, s -> hashCode(s), Objects::equals);
+
+    // Go over all objects and intern them using the exact interner and both FALF interners.
+    int nHits1 = runInterning(allObjs, exactInterner, true);
+    int nHits2 = runInterning(allObjs, falfInterner, true);
+    int nHits3 = runInterning(allObjs, falfInternerCustomHash, true);
+
+    // The exact interner should hit for every object except the first occurrence of each distinct value.
+    Assert.assertEquals(nHits1, nExpectedHits);
+
+    // The FALF interner has a fixed size, so hash collisions are almost inevitable and the number of hits is
+    // smaller. Verify that it's not too small, though.
+    Assert.assertTrue(nHits2 > nExpectedHits * 0.4, "FALF interner hits: " + nHits2);
+
+    // With the better-distributed hash function, the FALF interner should get more hits.
+    Assert.assertTrue(nHits3 > nExpectedHits * 0.6, "FALF interner (custom hash) hits: " + nHits3);
+
+    // Ad hoc benchmarking code, disabled to avoid slowing down the test.
+    // In one run on a MacBook laptop, the FALFInterner below performed nearly twice as fast as the Guava interner
+    // (1217 ms vs 2230 ms). With the custom hash function, FALFInterner's speed was about the same as Guava's.
+//    for (int j = 0; j < 3; j++) {
+//      long time0 = System.currentTimeMillis();
+//      long totNHits = 0;
+//      for (int i = 0; i < 10000; i++) {
+//        totNHits += runInterning(allObjs, exactInterner, false);
+//      }
+//      long time1 = System.currentTimeMillis();
+//      System.out.println("Guava interner. totNHits = " + totNHits + ", time = " + (time1 - time0));
+//
+//      time0 = System.currentTimeMillis();
+//      totNHits = 0;
+//      for (int i = 0; i < 10000; i++) {
+//        totNHits += runInterning(allObjs, falfInterner, false);
+//      }
+//      time1 = System.currentTimeMillis();
+//      System.out.println("FALF interner. totNHits = " + totNHits + ", time = " + (time1 - time0));
+//
+//      time0 = System.currentTimeMillis();
+//      totNHits = 0;
+//      for (int i = 0; i < 10000; i++) {
+//        totNHits += runInterning(allObjs, falfInternerCustomHash, false);
+//      }
+//      time1 = System.currentTimeMillis();
+//      System.out.println("FALF interner Custom Hash. totNHits = " + totNHits + ", time = " + (time1 - time0));
+//    }
+  }
+
+  /**
+   * Interns every element of {@code objs} and counts the hits. A hit is a call that returned a different
+   * (previously interned) instance than the one passed in.
+   *
+   * @param performAssert when true, also asserts that each interned value equals its input
+   */
+  private int runInterning(String[] objs, Interner<String> interner, boolean performAssert) {
+    int nHits = 0;
+    for (String origObj : objs) {
+      String internedObj = interner.intern(origObj);
+      if (performAssert) {
+        Assert.assertEquals(origObj, internedObj);
+      }
+      // Reference comparison is intentional: a different instance means the interner returned a cached copy.
+      if (origObj != internedObj) {
+        nHits++;
+      }
+    }
+    return nHits;
+  }
+
+  // Custom hash code implementation that gives a better distribution than the standard String.hashCode(). It applies
+  // MurmurHash3 (x86_32, seed 0) mixing to the string's UTF-16 chars, packing two chars into each 32-bit block.
+
+  private static final int C1 = 0xcc9e2d51;
+  private static final int C2 = 0x1b873593;
+
+  public static int hashCode(String s) {
+    int h1 = 0;
+
+    // Step through the value 2 chars at a time.
+    for (int i = 1; i < s.length(); i += 2) {
+      int k1 = s.charAt(i - 1) | (s.charAt(i) << 16);
+      h1 = nextHashCode(k1, h1);
+    }
+
+    // Deal with the remaining character when the length is odd.
+    if ((s.length() & 1) == 1) {
+      int k1 = s.charAt(s.length() - 1);
+      k1 = mixK1(k1);
+      h1 ^= k1;
+    }
+
+    // The finalizer takes the length in bytes (2 per char).
+    return fmix(h1, s.length() * 2);
+  }
+
+  // Mixes one 32-bit block into the running hash.
+  private static int nextHashCode(int value, int prevHashCode) {
+    int k1 = mixK1(value);
+    return mixH1(prevHashCode, k1);
+  }
+
+  // Scrambles a single input block before it is combined into the hash.
+  private static int mixK1(int k1) {
+    k1 *= C1;
+    k1 = Integer.rotateLeft(k1, 15);
+    k1 *= C2;
+    return k1;
+  }
+
+  // Combines a scrambled block into the running hash.
+  private static int mixH1(int h1, int k1) {
+    h1 ^= k1;
+    h1 = Integer.rotateLeft(h1, 13);
+    h1 = h1 * 5 + 0xe6546b64;
+    return h1;
+  }
+
+  private static int fmix(int h1, int len) {
+    // Force all bits to avalanche.
+    h1 ^= len;
+    h1 ^= h1 >>> 16;
+    h1 *= 0x85ebca6b;
+    h1 ^= h1 >>> 13;
+    h1 *= 0xc2b2ae35;
+    h1 ^= h1 >>> 16;
+    return h1;
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
index 102361a733..0bc7845b8b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.segment.index.dictionary;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
@@ -77,9 +78,11 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.Intern;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.FALFInterner;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,6 +114,7 @@ public class DictionaryIndexType
if (noDictionaryCols.contains(column)) {
result.put(column, DictionaryIndexConfig.disabled());
} else {
+ // Intern configs can only be used if dictionary is enabled through
FieldConfigLists.
result.put(column, new
DictionaryIndexConfig(onHeapCols.contains(column),
varLengthCols.contains(column)));
}
}
@@ -162,6 +166,8 @@ public class DictionaryIndexType
Set<String> varLength = new HashSet<>(
ic.getVarLengthDictionaryColumns() == null ? Collections.emptyList()
: ic.getVarLengthDictionaryColumns()
);
+
+ // Intern configs can only be used if dictionary is enabled through
FieldConfigLists.
Function<String, DictionaryIndexConfig> valueCalculator =
column -> new DictionaryIndexConfig(onHeap.contains(column),
varLength.contains(column));
return Sets.union(onHeap, varLength).stream()
@@ -281,11 +287,18 @@ public class DictionaryIndexType
public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata
metadata, DictionaryIndexConfig indexConfig)
throws IOException {
+ // Delegates with a null intern identifier: the table name needed to build one is not among these arguments.
+ // NOTE(review): if indexConfig enables interning, the 4-arg overload forwards this null identifier to
+ // DictionaryInternerHolder's interner lookups; confirm that path tolerates a null identifier.
+ return read(dataBuffer, metadata, indexConfig, null);
+ }
+
+ public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata
metadata,
+ DictionaryIndexConfig indexConfig, String internIdentifierStr)
+ throws IOException {
+
FieldSpec.DataType dataType = metadata.getDataType();
boolean loadOnHeap = indexConfig.isOnHeap();
+ String columnName = metadata.getColumnName();
if (loadOnHeap) {
- String columnName = metadata.getColumnName();
- LOGGER.info("Loading on-heap dictionary for column: {}", columnName);
+ LOGGER.info("Loading on-heap dictionary for column: {}, intern={}",
columnName, internIdentifierStr != null);
}
int length = metadata.getCardinality();
@@ -308,7 +321,20 @@ public class DictionaryIndexType
: new BigDecimalDictionary(dataBuffer, length, numBytesPerValue);
case STRING:
numBytesPerValue = metadata.getColumnMaxLength();
- return loadOnHeap ? new OnHeapStringDictionary(dataBuffer, length,
numBytesPerValue)
+
+ // If interning is enabled, get the required interners.
+ FALFInterner<String> strInterner = null;
+ FALFInterner<byte[]> byteInterner = null;
+ Intern internConfig = indexConfig.getIntern();
+ if (internConfig != null && !internConfig.isDisabled()) {
+ Preconditions.checkState(loadOnHeap, "Interning is only supported
for on-heap dictionaries.");
+ DictionaryInternerHolder internerHolder =
DictionaryInternerHolder.getInstance();
+ strInterner = internerHolder.getStrInterner(internIdentifierStr,
internConfig.getCapacity());
+ byteInterner = internerHolder.getByteInterner(internIdentifierStr,
internConfig.getCapacity());
+ LOGGER.info("Enabling interning for dictionary column: {}",
columnName);
+ }
+
+ return loadOnHeap ? new OnHeapStringDictionary(dataBuffer, length,
numBytesPerValue, strInterner, byteInterner)
: new StringDictionary(dataBuffer, length, numBytesPerValue);
case BYTES:
numBytesPerValue = metadata.getColumnMaxLength();
@@ -352,9 +378,30 @@ public class DictionaryIndexType
@Override
protected Dictionary createIndexReader(PinotDataBuffer dataBuffer,
ColumnMetadata metadata,
DictionaryIndexConfig indexConfig)
- throws IOException, IndexReaderConstraintException {
+ throws IOException, IndexReaderConstraintException {
+ // Buffer-only path: no segment reader means no table name, so this uses the read() overload without an
+ // intern identifier.
return DictionaryIndexType.read(dataBuffer, metadata, indexConfig);
}
+
+ @Override
+ public Dictionary createIndexReader(SegmentDirectory.Reader segmentReader,
FieldIndexConfigs fieldIndexConfigs,
+ ColumnMetadata metadata) throws IOException,
IndexReaderConstraintException {
+ String colName = metadata.getColumnName();
+
+ if (!segmentReader.hasIndexFor(colName, StandardIndexes.dictionary())) {
+ return null;
+ }
+
+ PinotDataBuffer buffer = segmentReader.getIndexFor(colName,
StandardIndexes.dictionary());
+ DictionaryIndexConfig config =
fieldIndexConfigs.getConfig(StandardIndexes.dictionary());
+ String tableName =
segmentReader.toSegmentDirectory().getSegmentMetadata().getTableName();
+ String internIdentifierStr =
DictionaryInternerHolder.getInstance().createIdentifier(tableName, colName);
+
+ try {
+ return DictionaryIndexType.read(buffer, metadata, config,
internIdentifierStr);
+ } catch (RuntimeException ex) {
+ throw new RuntimeException("Cannot read index " +
StandardIndexes.dictionary() + " for column " + colName, ex);
+ }
+ }
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryInternerHolder.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryInternerHolder.java
new file mode 100644
index 0000000000..45b49ab752
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryInternerHolder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.dictionary;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.utils.FALFInterner;
+
+
+/**
+ * Process-wide holder of the interners used by dictionaries. It is currently used only for OnHeapStringDictionary.
+ *
+ * <p>Interners are keyed by {@link #createIdentifier(String, String)}, so all segments of a table share one interner
+ * per column. Interners are created lazily on first request and are never removed.
+ */
+public class DictionaryInternerHolder {
+  private static final DictionaryInternerHolder INSTANCE = new DictionaryInternerHolder();
+
+  // Keyed by tableName + columnName (see createIdentifier); the interner is common across all the segments of a
+  // given table. ConcurrentHashMap keeps lazy creation safe if segments are loaded from multiple threads.
+  private final Map<String, FALFInterner<String>> _strInternerInfoMap = new ConcurrentHashMap<>();
+  private final Map<String, FALFInterner<byte[]>> _byteInternerInfoMap = new ConcurrentHashMap<>();
+
+  private DictionaryInternerHolder() {
+  }
+
+  public static DictionaryInternerHolder getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns the String interner for {@code columnIdentifier}, creating it with {@code capacity} on first use. Later
+   * calls for the same identifier return the existing interner and ignore {@code capacity}.
+   *
+   * @return the shared interner, or {@code null} when {@code columnIdentifier} is {@code null} (no table/column
+   *     context). ConcurrentHashMap rejects null keys; OnHeapStringDictionary treats a null interner as "interning
+   *     disabled".
+   */
+  public FALFInterner<String> getStrInterner(String columnIdentifier, int capacity) {
+    if (columnIdentifier == null) {
+      return null;
+    }
+    return _strInternerInfoMap.computeIfAbsent(columnIdentifier, k -> new FALFInterner<>(capacity));
+  }
+
+  /**
+   * Returns the byte[] interner for {@code columnIdentifier}, creating it with {@code capacity} on first use. Byte
+   * arrays are hashed by content via {@link Arrays#hashCode(byte[])}. Later calls for the same identifier ignore
+   * {@code capacity}.
+   *
+   * @return the shared interner, or {@code null} when {@code columnIdentifier} is {@code null}
+   */
+  public FALFInterner<byte[]> getByteInterner(String columnIdentifier, int capacity) {
+    if (columnIdentifier == null) {
+      return null;
+    }
+    return _byteInternerInfoMap.computeIfAbsent(columnIdentifier, k -> new FALFInterner<>(capacity, Arrays::hashCode));
+  }
+
+  /** Builds the interner key for a table/column pair, formatted as {@code tableName:colName}. */
+  public String createIdentifier(String tableName, String colName) {
+    return tableName + ":" + colName;
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
index 13ab350b7f..3a647112c5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/OnHeapStringDictionary.java
@@ -21,9 +21,11 @@ package org.apache.pinot.segment.local.segment.index.readers;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.math.BigDecimal;
import java.util.Arrays;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.FALFInterner;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -42,7 +44,8 @@ public class OnHeapStringDictionary extends
BaseImmutableDictionary {
private final byte[][] _unpaddedBytes;
private final Object2IntOpenHashMap<String> _unPaddedStringToIdMap;
- public OnHeapStringDictionary(PinotDataBuffer dataBuffer, int length, int
numBytesPerValue) {
+ public OnHeapStringDictionary(PinotDataBuffer dataBuffer, int length, int
numBytesPerValue,
+ @Nullable FALFInterner<String> strInterner, @Nullable
FALFInterner<byte[]> byteInterner) {
super(dataBuffer, length, numBytesPerValue);
_unpaddedBytes = new byte[length][];
@@ -51,9 +54,17 @@ public class OnHeapStringDictionary extends
BaseImmutableDictionary {
_unPaddedStringToIdMap.defaultReturnValue(Dictionary.NULL_VALUE_INDEX);
byte[] buffer = new byte[numBytesPerValue];
+ boolean enableInterning = strInterner != null && byteInterner != null;
+
for (int i = 0; i < length; i++) {
- _unpaddedBytes[i] = getUnpaddedBytes(i, buffer);
- _unpaddedStrings[i] = new String(_unpaddedBytes[i], UTF_8);
+ if (enableInterning) {
+ _unpaddedBytes[i] = byteInterner.intern(getUnpaddedBytes(i, buffer));
+ _unpaddedStrings[i] = strInterner.intern(new String(_unpaddedBytes[i],
UTF_8));
+ } else {
+ _unpaddedBytes[i] = getUnpaddedBytes(i, buffer);
+ _unpaddedStrings[i] = new String(_unpaddedBytes[i], UTF_8);
+ }
+
_unPaddedStringToIdMap.put(_unpaddedStrings[i], i);
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexTypeTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexTypeTest.java
index 8e3db4f99b..8a4ed209ec 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexTypeTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexTypeTest.java
@@ -21,12 +21,14 @@ package
org.apache.pinot.segment.local.segment.index.dictionary;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.Intern;
import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -38,7 +40,8 @@ public class DictionaryIndexTypeTest {
public static class ConfTest extends AbstractSerdeIndexContract {
protected void assertEquals(DictionaryIndexConfig expected) {
- Assert.assertEquals(getActualConfig("dimInt",
StandardIndexes.dictionary()), expected);
+ // Fetch the dictionary config resolved for column "dimInt" (via getActualConfig) and compare it to the expected one.
+ DictionaryIndexConfig actualConfig = getActualConfig("dimInt",
StandardIndexes.dictionary());
+ Assert.assertEquals(actualConfig, expected);
}
@Test
@@ -132,7 +135,7 @@ public class DictionaryIndexTypeTest {
throws IOException {
_tableConfig.getIndexingConfig()
.setOnHeapDictionaryColumns(JsonUtils.stringToObject("[\"dimInt\"]",
_stringListTypeRef));
- assertEquals(new DictionaryIndexConfig(true, null));
+ assertEquals(new DictionaryIndexConfig(true, null, null));
}
@Test
@@ -140,7 +143,7 @@ public class DictionaryIndexTypeTest {
throws IOException {
_tableConfig.getIndexingConfig()
.setVarLengthDictionaryColumns(JsonUtils.stringToObject("[\"dimInt\"]",
_stringListTypeRef));
- assertEquals(new DictionaryIndexConfig(false, true));
+ assertEquals(new DictionaryIndexConfig(false, true, null));
}
@Test
@@ -176,7 +179,7 @@ public class DictionaryIndexTypeTest {
+ " }"
+ " }\n"
+ " }");
- assertEquals(new DictionaryIndexConfig(true, true));
+ assertEquals(new DictionaryIndexConfig(true, true, null));
}
@Test
@@ -191,7 +194,60 @@ public class DictionaryIndexTypeTest {
+ " }"
+ " }\n"
+ " }");
- assertEquals(new DictionaryIndexConfig(true, false));
+ assertEquals(new DictionaryIndexConfig(true, false, null));
+ }
+
+ @Test
+ public void newOnHeapWithInternConfig()
+ throws IOException {
+ // An enabled intern config (capacity 1000) on an on-heap dictionary is accepted and carried into the config.
+ addFieldIndexConfig(""
+ + " {\n"
+ + " \"name\": \"dimInt\","
+ + " \"indexes\" : {\n"
+ + " \"dictionary\": {\n"
+ + " \"onHeap\": true,\n"
+ + " \"intern\": {\n"
+ + " \"capacity\":1000\n"
+ + " }"
+ + " }"
+ + " }\n"
+ + " }");
+ assertEquals(new DictionaryIndexConfig(true, false, new Intern(1000)));
+ }
+
+ @Test
+ public void newDisabledOnHeapWithInternConfig()
+ throws IOException {
+ // An enabled intern config with onHeap=false violates DictionaryIndexConfig's "Intern configs only work with
+ // on-heap dictionary" precondition; the test expects it to surface from getActualConfig as UncheckedIOException.
+ addFieldIndexConfig(""
+ + " {\n"
+ + " \"name\": \"dimInt\","
+ + " \"indexes\" : {\n"
+ + " \"dictionary\": {\n"
+ + " \"onHeap\": false,\n"
+ + " \"intern\": {\n"
+ + " \"capacity\":1000\n"
+ + " }"
+ + " }"
+ + " }\n"
+ + " }");
+ assertThrows(UncheckedIOException.class, () -> getActualConfig("dimInt",
StandardIndexes.dictionary()));
+ }
+
+ @Test
+ public void newOnHeapWithEmptyConfig()
+ throws IOException {
+ // An empty "intern" object presumably deserializes with disabled=false and capacity=0, which Intern's
+ // constructor rejects ("Invalid interner capacity"); the test expects an UncheckedIOException.
+ addFieldIndexConfig(""
+ + " {\n"
+ + " \"name\": \"dimInt\","
+ + " \"indexes\" : {\n"
+ + " \"dictionary\": {\n"
+ + " \"onHeap\": true,\n"
+ + " \"intern\": {\n"
+ + " }"
+ + " }"
+ + " }\n"
+ + " }");
+ assertThrows(UncheckedIOException.class, () -> getActualConfig("dimInt",
StandardIndexes.dictionary()));
}
@Test
@@ -205,7 +261,7 @@ public class DictionaryIndexTypeTest {
+ " }"
+ " }\n"
+ " }");
- assertEquals(new DictionaryIndexConfig(false, false));
+ assertEquals(new DictionaryIndexConfig(false, false, null));
}
@Test
@@ -220,7 +276,7 @@ public class DictionaryIndexTypeTest {
+ " }"
+ " }\n"
+ " }");
- assertEquals(new DictionaryIndexConfig(false, true));
+ assertEquals(new DictionaryIndexConfig(false, true, null));
}
@Test
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
index bdab861630..c93768f083 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
@@ -372,7 +372,7 @@ public class ImmutableDictionaryTest {
try (OnHeapStringDictionary onHeapStringDictionary = new
OnHeapStringDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
new File(TEMP_DIR, STRING_COLUMN_NAME +
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES,
- _numBytesPerStringValue)) {
+ _numBytesPerStringValue, null, null)) {
testStringDictionary(onHeapStringDictionary);
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
index 303f87b520..78e7b41c1a 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
@@ -34,6 +34,7 @@ import org.apache.pinot.spi.utils.ArrayCopyUtils;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.FALFInterner;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -322,7 +323,29 @@ public class ImmutableDictionaryTypeConversionTest {
throws Exception {
try (OnHeapStringDictionary onHeapStringDictionary = new
OnHeapStringDictionary(
PinotDataBuffer.mapReadOnlyBigEndianFile(
- new File(TEMP_DIR, STRING_COLUMN_NAME +
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH)) {
+ new File(TEMP_DIR, STRING_COLUMN_NAME +
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
+ null, null)) {
+ testStringDictionary(onHeapStringDictionary);
+ }
+ }
+
+ @Test
+ public void testOnHeapStringDictionaryWithInterner()
+ throws Exception {
+ // The byte[] interner gets Arrays::hashCode so arrays are hashed by content rather than by identity.
+ FALFInterner<String> strInterner = new FALFInterner<>(128);
+ FALFInterner<byte[]> byteInterner = new FALFInterner<>(128,
Arrays::hashCode);
+
+ try (OnHeapStringDictionary onHeapStringDictionary = new
OnHeapStringDictionary(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(
+ new File(TEMP_DIR, STRING_COLUMN_NAME +
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
+ strInterner, byteInterner)) {
+ testStringDictionary(onHeapStringDictionary);
+ }
+
+ // Load the same dictionary again with the same interner instances, which were already populated by the first
+ // load, and check that lookups still behave correctly.
+ try (OnHeapStringDictionary onHeapStringDictionary = new
OnHeapStringDictionary(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(
+ new File(TEMP_DIR, STRING_COLUMN_NAME +
V1Constants.Dict.FILE_EXTENSION)), NUM_VALUES, STRING_LENGTH,
+ strInterner, byteInterner)) {
testStringDictionary(onHeapStringDictionary);
}
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/DictionaryIndexConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/DictionaryIndexConfig.java
index f93f13c32c..d63979f744 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/DictionaryIndexConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/DictionaryIndexConfig.java
@@ -21,29 +21,46 @@ package org.apache.pinot.segment.spi.index;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.Intern;
public class DictionaryIndexConfig extends IndexConfig {
- public static final DictionaryIndexConfig DEFAULT = new
DictionaryIndexConfig(false, false, false);
- public static final DictionaryIndexConfig DISABLED = new
DictionaryIndexConfig(true, false, false);
+ public static final DictionaryIndexConfig DEFAULT = new
DictionaryIndexConfig(false, false, false, Intern.DISABLED);
+ public static final DictionaryIndexConfig DISABLED = new
DictionaryIndexConfig(true, false, false, Intern.DISABLED);
private final boolean _onHeap;
private final boolean _useVarLengthDictionary;
+ private final Intern _intern;
public DictionaryIndexConfig(Boolean onHeap, @Nullable Boolean
useVarLengthDictionary) {
- this(false, onHeap, useVarLengthDictionary);
+ // No intern config: _intern is stored as null (not Intern.DISABLED).
+ this(onHeap, useVarLengthDictionary, null);
+ }
+
+ // Enabled (disabled=false) config. intern may be null; a non-disabled intern additionally requires onHeap == true.
+ public DictionaryIndexConfig(Boolean onHeap, @Nullable Boolean
useVarLengthDictionary, Intern intern) {
+ this(false, onHeap, useVarLengthDictionary, intern);
}
@JsonCreator
public DictionaryIndexConfig(@JsonProperty("disabled") Boolean disabled,
@JsonProperty("onHeap") Boolean onHeap,
- @JsonProperty("useVarLengthDictionary") @Nullable Boolean
useVarLengthDictionary) {
+ @JsonProperty("useVarLengthDictionary") @Nullable Boolean
useVarLengthDictionary,
+ @JsonProperty("intern") @Nullable Intern intern) {
super(disabled);
+
+ if (intern != null) {
+ // Intern configs only work with onHeapDictionary. This precondition can
be removed when/if we support interning
+ // for off-heap dictionary.
+ Preconditions.checkState(intern.isDisabled() ||
Boolean.TRUE.equals(onHeap),
+ "Intern configs only work with on-heap dictionary");
+ }
+
_onHeap = onHeap != null && onHeap;
_useVarLengthDictionary = Boolean.TRUE.equals(useVarLengthDictionary);
+ // NOTE(review): intern is stored as given, so null and Intern.DISABLED remain distinct values. DEFAULT/DISABLED
+ // use Intern.DISABLED while the 2-arg constructor passes null, and equals()/hashCode() include _intern, so those
+ // configs no longer compare equal. Confirm whether null should be normalized to Intern.DISABLED.
+ _intern = intern;
}
public static DictionaryIndexConfig disabled() {
@@ -58,6 +75,10 @@ public class DictionaryIndexConfig extends IndexConfig {
return _useVarLengthDictionary;
}
+  /**
+   * Returns the interning config, or {@code null} when none was supplied (e.g. via the 2-arg constructor).
+   */
+  @Nullable
+  public Intern getIntern() {
+    return _intern;
+  }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -67,19 +88,21 @@ public class DictionaryIndexConfig extends IndexConfig {
return false;
}
DictionaryIndexConfig that = (DictionaryIndexConfig) o;
- return _onHeap == that._onHeap && _useVarLengthDictionary ==
that._useVarLengthDictionary;
+ return _onHeap == that._onHeap && _useVarLengthDictionary ==
that._useVarLengthDictionary && Objects.equals(_intern,
+ that._intern);
}
@Override
public int hashCode() {
- return Objects.hash(_onHeap, _useVarLengthDictionary);
+ // _intern takes part in equals(), so it is hashed too; this relies on Intern#hashCode being consistent with
+ // Intern#equals.
+ return Objects.hash(_onHeap, _useVarLengthDictionary, _intern);
}
@Override
public String toString() {
if (isEnabled()) {
+ String internStr = _intern == null ? "null" : _intern.toString();
return "DictionaryIndexConfig{" + "\"onHeap\":" + _onHeap + ",
\"useVarLengthDictionary\":"
- + _useVarLengthDictionary + "}";
+ + _useVarLengthDictionary + ", \"intern\":" + internStr + "}";
} else {
return "DictionaryIndexConfig{" + "\"disabled\": true}";
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/Intern.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/Intern.java
new file mode 100644
index 0000000000..e5fbf9ec43
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/Intern.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+
+
+/**
+ * Interning configuration: whether interning is disabled and, when enabled, the capacity of the interner to use.
+ * A disabled config must have capacity 0; an enabled config must have a positive capacity.
+ */
+public class Intern {
+  /** Shared config with interning turned off (capacity 0). */
+  public static final Intern DISABLED = new Intern(true, 0);
+
+  private final boolean _disabled;
+  private final int _capacity;
+
+  /**
+   * Creates an enabled interning config.
+   *
+   * @param capacity interner capacity; must be positive
+   * @throws IllegalStateException if {@code capacity} is not positive
+   */
+  public Intern(int capacity) {
+    this(false, capacity);
+  }
+
+  /**
+   * @param disabled whether interning is turned off
+   * @param capacity interner capacity; must be positive when enabled and exactly 0 when disabled
+   * @throws IllegalStateException if {@code disabled} and {@code capacity} are inconsistent
+   */
+  @JsonCreator
+  public Intern(@JsonProperty("disabled") boolean disabled, @JsonProperty("capacity") int capacity) {
+    Preconditions.checkState(capacity > 0 || disabled, "Invalid interner capacity: " + capacity);
+    Preconditions.checkState(capacity == 0 || !disabled, "Enable interning to use capacity > 0");
+
+    _disabled = disabled;
+    _capacity = capacity;
+  }
+
+  public boolean isDisabled() {
+    return _disabled;
+  }
+
+  public int getCapacity() {
+    return _capacity;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Intern that = (Intern) o;
+    return _disabled == that._disabled && _capacity == that._capacity;
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash exactly the fields compared in equals(). Object#hashCode (the superclass) is identity-based, and mixing it
+    // in would give equal Interns different hash codes.
+    return Objects.hash(_disabled, _capacity);
+  }
+
+  @Override
+  public String toString() {
+    // Braced so that the output is a well-formed object when embedded as "intern":... in DictionaryIndexConfig.
+    return "{\"disabled\":" + _disabled + ", \"capacity\":" + _capacity + "}";
+  }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/FALFInterner.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/FALFInterner.java
new file mode 100644
index 0000000000..89a3355756
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/FALFInterner.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.utils;
+
+
+import com.google.common.collect.Interner;
+import java.util.Objects;
+import java.util.function.BiPredicate;
+import java.util.function.ToIntFunction;
+
+/**
+ * Fixed-size Array-based, Lock-Free Interner.
+ *
+ * !!!!!!!!!!!!!!! READ THE PARAGRAPH BELOW BEFORE USING THIS CLASS
!!!!!!!!!!!!!!!!
+ * This class is technically not thread-safe. Therefore if it's called from
multiple
+ * threads, it should either be used with proper synchronization (in the same
way as
+ * you would use e.g. a HashMap), or under the following conditions:
+ * all the objects being interned are not just immutable, but also final (that
is, all
+ * their fields used in equals() and hashCode() methods are explicitly marked
final).
+ * That's to ensure that all threads always see the same contents of these
objects. If
+ * this rule is not followed, using this class from multiple threads may lead
to strange
+ * non-deterministic errors. Note that objects with all private fields that
are not
+ * marked final, or immutable collections created via
Collection.unmodifiableMap() etc,
+ * don't qualify.
+ *
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+ *
+ * This interner is intended to be used when either:
+ * (a) distribution of values among the objects to be interned make them not
suitable
+ * for standard interners
+ * (b) speed is more important than ultimate memory savings
+ *
+ * Problem (a) occurs when both the total number of objects AND the number of
unique
+ * values is large. For example, there are 1M strings that look like "a", "a",
"b", "b",
+ * "c", "c", ... - that is, for each unique value there are only two separate
objects.
+ * Another problematic case is when "a" has 1000 copies, "b" has 900 copies,
etc.,
+ * but in the last few hundred thousand objects each one is unique. In both
cases, if
+ * we use a standard interner such as a Guava interner or a ConcurrentHashMap
to
+ * deduplicate such objects, the amount of memory consumed by the interner
itself to
+ * store objects that have few or no duplicates, can be comparable, or even
exceed, the
+ * savings achieved by getting rid of duplicate objects.
+ *
+ * This implementation addresses the above problems by interning objects
"optimistically".
+ * It is a fixed-size, open-hashmap-based object cache. When there is a cache
miss,
+ * a cached object in the given slot is always replaced with a new object.
There is
+ * no locking and no synchronization, and thus, no associated overhead. In
essence,
+ * this cache is based on the idea that an object with value X, that has many
copies,
+ * has a higher chance of staying in the cache for long enough to guarantee
several
+ * cache hits for itself before a miss evicts it and replaces it with an
object with
+ * a different value Y.
+ *
+ * This interner has a minimum possible memory footprint. You should be
careful when
+ * choosing its capacity. In general, the bigger the better, but if some of
the objects
+ * that are interned eventually go away, an interner with too big a capacity
may still
+ * keep these objects in memory. Also, since there are no collision chains, it
is
+ * very important to use a hash function with the most uniform distribution,
to minimize
+ * a chance that two or more objects with many duplicates compete for the same
array slot.
+ *
+ * For more information, see
https://dzone.com/articles/duplicate-objects-in-java-not-just-strings
+ * Credits to the author: Misha Dmitriev
+ */
+public class FALFInterner<T> implements Interner<T> {
+ private static final int MAXIMUM_CAPACITY = 1 << 30;
+
+ private final Object[] _cache;
+ private final int _cacheLengthMinusOne;
+ private final BiPredicate<T, T> _equalsFunction;
+ private final ToIntFunction<T> _hashFunction;
+
+ /**
+ * Constructs a new instance with the specified capacity.
+ * Actual capacity will be a power of two number >= expectedCapacity.
+ */
+ public FALFInterner(int expectedCapacity) {
+ this(expectedCapacity, Objects::hashCode);
+ }
+
+ /**
+ * Constructs a new instance with the specified capacity and a custom hash
function.
+ * Actual capacity will be a power of two number >= expectedCapacity.
+ */
+ public FALFInterner(int expectedCapacity, ToIntFunction<T> hashFunction) {
+ this(expectedCapacity, hashFunction, Objects::equals);
+ }
+
+ /**
+ * Constructs a new instance with the specified capacity and custom equals
and hash functions.
+ * Actual capacity will be a power of two number >= expectedCapacity.
+ */
+ public FALFInterner(int expectedCapacity, ToIntFunction<T> hashFunction,
BiPredicate<T, T> equalsFunction) {
+ _cache = new Object[tableSizeFor(expectedCapacity)];
+ _cacheLengthMinusOne = _cache.length - 1;
+ _equalsFunction = Objects.requireNonNull(equalsFunction);
+ _hashFunction = Objects.requireNonNull(hashFunction);
+ }
+
+ /**
+ * IMPORTANT: OBJECTS TO INTERN SHOULD BE IMMUTABLE AND FINAL!
+ * SEE THE JAVADOC OF THIS CLASS FOR MORE INFORMATION.
+ *
+ * Interns the given object. That is, if a cached object obj1 such that
+ * obj1.equals(obj) is available, returns obj1. Otherwise, caches obj and
+ * returns it. None of the cached objects is guaranteed to survive in the
+ * cache.
+ */
+ @Override
+ public T intern(T obj) {
+ int slot = hash(obj) & _cacheLengthMinusOne;
+ T cachedObj = (T) _cache[slot];
+ if (cachedObj != null && _equalsFunction.test(obj, cachedObj)) {
+ return cachedObj;
+ }
+ _cache[slot] = obj;
+ return obj;
+ }
+
+ private int hash(T key) {
+ int h = _hashFunction.applyAsInt(key);
+ return h ^ (h >>> 16);
+ }
+
+ private static int tableSizeFor(int cap) {
+ // Calculated in the same way as in java.util.HashMap
+ int n = cap - 1;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
+ }
+}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
index 5339d914b2..f97e9b30b1 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
@@ -46,6 +46,9 @@ public class IndexingConfigTest {
indexingConfig.setOnHeapDictionaryColumns(onHeapDictionaryColumns);
List<String> bloomFilterColumns = Arrays.asList("a", "b");
indexingConfig.setBloomFilterColumns(bloomFilterColumns);
+ Map<String, BloomFilterConfig> bloomFilterConfigs = new HashMap<>();
+ bloomFilterConfigs.put("a", new BloomFilterConfig(0.123, 456, true));
+ indexingConfig.setBloomFilterConfigs(bloomFilterConfigs);
Map<String, String> noDictionaryConfig = new HashMap<>();
noDictionaryConfig.put("a", "SNAPPY");
noDictionaryConfig.put("b", "PASS_THROUGH");
@@ -54,7 +57,8 @@ public class IndexingConfigTest {
indexingConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
indexingConfig.setSegmentNameGeneratorType("normalizedDate");
- indexingConfig =
JsonUtils.stringToObject(JsonUtils.objectToString(indexingConfig),
IndexingConfig.class);
+ String indexingConfigStr = JsonUtils.objectToString(indexingConfig);
+ indexingConfig = JsonUtils.stringToObject(indexingConfigStr,
IndexingConfig.class);
assertEquals(indexingConfig.getLoadMode(), "MMAP");
assertTrue(indexingConfig.isAggregateMetrics());
@@ -62,6 +66,7 @@ public class IndexingConfigTest {
assertEquals(indexingConfig.getSortedColumn(), sortedColumn);
assertEquals(indexingConfig.getOnHeapDictionaryColumns(),
onHeapDictionaryColumns);
assertEquals(indexingConfig.getBloomFilterColumns(), bloomFilterColumns);
+ assertEquals(indexingConfig.getBloomFilterConfigs(), bloomFilterConfigs);
assertEquals(indexingConfig.getNoDictionaryConfig(), noDictionaryConfig);
assertEquals(indexingConfig.getVarLengthDictionaryColumns(),
varLengthDictionaryColumns);
assertEquals(indexingConfig.getSegmentNameGeneratorType(),
"normalizedDate");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]