This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf8fd93759 MultiValue VarByte V4 index writer and consolidate V4
reader for all types (#11674)
cf8fd93759 is described below
commit cf8fd93759ebfc521141190196922aa26787c7e1
Author: Saurabh Dubey <[email protected]>
AuthorDate: Thu Sep 28 23:22:41 2023 +0530
MultiValue VarByte V4 index writer and consolidate V4 reader for all types
(#11674)
---
.../pinot/perf/BenchmarkRawForwardIndexReader.java | 8 +-
.../impl/VarByteChunkForwardIndexWriter.java | 6 +-
.../impl/VarByteChunkForwardIndexWriterV4.java | 48 +++++++
.../local/io/writer/impl/VarByteChunkWriter.java | 4 +
.../impl/fwd/MultiValueVarByteRawIndexCreator.java | 17 +--
.../index/forward/ForwardIndexReaderFactory.java | 32 +++--
....java => VarByteChunkForwardIndexReaderV4.java} | 159 ++++++++++++++++++++-
.../impl/VarByteChunkSVForwardIndexWriterTest.java | 4 +-
.../MultiValueFixedByteRawIndexCreatorTest.java | 59 ++++----
.../MultiValueVarByteRawIndexCreatorTest.java | 43 +++---
.../segment/index/creator/VarByteChunkV4Test.java | 8 +-
11 files changed, 300 insertions(+), 88 deletions(-)
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
index 6da32f2906..31c106cd03 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
@@ -28,8 +28,8 @@ import org.apache.commons.io.FileUtils;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec;
@@ -184,9 +184,9 @@ public class BenchmarkRawForwardIndexReader {
public void readV4(V4State state, Blackhole bh)
throws IOException {
try (PinotDataBuffer buffer =
PinotDataBuffer.loadBigEndianFile(state._file);
- VarByteChunkSVForwardIndexReaderV4 reader =
- new VarByteChunkSVForwardIndexReaderV4(buffer,
FieldSpec.DataType.BYTES);
- VarByteChunkSVForwardIndexReaderV4.ReaderContext context =
reader.createContext()) {
+ VarByteChunkForwardIndexReaderV4 reader =
+ new VarByteChunkForwardIndexReaderV4(buffer, FieldSpec.DataType.BYTES,
true);
+ VarByteChunkForwardIndexReaderV4.ReaderContext context =
reader.createContext()) {
for (int i = 0; i < state._records; i++) {
bh.consume(reader.getBytes(i, context));
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
index b6daaf7fe7..fadcce827e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
@@ -99,7 +99,8 @@ public class VarByteChunkForwardIndexWriter extends
BaseChunkForwardIndexWriter
// Note: some duplication is tolerated between these overloads for the sake
of memory efficiency
- public void putStrings(String[] values) {
+ @Override
+ public void putStringMV(String[] values) {
// the entire String[] will be encoded as a single string, write the
header here
_chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
_chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
@@ -122,7 +123,8 @@ public class VarByteChunkForwardIndexWriter extends
BaseChunkForwardIndexWriter
writeChunkIfNecessary();
}
- public void putByteArrays(byte[][] values) {
+ @Override
+ public void putBytesMV(byte[][] values) {
// the entire byte[][] will be encoded as a single string, write the
header here
_chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
_chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
index d70ed2dcbc..35c61f35f5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
@@ -37,6 +37,8 @@ import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
/**
* Chunk-based raw (non-dictionary-encoded) forward index writer where each
chunk contains variable number of docs, and
@@ -142,6 +144,52 @@ public class VarByteChunkForwardIndexWriterV4 implements
VarByteChunkWriter {
_nextDocId++;
}
+ @Override
+ public void putStringMV(String[] values) { // packs one multi-value STRING row into a single byte[] and stores it through putBytes
+ // num values + length of each value
+ int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+ int size = headerSize;
+ byte[][] stringBytes = new byte[values.length][];
+ for (int i = 0; i < values.length; i++) {
+ stringBytes[i] = values[i].getBytes(UTF_8); // encode once up front: the byte length is needed to size the output array
+ size += stringBytes[i].length;
+ }
+
+ // Format :
[numValues][length1][length2]...[lengthN][value1][value2]...[valueN]
+ byte[] serializedBytes = new byte[size];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
+ byteBuffer.putInt(values.length);
+ byteBuffer.position(headerSize); // relative put() calls below append payloads right after the header
+ for (int i = 0; i < values.length; i++) {
+ byteBuffer.putInt((i + 1) * Integer.BYTES, stringBytes[i].length); // absolute write into the length table; does not move the position
+ byteBuffer.put(stringBytes[i]); // relative write; advances the position past this payload
+ }
+
+ putBytes(serializedBytes); // the serialized row is handed on as one variable-length value
+ }
+
+ @Override
+ public void putBytesMV(byte[][] values) { // packs one multi-value BYTES row into a single byte[] and stores it through putBytes
+ // num values + length of each value
+ int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+ int size = headerSize;
+ for (byte[] value : values) {
+ size += value.length; // total = header + sum of all payload lengths
+ }
+
+ // Format :
[numValues][length1][length2]...[lengthN][bytes1][bytes2]...[bytesN]
+ byte[] serializedBytes = new byte[size];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
+ byteBuffer.putInt(values.length);
+ byteBuffer.position(headerSize); // relative put() calls below append payloads right after the header
+ for (int i = 0; i < values.length; i++) {
+ byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length); // absolute write into the length table; does not move the position
+ byteBuffer.put(values[i]); // relative write; advances the position past this payload
+ }
+
+ putBytes(serializedBytes); // the serialized row is handed on as one variable-length value
+ }
+
private void writeHugeChunk(byte[] bytes) {
// huge values where the bytes and their length prefix don't fit in to the
remainder of the buffer after the prefix
// for the number of documents in a regular chunk are written as a single
value without metadata, and these chunks
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
index 1e6dbc2837..bf3537d67c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
@@ -28,4 +28,8 @@ public interface VarByteChunkWriter extends Closeable {
void putString(String value);
void putBytes(byte[] value);
+
+ void putStringMV(String[] values);
+
+ void putBytesMV(byte[][] values);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 85dba85ab9..0c41ce2c6e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -37,7 +38,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
- private final VarByteChunkForwardIndexWriter _indexWriter;
+ private final VarByteChunkWriter _indexWriter;
private final DataType _valueType;
/**
@@ -80,13 +81,9 @@ public class MultiValueVarByteRawIndexCreator implements
ForwardIndexCreator {
int numDocsPerChunk = Math.max(
TARGET_MAX_CHUNK_SIZE / (totalMaxLength +
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
1);
- // TODO: Support V4 MV reader
- // Currently fall back to V2 for backward compatible
- if (writerVersion == VarByteChunkForwardIndexWriterV4.VERSION) {
- writerVersion = 2;
- }
- _indexWriter = new VarByteChunkForwardIndexWriter(file, compressionType,
totalDocs, numDocsPerChunk, totalMaxLength,
- writerVersion);
+ _indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ?
new VarByteChunkForwardIndexWriter(file,
+ compressionType, totalDocs, numDocsPerChunk, totalMaxLength,
writerVersion)
+ : new VarByteChunkForwardIndexWriterV4(file, compressionType,
TARGET_MAX_CHUNK_SIZE);
_valueType = valueType;
}
@@ -107,12 +104,12 @@ public class MultiValueVarByteRawIndexCreator implements
ForwardIndexCreator {
@Override
public void putStringMV(final String[] values) {
- _indexWriter.putStrings(values);
+ _indexWriter.putStringMV(values);
}
@Override
public void putBytesMV(final byte[][] values) {
- _indexWriter.putByteArrays(values);
+ _indexWriter.putBytesMV(values);
}
@Override
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index ecaaf875a5..6de6e1294b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -25,9 +25,9 @@ import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVFo
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
import
org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -75,19 +75,31 @@ public class ForwardIndexReaderFactory extends
IndexReaderFactory.Default<Forwar
public static ForwardIndexReader createRawIndexReader(PinotDataBuffer
dataBuffer, DataType storedType,
boolean isSingleValue) {
int version = dataBuffer.getInt(0);
+ if (isSingleValue && storedType.isFixedWidth()) {
+ return version == FixedBytePower2ChunkSVForwardIndexReader.VERSION
+ ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer,
storedType)
+ : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
+ }
+
+ if (version == VarByteChunkForwardIndexWriterV4.VERSION) {
+ // V4 reader is common for sv var byte, mv fixed byte and mv var byte
+ return new VarByteChunkForwardIndexReaderV4(dataBuffer, storedType,
isSingleValue);
+ } else {
+ return createNonV4RawIndexReader(dataBuffer, storedType, isSingleValue);
+ }
+ }
+
+ private static ForwardIndexReader createNonV4RawIndexReader(PinotDataBuffer
dataBuffer, DataType storedType,
+ boolean isSingleValue) {
+ // Only reach here if SV + raw + var byte + non v4 or MV + non v4
if (isSingleValue) {
+ return new VarByteChunkSVForwardIndexReader(dataBuffer, storedType);
+ } else {
if (storedType.isFixedWidth()) {
- return version == FixedBytePower2ChunkSVForwardIndexReader.VERSION
- ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer,
storedType)
- : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
+ return new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType);
} else {
- return version == VarByteChunkForwardIndexWriterV4.VERSION ? new
VarByteChunkSVForwardIndexReaderV4(dataBuffer,
- storedType) : new VarByteChunkSVForwardIndexReader(dataBuffer,
storedType);
+ return new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
}
- } else {
- // TODO: Support V4 MV reader
- return storedType.isFixedWidth() ? new
FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
- : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
similarity index 65%
rename from
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
rename to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
index c1d842b23c..4858e790cb 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
@@ -39,13 +39,13 @@ import org.slf4j.LoggerFactory;
/**
- * Chunk-based single-value raw (non-dictionary-encoded) forward index reader
for values of variable length data type
- * (BIG_DECIMAL, STRING, BYTES).
+ * Chunk-based raw (non-dictionary-encoded) forward index reader for values of
SV variable length data types
+ * (BIG_DECIMAL, STRING, BYTES), MV fixed length and MV variable length data
types.
* <p>For data layout, please refer to the documentation for {@link
VarByteChunkForwardIndexWriterV4}
*/
-public class VarByteChunkSVForwardIndexReaderV4
- implements
ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
+public class VarByteChunkForwardIndexReaderV4
+ implements
ForwardIndexReader<VarByteChunkForwardIndexReaderV4.ReaderContext> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(VarByteChunkForwardIndexReaderV4.class);
private static final int METADATA_ENTRY_SIZE = 8;
private final FieldSpec.DataType _storedType;
@@ -55,8 +55,10 @@ public class VarByteChunkSVForwardIndexReaderV4
private final PinotDataBuffer _metadata;
private final PinotDataBuffer _chunks;
+ private final boolean _isSingleValue;
- public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer,
FieldSpec.DataType storedType) {
+ public VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer,
FieldSpec.DataType storedType,
+ boolean isSingleValue) {
int version = dataBuffer.getInt(0);
Preconditions.checkState(version ==
VarByteChunkForwardIndexWriterV4.VERSION, "Illegal index version: %s", version);
_storedType = storedType;
@@ -67,6 +69,7 @@ public class VarByteChunkSVForwardIndexReaderV4
// the file has a BE header for compatability reasons (version selection)
but the content is LE
_metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN);
_chunks = dataBuffer.view(chunksOffset, dataBuffer.size(),
ByteOrder.LITTLE_ENDIAN);
+ _isSingleValue = isSingleValue;
}
@Override
@@ -76,7 +79,7 @@ public class VarByteChunkSVForwardIndexReaderV4
@Override
public boolean isSingleValue() {
- return true;
+ return _isSingleValue;
}
@Override
@@ -113,6 +116,148 @@ public class VarByteChunkSVForwardIndexReaderV4
return context.getValue(docId);
}
+ @Override // decodes the INT multi-value entry of docId into the caller-supplied buffer and returns the element count
+ public int getIntMV(int docId, int[] valueBuffer,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][int1]...[intN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getInt(); // valueBuffer must have room for at least numValues elements
+ }
+ return numValues; // number of slots of valueBuffer that were filled
+ }
+
+ @Override // decodes the INT multi-value entry of docId into a freshly allocated, exactly sized array
+ public int[] getIntMV(int docId,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][int1]...[intN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ int[] valueBuffer = new int[numValues];
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getInt(); // relative read; advances 4 bytes per element
+ }
+ return valueBuffer;
+ }
+
+ @Override // decodes the LONG multi-value entry of docId into the caller-supplied buffer and returns the element count
+ public int getLongMV(int docId, long[] valueBuffer,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][long1]...[longN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getLong(); // valueBuffer must have room for at least numValues elements
+ }
+ return numValues; // number of slots of valueBuffer that were filled
+ }
+
+ @Override // decodes the LONG multi-value entry of docId into a freshly allocated, exactly sized array
+ public long[] getLongMV(int docId,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][long1]...[longN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ long[] valueBuffer = new long[numValues];
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getLong(); // relative read; advances 8 bytes per element
+ }
+ return valueBuffer;
+ }
+
+ @Override // decodes the FLOAT multi-value entry of docId into the caller-supplied buffer and returns the element count
+ public int getFloatMV(int docId, float[] valueBuffer,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][float1]...[floatN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getFloat(); // valueBuffer must have room for at least numValues elements
+ }
+ return numValues; // number of slots of valueBuffer that were filled
+ }
+
+ @Override // decodes the FLOAT multi-value entry of docId into a freshly allocated, exactly sized array
+ public float[] getFloatMV(int docId,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][float1]...[floatN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ float[] valueBuffer = new float[numValues];
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getFloat(); // relative read; advances 4 bytes per element
+ }
+ return valueBuffer;
+ }
+
+ @Override // decodes the DOUBLE multi-value entry of docId into the caller-supplied buffer and returns the element count
+ public int getDoubleMV(int docId, double[] valueBuffer,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][double1]...[doubleN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getDouble(); // valueBuffer must have room for at least numValues elements
+ }
+ return numValues; // number of slots of valueBuffer that were filled
+ }
+
+ @Override // decodes the DOUBLE multi-value entry of docId into a freshly allocated, exactly sized array
+ public double[] getDoubleMV(int docId,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][double1]...[doubleN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ double[] valueBuffer = new double[numValues];
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getDouble(); // 8-byte read per element; getFloat() here would consume only 4 bytes and misalign every following value
+ }
+ return valueBuffer;
+ }
+
+ @Override // decodes the STRING multi-value entry of docId into the caller-supplied buffer and returns the element count
+ public int getStringMV(int docId, String[] valueBuffer,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][len1]...[lenN][payload1]...[payloadN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ byteBuffer.position((numValues + 1) * Integer.BYTES); // skip count + length table so relative reads start at the first payload
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES); // absolute read from the length table; does not move the position
+ byte[] bytes = new byte[length];
+ byteBuffer.get(bytes); // relative read; advances past this payload
+ valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8); // valueBuffer must have room for at least numValues elements
+ }
+ return numValues; // number of slots of valueBuffer that were filled
+ }
+
+ @Override // decodes the STRING multi-value entry of docId into a freshly allocated, exactly sized array
+ public String[] getStringMV(int docId,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][len1]...[lenN][payload1]...[payloadN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ byteBuffer.position((numValues + 1) * Integer.BYTES); // skip count + length table so relative reads start at the first payload
+ String[] valueBuffer = new String[numValues];
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES); // absolute read from the length table; does not move the position
+ byte[] bytes = new byte[length];
+ byteBuffer.get(bytes); // relative read; advances past this payload
+ valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+ }
+ return valueBuffer;
+ }
+
+ @Override // decodes the BYTES multi-value entry of docId into the caller-supplied buffer and returns the element count
+ public int getBytesMV(int docId, byte[][] valueBuffer,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][len1]...[lenN][payload1]...[payloadN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ byteBuffer.position((numValues + 1) * Integer.BYTES); // skip count + length table so relative reads start at the first payload
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES); // absolute read from the length table; does not move the position
+ byte[] bytes = new byte[length];
+ byteBuffer.get(bytes, 0, length); // relative read; advances past this payload
+ valueBuffer[i] = bytes; // valueBuffer must have room for at least numValues elements
+ }
+ return numValues; // number of slots of valueBuffer that were filled
+ }
+
+ @Override // decodes the BYTES multi-value entry of docId into a freshly allocated, exactly sized array
+ public byte[][] getBytesMV(int docId,
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); // entry is read as [numValues][len1]...[lenN][payload1]...[payloadN]
+ int numValues = byteBuffer.getInt(); // leading 4 bytes hold the element count
+ byteBuffer.position((numValues + 1) * Integer.BYTES); // skip count + length table so relative reads start at the first payload
+ byte[][] valueBuffer = new byte[numValues][];
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES); // absolute read from the length table; does not move the position
+ byte[] bytes = new byte[length];
+ byteBuffer.get(bytes, 0, length); // relative read; advances past this payload
+ valueBuffer[i] = bytes;
+ }
+ return valueBuffer;
+ }
+
@Override
public void close()
throws IOException {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
index e3d89157db..5a46c9d3f3 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
@@ -87,7 +87,7 @@ public class VarByteChunkSVForwardIndexWriterTest {
try (VarByteChunkForwardIndexWriter writer = new
VarByteChunkForwardIndexWriter(file, compressionType, totalDocs,
numDocsPerChunk, maxEntryLengthInBytes, version)) {
for (String[] array : arrays) {
- writer.putStrings(array);
+ writer.putStringMV(array);
}
}
try (VarByteChunkSVForwardIndexReader reader = new
VarByteChunkSVForwardIndexReader(
@@ -122,7 +122,7 @@ public class VarByteChunkSVForwardIndexWriterTest {
try (VarByteChunkForwardIndexWriter writer = new
VarByteChunkForwardIndexWriter(file, compressionType, totalDocs,
numDocsPerChunk, maxEntryLengthInBytes, version)) {
for (String[] array : arrays) {
- writer.putByteArrays(Arrays.stream(array).map(str ->
str.getBytes(UTF_8)).toArray(byte[][]::new));
+ writer.putBytesMV(Arrays.stream(array).map(str ->
str.getBytes(UTF_8)).toArray(byte[][]::new));
}
}
try (VarByteChunkSVForwardIndexReader reader = new
VarByteChunkSVForwardIndexReader(
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index f6ae39aa3b..a0cf8c7a97 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -29,11 +29,14 @@ import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
import
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.testng.Assert;
@@ -52,7 +55,9 @@ public class MultiValueFixedByteRawIndexCreatorTest {
@DataProvider(name = "compressionTypes")
public Object[][] compressionTypes() {
- return Arrays.stream(ChunkCompressionType.values()).map(ct -> new
Object[]{ct}).toArray(Object[][]::new);
+ return Arrays.stream(ChunkCompressionType.values())
+ .flatMap(ct -> IntStream.of(2, 4).boxed()
+ .map(writerVersion -> new Object[]{ct,
writerVersion})).toArray(Object[][]::new);
}
@BeforeClass
@@ -70,98 +75,98 @@ public class MultiValueFixedByteRawIndexCreatorTest {
}
@Test(dataProvider = "compressionTypes")
- public void testMVInt(ChunkCompressionType compressionType)
+ public void testMVInt(ChunkCompressionType compressionType, int
writerVersion)
throws IOException {
// This tests varying lengths of MV rows
testMV(DataType.INT, ints(false), x -> x.length, int[]::new,
MultiValueFixedByteRawIndexCreator::putIntMV,
(reader, context, docId, buffer) -> {
int length = reader.getIntMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
- }, compressionType);
+ }, compressionType, writerVersion);
// This tests a fixed length of MV rows to ensure there are no
BufferOverflowExceptions on filling up the chunk
testMV(DataType.INT, ints(true), x -> x.length, int[]::new,
MultiValueFixedByteRawIndexCreator::putIntMV,
(reader, context, docId, buffer) -> {
int length = reader.getIntMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
- }, compressionType);
+ }, compressionType, writerVersion);
}
@Test(dataProvider = "compressionTypes")
- public void testMVLong(ChunkCompressionType compressionType)
+ public void testMVLong(ChunkCompressionType compressionType, int
writerVersion)
throws IOException {
// This tests varying lengths of MV rows
testMV(DataType.LONG, longs(false), x -> x.length, long[]::new,
MultiValueFixedByteRawIndexCreator::putLongMV,
(reader, context, docId, buffer) -> {
int length = reader.getLongMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
- }, compressionType);
+ }, compressionType, writerVersion);
// This tests a fixed length of MV rows to ensure there are no
BufferOverflowExceptions on filling up the chunk
testMV(DataType.LONG, longs(true), x -> x.length, long[]::new,
MultiValueFixedByteRawIndexCreator::putLongMV,
(reader, context, docId, buffer) -> {
int length = reader.getLongMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
- }, compressionType);
+ }, compressionType, writerVersion);
}
@Test(dataProvider = "compressionTypes")
- public void testMVFloat(ChunkCompressionType compressionType)
+ public void testMVFloat(ChunkCompressionType compressionType, int
writerVersion)
throws IOException {
// This tests varying lengths of MV rows
testMV(DataType.FLOAT, floats(false), x -> x.length, float[]::new,
MultiValueFixedByteRawIndexCreator::putFloatMV,
(reader, context, docId, buffer) -> {
int length = reader.getFloatMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
- }, compressionType);
+ }, compressionType, writerVersion);
// This tests a fixed length of MV rows to ensure there are no
BufferOverflowExceptions on filling up the chunk
testMV(DataType.FLOAT, floats(true), x -> x.length, float[]::new,
MultiValueFixedByteRawIndexCreator::putFloatMV,
(reader, context, docId, buffer) -> {
int length = reader.getFloatMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
- }, compressionType);
+ }, compressionType, writerVersion);
}
@Test(dataProvider = "compressionTypes")
- public void testMVDouble(ChunkCompressionType compressionType)
+ public void testMVDouble(ChunkCompressionType compressionType, int
writerVersion)
throws IOException {
// This tests varying lengths of MV rows
testMV(DataType.DOUBLE, doubles(false), x -> x.length, double[]::new,
- MultiValueFixedByteRawIndexCreator::putDoubleMV,
- (reader, context, docId, buffer) -> {
+ MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context,
docId, buffer) -> {
int length = reader.getDoubleMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
- }, compressionType);
+ }, compressionType, writerVersion);
// This tests a fixed length of MV rows to ensure there are no
BufferOverflowExceptions on filling up the chunk
testMV(DataType.DOUBLE, doubles(true), x -> x.length, double[]::new,
- MultiValueFixedByteRawIndexCreator::putDoubleMV,
- (reader, context, docId, buffer) -> {
+ MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context,
docId, buffer) -> {
int length = reader.getDoubleMV(docId, buffer, context);
return Arrays.copyOf(buffer, length);
- }, compressionType);
+ }, compressionType, writerVersion);
}
public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T>
sizeof, IntFunction<T> constructor,
- Injector<T> injector, Extractor<T> extractor, ChunkCompressionType
compressionType)
+ Injector<T> injector, Extractor<T> extractor, ChunkCompressionType
compressionType, int writerVersion)
throws IOException {
String column = "testCol_" + dataType;
int numDocs = inputs.size();
int maxElements =
inputs.stream().mapToInt(sizeof).max().orElseThrow(RuntimeException::new);
File file = new File(OUTPUT_DIR, column +
Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
file.delete();
- MultiValueFixedByteRawIndexCreator creator = new
MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR),
- compressionType, column, numDocs, dataType, maxElements);
+ MultiValueFixedByteRawIndexCreator creator =
+ new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR),
compressionType, column, numDocs, dataType,
+ maxElements, false, writerVersion);
inputs.forEach(input -> injector.inject(creator, input));
creator.close();
//read
- final PinotDataBuffer buffer = PinotDataBuffer
- .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
- FixedByteChunkMVForwardIndexReader reader = new
FixedByteChunkMVForwardIndexReader(buffer,
- dataType.getStoredType());
- final ChunkReaderContext context = reader.createContext();
+ final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0,
file.length(), ByteOrder.BIG_ENDIAN, "");
+ ForwardIndexReader reader =
+ writerVersion == VarByteChunkForwardIndexWriterV4.VERSION ? new
VarByteChunkForwardIndexReaderV4(buffer,
+ dataType.getStoredType(), false) : new
FixedByteChunkMVForwardIndexReader(buffer, dataType.getStoredType());
+
+ final ForwardIndexReaderContext context = reader.createContext();
T valueBuffer = constructor.apply(maxElements);
for (int i = 0; i < numDocs; i++) {
Assert.assertEquals(inputs.get(i), extractor.extract(reader, context, i,
valueBuffer));
@@ -169,7 +174,7 @@ public class MultiValueFixedByteRawIndexCreatorTest {
}
interface Extractor<T> {
- T extract(FixedByteChunkMVForwardIndexReader reader, ChunkReaderContext
context, int offset, T buffer);
+ T extract(ForwardIndexReader reader, ForwardIndexReaderContext context,
int offset, T buffer);
}
interface Injector<T> {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index 1006245586..32f4ff1a2c 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -30,10 +30,11 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.testng.Assert;
@@ -56,12 +57,12 @@ public class MultiValueVarByteRawIndexCreatorTest {
@DataProvider
public Object[][] params() {
- return Arrays.stream(ChunkCompressionType.values())
- .flatMap(chunkCompressionType -> IntStream.of(10, 15, 20, 1000).boxed()
- .flatMap(useFullSize -> Stream.of(true, false)
- .flatMap(maxLength -> IntStream.range(1, 20).map(i -> i * 2 -
1).boxed()
- .map(maxNumEntries -> new Object[]{chunkCompressionType,
useFullSize, maxLength,
- maxNumEntries}))))
+ return
Arrays.stream(ChunkCompressionType.values()).flatMap(chunkCompressionType ->
IntStream.of(2, 4).boxed()
+ .flatMap(writerVersion -> IntStream.of(10, 15, 20,
1000).boxed().flatMap(maxLength -> Stream.of(true, false)
+ .flatMap(
+ useFullSize -> IntStream.range(1, 20).map(i -> i * 2 -
1).boxed().map(maxNumEntries -> new Object[]{
+ chunkCompressionType, useFullSize, writerVersion,
maxLength, maxNumEntries
+ })))))
.toArray(Object[][]::new);
}
@@ -86,7 +87,8 @@ public class MultiValueVarByteRawIndexCreatorTest {
}
@Test(dataProvider = "params")
- public void testMVString(ChunkCompressionType compressionType, int
maxLength, boolean useFullSize, int maxNumEntries)
+ public void testMVString(ChunkCompressionType compressionType, boolean
useFullSize, int writerVersion, int maxLength,
+ int maxNumEntries)
throws IOException {
String column = "testCol-" + UUID.randomUUID();
int numDocs = 1000;
@@ -117,18 +119,16 @@ public class MultiValueVarByteRawIndexCreatorTest {
inputs.add(values);
}
try (MultiValueVarByteRawIndexCreator creator = new
MultiValueVarByteRawIndexCreator(OUTPUT_DIR, compressionType,
- column, numDocs, DataType.STRING, maxTotalLength, maxElements)) {
+ column, numDocs, DataType.STRING, maxTotalLength, maxElements,
writerVersion)) {
for (String[] input : inputs) {
creator.putStringMV(input);
}
}
//read
- final PinotDataBuffer buffer = PinotDataBuffer
- .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
- VarByteChunkMVForwardIndexReader reader = new
VarByteChunkMVForwardIndexReader(buffer,
- DataType.STRING);
- final ChunkReaderContext context = reader.createContext();
+ final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0,
file.length(), ByteOrder.BIG_ENDIAN, "");
+ ForwardIndexReader reader =
ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.STRING, false);
+ final ForwardIndexReaderContext context = reader.createContext();
String[] values = new String[maxElements];
for (int i = 0; i < numDocs; i++) {
int length = reader.getStringMV(i, values, context);
@@ -138,7 +138,8 @@ public class MultiValueVarByteRawIndexCreatorTest {
}
@Test(dataProvider = "params")
- public void testMVBytes(ChunkCompressionType compressionType, int maxLength,
boolean useFullSize, int maxNumEntries)
+ public void testMVBytes(ChunkCompressionType compressionType, boolean
useFullSize, int writerVersion, int maxLength,
+ int maxNumEntries)
throws IOException {
String column = "testCol-" + UUID.randomUUID();
int numDocs = 1000;
@@ -169,18 +170,16 @@ public class MultiValueVarByteRawIndexCreatorTest {
inputs.add(values);
}
try (MultiValueVarByteRawIndexCreator creator = new
MultiValueVarByteRawIndexCreator(OUTPUT_DIR, compressionType,
- column, numDocs, DataType.STRING, maxTotalLength, maxElements)) {
+ column, numDocs, DataType.BYTES, writerVersion, maxTotalLength,
maxElements)) {
for (byte[][] input : inputs) {
creator.putBytesMV(input);
}
}
//read
- final PinotDataBuffer buffer = PinotDataBuffer
- .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
- VarByteChunkMVForwardIndexReader reader = new
VarByteChunkMVForwardIndexReader(buffer,
- DataType.BYTES);
- final ChunkReaderContext context = reader.createContext();
+ final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0,
file.length(), ByteOrder.BIG_ENDIAN, "");
+ ForwardIndexReader reader =
ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.BYTES, false);
+ final ForwardIndexReaderContext context = reader.createContext();
byte[][] values = new byte[maxElements][];
for (int i = 0; i < numDocs; i++) {
int length = reader.getBytesMV(i, values, context);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
index ba59d057a7..88414b8b58 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
@@ -31,7 +31,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
-import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
+import
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec;
@@ -113,8 +113,8 @@ public class VarByteChunkV4Test {
}
}
try (PinotDataBuffer buffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(_file)) {
- try (VarByteChunkSVForwardIndexReaderV4 reader = new
VarByteChunkSVForwardIndexReaderV4(buffer, dataType);
- VarByteChunkSVForwardIndexReaderV4.ReaderContext context =
reader.createContext()) {
+ try (VarByteChunkForwardIndexReaderV4 reader = new
VarByteChunkForwardIndexReaderV4(buffer, dataType,
+ true); VarByteChunkForwardIndexReaderV4.ReaderContext context =
reader.createContext()) {
for (int i = 0; i < values.size(); i++) {
assertEquals(read.read(reader, context, i), values.get(i));
}
@@ -161,7 +161,7 @@ public class VarByteChunkV4Test {
@FunctionalInterface
interface Read<T> {
- T read(VarByteChunkSVForwardIndexReaderV4 reader,
VarByteChunkSVForwardIndexReaderV4.ReaderContext context,
+ T read(VarByteChunkForwardIndexReaderV4 reader,
VarByteChunkForwardIndexReaderV4.ReaderContext context,
int docId);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]