This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch clucene
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/clucene by this push:
new 98826578c4e Optimize the compression of inverted index position
information (#242)
98826578c4e is described below
commit 98826578c4eff5ca2c1bbde7be06d9a84c7e5cfa
Author: zzzxl <[email protected]>
AuthorDate: Thu Oct 17 22:10:48 2024 +0800
Optimize the compression of inverted index position information (#242)
---
src/core/CLucene/index/FieldInfos.cpp | 18 +-
src/core/CLucene/index/IndexVersion.h | 1 +
src/core/CLucene/index/IndexWriter.cpp | 34 ++-
src/core/CLucene/index/IndexWriter.h | 5 +-
src/core/CLucene/index/SDocumentWriter.cpp | 12 +-
src/core/CLucene/index/SDocumentWriter.h | 1 +
src/core/CLucene/index/SegmentTermDocs.cpp | 4 +-
src/core/CLucene/index/SegmentTermPositions.cpp | 16 +-
src/core/CLucene/index/_FieldInfos.h | 9 +-
src/core/CLucene/index/_SegmentHeader.h | 61 +++++
src/core/CLucene/util/PFORUtil.cpp | 53 ++++
src/core/CLucene/util/PFORUtil.h | 13 +
src/test/CMakeLists.txt | 1 +
src/test/index/TestIndexCompress.cpp | 309 ++++++++++++++++++++++++
src/test/test.h | 1 +
src/test/tests.cpp | 1 +
16 files changed, 514 insertions(+), 25 deletions(-)
diff --git a/src/core/CLucene/index/FieldInfos.cpp b/src/core/CLucene/index/FieldInfos.cpp
index 00e0c4275a5..3fa035661d4 100644
--- a/src/core/CLucene/index/FieldInfos.cpp
+++ b/src/core/CLucene/index/FieldInfos.cpp
@@ -125,11 +125,12 @@ void FieldInfos::addIndexed(const TCHAR** names, const bool storeTermVectors, co
void FieldInfos::add(const TCHAR** names, const bool isIndexed, const bool
storeTermVectors,
const bool storePositionWithTermVector, const bool
storeOffsetWithTermVector,
- const bool omitNorms, const bool hasProx, const bool
storePayloads) {
+ const bool omitNorms, const bool hasProx, const bool
storePayloads,
+
IndexVersion indexVersion) {
size_t i = 0;
while ( names[i] != NULL ){
add(names[i], isIndexed, storeTermVectors,
storePositionWithTermVector,
- storeOffsetWithTermVector, omitNorms, hasProx,
storePayloads);
+ storeOffsetWithTermVector, omitNorms, hasProx,
storePayloads, indexVersion);
++i;
}
}
@@ -137,11 +138,13 @@ void FieldInfos::add(const TCHAR** names, const bool isIndexed, const bool store
FieldInfo* FieldInfos::add(const TCHAR* name, const bool isIndexed, const bool
storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const bool
omitNorms,
- const bool hasProx, const bool storePayloads) {
+ const bool hasProx, const bool storePayloads,
+
IndexVersion indexVersion) {
FieldInfo* fi = fieldInfo(name);
if (fi == NULL) {
return addInternal(name, isIndexed, storeTermVector,
storePositionWithTermVector,
-
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads);
+
storeOffsetWithTermVector, omitNorms, hasProx, storePayloads,
+
indexVersion);
} else {
if (fi->isIndexed != isIndexed) {
fi->isIndexed = true; // once
indexed, always index
@@ -164,6 +167,9 @@ FieldInfo* FieldInfos::add(const TCHAR* name, const bool isIndexed, const bool s
if (fi->storePayloads != storePayloads) {
fi->storePayloads = true;
}
+ if (fi->indexVersion_ != indexVersion) {
+ fi->indexVersion_ = indexVersion;
+ }
}
return fi;
}
@@ -172,10 +178,12 @@ FieldInfo* FieldInfos::addInternal(const TCHAR* name, const bool isIndexed,
const bool storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const
bool omitNorms,
- const bool hasProx, const bool
storePayloads) {
+ const bool hasProx, const bool
storePayloads,
+
IndexVersion
indexVersion) {
FieldInfo* fi = _CLNEW FieldInfo(name, isIndexed, byNumber.size(),
storeTermVector,
storePositionWithTermVector, storeOffsetWithTermVector,
omitNorms,
hasProx, storePayloads);
+ fi->setIndexVersion(indexVersion);
byNumber.push_back(fi);
byName.put( fi->name, fi);
return fi;
diff --git a/src/core/CLucene/index/IndexVersion.h b/src/core/CLucene/index/IndexVersion.h
index 448b1872ab4..1320df2c4c7 100644
--- a/src/core/CLucene/index/IndexVersion.h
+++ b/src/core/CLucene/index/IndexVersion.h
@@ -3,6 +3,7 @@
enum class IndexVersion {
kV0 = 0,
kV1 = 1,
+ kV2 = 2,
kNone
};
\ No newline at end of file
diff --git a/src/core/CLucene/index/IndexWriter.cpp b/src/core/CLucene/index/IndexWriter.cpp
index 10dfd68c60d..8a2c50431cc 100644
--- a/src/core/CLucene/index/IndexWriter.cpp
+++ b/src/core/CLucene/index/IndexWriter.cpp
@@ -8,6 +8,7 @@
#include "CLucene/analysis/AnalysisHeader.h"
#include "CLucene/analysis/Analyzers.h"
+#include "CLucene/config/repl_wchar.h"
#include "CLucene/document/Document.h"
#include "CLucene/search/Similarity.h"
#include "CLucene/store/Directory.h"
@@ -1327,22 +1328,27 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
std::vector<lucene::index::IndexWriter *> destIndexWriterList;
std::vector<lucene::store::IndexOutput *> nullBitmapIndexOutputList;
try {
- // check hasProx
+ // check hasProx, indexVersion
bool hasProx = false;
+ IndexVersion indexVersion = IndexVersion::kV1;
{
if (!readers.empty()) {
IndexReader* reader = readers[0];
hasProx = reader->getFieldInfos()->hasProx();
+ indexVersion = reader->getFieldInfos()->getIndexVersion();
for (int32_t i = 1; i < readers.size(); i++) {
if (hasProx != readers[i]->getFieldInfos()->hasProx()) {
_CLTHROWA(CL_ERR_IllegalArgument, "src_dirs hasProx
inconformity");
}
+ if (indexVersion !=
readers[i]->getFieldInfos()->getIndexVersion()) {
+ _CLTHROWA(CL_ERR_IllegalArgument, "src_dirs
indexVersion inconformity");
+ }
}
}
}
/// merge fields
- mergeFields(hasProx);
+ mergeFields(hasProx, indexVersion);
/// write fields and create files writers
for (int j = 0; j < numDestIndexes; j++) {
@@ -1390,7 +1396,7 @@ void IndexWriter::indexCompaction(std::vector<lucene::store::Directory *> &src_d
}
/// merge terms
- mergeTerms(hasProx);
+ mergeTerms(hasProx, indexVersion);
/// merge null_bitmap
mergeNullBitmap(srcNullBitmapValues, nullBitmapIndexOutputList);
@@ -1555,7 +1561,7 @@ void IndexWriter::compareIndexes(lucene::store::Directory *other) {
}
}
-void IndexWriter::mergeFields(bool hasProx) {
+void IndexWriter::mergeFields(bool hasProx, IndexVersion indexVersion) {
//Create a new FieldInfos
fieldInfos = _CLNEW FieldInfos();
//Condition check to see if fieldInfos points to a valid instance
@@ -1570,7 +1576,8 @@ void IndexWriter::mergeFields(bool hasProx) {
FieldInfo *fi = reader->getFieldInfos()->fieldInfo(j);
fieldInfos->add(fi->name, fi->isIndexed, fi->storeTermVector,
fi->storePositionWithTermVector,
fi->storeOffsetWithTermVector,
- !reader->hasNorms(fi->name), hasProx,
fi->storePayloads);
+ !reader->hasNorms(fi->name), hasProx,
fi->storePayloads,
+ fi->indexVersion_);
}
}
@@ -1614,7 +1621,7 @@ protected:
};
-void IndexWriter::mergeTerms(bool hasProx) {
+void IndexWriter::mergeTerms(bool hasProx, IndexVersion indexVersion) {
auto queue = _CLNEW SegmentMergeQueue(readers.size());
auto numSrcIndexes = readers.size();
//std::vector<TermPositions *> postingsList(numSrcIndexes);
@@ -1667,6 +1674,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
std::vector<std::vector<uint32_t>> docDeltaBuffers(numDestIndexes);
std::vector<std::vector<uint32_t>> freqBuffers(numDestIndexes);
+ std::vector<std::vector<uint32_t>> posBuffers(numDestIndexes);
auto destPostingQueues = _CLNEW postingQueue(matchSize);
std::vector<DestDoc> destDocs(matchSize);
@@ -1758,6 +1766,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
auto proxOut = proxOutputList[destIdx];
auto& docDeltaBuffer = docDeltaBuffers[destIdx];
auto& freqBuffer = freqBuffers[destIdx];
+ auto& posBuffer = posBuffers[destIdx];
auto skipWriter = skipListWriterList[destIdx];
auto& df = dfs[destIdx];
auto& lastDoc = lastDocs[destIdx];
@@ -1776,6 +1785,9 @@ void IndexWriter::mergeTerms(bool hasProx) {
encode(freqOut, docDeltaBuffer, true);
if (hasProx) {
encode(freqOut, freqBuffer, false);
+ if (indexVersion >= IndexVersion::kV2) {
+ PforUtil::encodePos(proxOut, posBuffer);
+ }
}
skipWriter->setSkipData(lastDoc, false, -1);
@@ -1791,7 +1803,11 @@ void IndexWriter::mergeTerms(bool hasProx) {
for (int32_t i = 0; i < descPositions.size(); i++) {
int32_t position = descPositions[i];
int32_t delta = position - lastPosition;
- proxOut->writeVInt(delta);
+ if (indexVersion >= IndexVersion::kV2) {
+ posBuffer.push_back(delta);
+ } else {
+ proxOut->writeVInt(delta);
+ }
lastPosition = position;
}
freqBuffer.push_back(destFreq);
@@ -1828,6 +1844,7 @@ void IndexWriter::mergeTerms(bool hasProx) {
{
auto& docDeltaBuffer = docDeltaBuffers[i];
auto& freqBuffer = freqBuffers[i];
+ auto& posBuffer = posBuffers[i];
freqOutput->writeByte((char)CodeMode::kDefault);
freqOutput->writeVInt(docDeltaBuffer.size());
@@ -1851,6 +1868,9 @@ void IndexWriter::mergeTerms(bool hasProx) {
}
docDeltaBuffer.resize(0);
freqBuffer.resize(0);
+ if (indexVersion >= IndexVersion::kV2) {
+ PforUtil::encodePos(proxOutput, posBuffer);
+ }
}
int64_t skipPointer = skipListWriter->writeSkip(freqOutput);
diff --git a/src/core/CLucene/index/IndexWriter.h b/src/core/CLucene/index/IndexWriter.h
index 7cfb67d2ca7..4f32ede24e6 100644
--- a/src/core/CLucene/index/IndexWriter.h
+++ b/src/core/CLucene/index/IndexWriter.h
@@ -7,6 +7,7 @@
#ifndef _lucene_index_IndexWriter_
#define _lucene_index_IndexWriter_
+#include "CLucene/index/IndexVersion.h"
#include "CLucene/util/VoidList.h"
#include "CLucene/util/Array.h"
@@ -320,11 +321,11 @@ public:
std::vector<uint32_t> dest_index_docs);
// create new fields info
- void mergeFields(bool hasProx);
+ void mergeFields(bool hasProx, IndexVersion indexVersion);
// write fields info file
void writeFields(lucene::store::Directory* d, std::string segment);
// merge terms and write files
- void mergeTerms(bool hasProx);
+ void mergeTerms(bool hasProx, IndexVersion indexVersion);
// merge null_bitmap
void mergeNullBitmap(std::vector<std::vector<uint32_t>> srcBitmapValues,
std::vector<lucene::store::IndexOutput *> nullBitmapIndexOutputList);
diff --git a/src/core/CLucene/index/SDocumentWriter.cpp b/src/core/CLucene/index/SDocumentWriter.cpp
index 4ff262f86d0..d2b80d89d76 100644
--- a/src/core/CLucene/index/SDocumentWriter.cpp
+++ b/src/core/CLucene/index/SDocumentWriter.cpp
@@ -1222,6 +1222,9 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
encode(freqOut, docDeltaBuffer, true);
if (hasProx_) {
encode(freqOut, freqBuffer, false);
+ if (indexVersion_ >= IndexVersion::kV2) {
+ PforUtil::encodePos(proxOut, posBuffer);
+ }
}
skipListWriter->setSkipData(lastDoc,
currentFieldStorePayloads, lastPayloadLength);
@@ -1253,7 +1256,11 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
for (int32_t j = 0; j < termDocFreq; j++) {
const int32_t code = prox.readVInt();
assert(0 == (code & 1));
- proxOut->writeVInt(code >> 1);
+ if (indexVersion_ >= IndexVersion::kV2) {
+ posBuffer.push_back(code >> 1);
+ } else {
+ proxOut->writeVInt(code >> 1);
+ }
}
freqBuffer.push_back(termDocFreq);
}
@@ -1310,6 +1317,9 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa
}
docDeltaBuffer.resize(0);
freqBuffer.resize(0);
+ if (indexVersion_ >= IndexVersion::kV2) {
+ PforUtil::encodePos(proxOut, posBuffer);
+ }
}
int64_t skipPointer = skipListWriter->writeSkip(freqOut);
diff --git a/src/core/CLucene/index/SDocumentWriter.h b/src/core/CLucene/index/SDocumentWriter.h
index 3dd98186635..36a43fc57b3 100644
--- a/src/core/CLucene/index/SDocumentWriter.h
+++ b/src/core/CLucene/index/SDocumentWriter.h
@@ -52,6 +52,7 @@ private:
std::string segment;// Current segment we are working on
std::vector<uint32_t> docDeltaBuffer;
std::vector<uint32_t> freqBuffer;
+ std::vector<uint32_t> posBuffer;
std::ostream* infoStream{};
int64_t ramBufferSize;
// Flush @ this number of docs. If rarmBufferSize is
diff --git a/src/core/CLucene/index/SegmentTermDocs.cpp b/src/core/CLucene/index/SegmentTermDocs.cpp
index e346dc0ca24..a761fec2810 100644
--- a/src/core/CLucene/index/SegmentTermDocs.cpp
+++ b/src/core/CLucene/index/SegmentTermDocs.cpp
@@ -190,7 +190,7 @@ void TermDocsBuffer::refill() {
cur_doc_ = 0;
cur_freq_ = 0;
- if (indexVersion_ == IndexVersion::kV1) {
+ if (indexVersion_ >= IndexVersion::kV1) {
size_ = refillV1();
} else {
size_ = refillV0();
@@ -199,7 +199,7 @@ void TermDocsBuffer::refill() {
void TermDocsBuffer::readRange(DocRange* docRange) {
int32_t size = 0;
- if (indexVersion_ == IndexVersion::kV1) {
+ if (indexVersion_ >= IndexVersion::kV1) {
size = refillV1();
} else {
size = refillV0();
diff --git a/src/core/CLucene/index/SegmentTermPositions.cpp b/src/core/CLucene/index/SegmentTermPositions.cpp
index 1c7db0703c7..5de0da20add 100644
--- a/src/core/CLucene/index/SegmentTermPositions.cpp
+++ b/src/core/CLucene/index/SegmentTermPositions.cpp
@@ -17,6 +17,8 @@ CL_NS_DEF(index)
SegmentTermPositions::SegmentTermPositions(const SegmentReader* _parent):
SegmentTermDocs(_parent), proxStream(NULL)// the proxStream will be
cloned lazily when nextPosition() is called for the first time
,lazySkipPointer(-1), lazySkipProxCount(0)
+ , indexVersion_(_parent->_fieldInfos->getIndexVersion())
+ , buffer_(proxStream, indexVersion_)
{
CND_CONDITION(_parent != NULL, "Parent is NULL");
}
@@ -64,14 +66,15 @@ int32_t SegmentTermPositions::nextPosition() {
}
int32_t SegmentTermPositions::readDeltaPosition() {
- int32_t delta = proxStream->readVInt();
+ int32_t delta = buffer_.getPos();
if (currentFieldStoresPayloads) {
// if the current field stores payloads then
// the position delta is shifted one bit to the left.
// if the LSB is set, then we have to read the current
// payload length
if ((delta & 1) != 0) {
- payloadLength = proxStream->readVInt();
+ // payloadLength = proxStream->readVInt();
+ _CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload
flow is not supported at the moment");
}
delta = (int32_t)((uint32_t)delta >> (uint32_t)1);
needToLoadPayload = true;
@@ -122,7 +125,8 @@ void SegmentTermPositions::skipPositions(const int32_t n) {
void SegmentTermPositions::skipPayload() {
if (needToLoadPayload && payloadLength > 0) {
- proxStream->seek(proxStream->getFilePointer() + payloadLength);
+ // proxStream->seek(proxStream->getFilePointer() +
payloadLength);
+ _CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is
not supported at the moment");
}
needToLoadPayload = false;
}
@@ -131,6 +135,7 @@ void SegmentTermPositions::lazySkip() {
if (proxStream == NULL) {
// clone lazily
proxStream = parent->proxStream->clone();
+ buffer_.reset(proxStream);
}
// we might have to skip the current payload
@@ -138,7 +143,7 @@ void SegmentTermPositions::lazySkip() {
skipPayload();
if (lazySkipPointer != -1) {
- proxStream->seek(lazySkipPointer);
+ buffer_.seek(lazySkipPointer);
lazySkipPointer = -1;
}
@@ -166,7 +171,8 @@ uint8_t* SegmentTermPositions::getPayload(uint8_t* data) {
} else {
retArray = data;
}
- proxStream->readBytes(retArray, payloadLength);
+ // proxStream->readBytes(retArray, payloadLength);
+ _CLTHROWA(CL_ERR_UnsupportedOperation, "Processing the payload flow is not
supported at the moment");
needToLoadPayload = false;
return retArray;
}
diff --git a/src/core/CLucene/index/_FieldInfos.h b/src/core/CLucene/index/_FieldInfos.h
index ed142c4435c..4ddd0f47fa6 100644
--- a/src/core/CLucene/index/_FieldInfos.h
+++ b/src/core/CLucene/index/_FieldInfos.h
@@ -146,7 +146,8 @@ public:
void add(const TCHAR** names, const bool isIndexed, const bool
storeTermVector = false,
const bool
storePositionWithTermVector = false,
const bool
storeOffsetWithTermVector = false, const bool omitNorms = false,
- const bool hasProx = false,
const bool storePayloads = false);
+ const bool hasProx = false,
const bool storePayloads = false,
+ IndexVersion indexVersion =
IndexVersion::kV1);
// Merges in information from another FieldInfos.
void add(FieldInfos* other);
@@ -167,13 +168,15 @@ public:
FieldInfo* add(const TCHAR* name, const bool isIndexed, const bool
storeTermVector = false,
const
bool storePositionWithTermVector = false,
const
bool storeOffsetWithTermVector = false, const bool omitNorms = false,
- const
bool hasProx = false, const bool storePayloads = false);
+ const
bool hasProx = false, const bool storePayloads = false,
+
IndexVersion indexVersion = IndexVersion::kV1);
// was void
FieldInfo* addInternal(const TCHAR* name, const bool isIndexed, const
bool storeTermVector,
const bool storePositionWithTermVector,
const bool storeOffsetWithTermVector, const bool
omitNorms,
-
const bool hasProx, const bool storePayloads);
+
const bool hasProx, const bool storePayloads,
+
IndexVersion indexVersion = IndexVersion::kV1);
int32_t fieldNumber(const TCHAR* fieldName)const;
diff --git a/src/core/CLucene/index/_SegmentHeader.h b/src/core/CLucene/index/_SegmentHeader.h
index 6bf7d1819b7..4fa9b3fc04c 100644
--- a/src/core/CLucene/index/_SegmentHeader.h
+++ b/src/core/CLucene/index/_SegmentHeader.h
@@ -7,6 +7,7 @@
#ifndef _lucene_index_SegmentHeader_
#define _lucene_index_SegmentHeader_
+#include "CLucene/util/PFORUtil.h"
#include "_SegmentInfos.h"
#include "CLucene/util/BitSet.h"
//#include "CLucene/util/VoidMap.h"
@@ -86,6 +87,62 @@ private:
IndexVersion indexVersion_ = IndexVersion::kV0;
};
+class TermPostingsBuffer {
+public:
+ TermPostingsBuffer(CL_NS(store)::IndexInput* proxStream, IndexVersion
indexVersion)
+ : poss_(PforUtil::blockSize + 3)
+ , proxStream_(proxStream)
+ , indexVersion_(indexVersion) {
+ }
+
+ ~TermPostingsBuffer() {
+ size_ = 0;
+ cur_pos_ = 0;
+ proxStream_ = nullptr;
+ }
+
+ int32_t getPos() {
+ if (indexVersion_ >= IndexVersion::kV2) {
+ if (cur_pos_ >= size_) {
+ refill();
+ }
+ return poss_[cur_pos_++];
+ } else {
+ return proxStream_->readVInt();
+ }
+ }
+
+ void seek(int64_t skipPointer) {
+ if (indexVersion_ >= IndexVersion::kV2) {
+ size_ = 0;
+ cur_pos_ = 0;
+ }
+ proxStream_->seek(skipPointer);
+ }
+
+ void reset(CL_NS(store)::IndexInput* proxStream) {
+ size_ = 0;
+ cur_pos_ = 0;
+ proxStream_ = proxStream;
+ }
+
+private:
+ void refill() {
+ cur_pos_ = 0;
+ size_ = PforUtil::decodePos(proxStream_, poss_);
+ }
+
+private:
+ uint32_t size_ = 0;
+
+ uint32_t cur_pos_ = 0;
+ std::vector<uint32_t> poss_;
+
+ CL_NS(store)::IndexInput* proxStream_ = nullptr;
+
+ IndexVersion indexVersion_ = IndexVersion::kV1;
+};
+
class SegmentTermDocs:public virtual TermDocs {
protected:
const SegmentReader* parent;
@@ -232,6 +289,10 @@ private:
int32_t doc() const{ return SegmentTermDocs::doc(); }
int32_t freq() const{ return SegmentTermDocs::freq(); }
bool skipTo(const int32_t target){ return SegmentTermDocs::skipTo(target); }
+
+private:
+ IndexVersion indexVersion_ = IndexVersion::kV0;
+ TermPostingsBuffer buffer_;
};
diff --git a/src/core/CLucene/util/PFORUtil.cpp b/src/core/CLucene/util/PFORUtil.cpp
index ae27f521553..8ebad306a4e 100644
--- a/src/core/CLucene/util/PFORUtil.cpp
+++ b/src/core/CLucene/util/PFORUtil.cpp
@@ -20,6 +20,11 @@
#include <cpuid.h>
#endif
+#include "CLucene/CLConfig.h"
+#include "CLucene/index/CodeMode.h"
+
+CL_NS_USE(index)
+
namespace {
using DEC_FUNC = size_t (*)(unsigned char *__restrict, size_t, uint32_t
*__restrict);
using ENC_FUNC = size_t (*)(uint32_t *__restrict in, size_t n, unsigned char
*__restrict out);
@@ -129,3 +134,51 @@ size_t P4ENC(uint32_t *__restrict in, size_t n, unsigned char *__restrict out) {
size_t P4NZENC(uint32_t *__restrict in, size_t n, unsigned char *__restrict
out) {
return g_p4nzenc(in, n, out);
}
+
+void PforUtil::encodePos(IndexOutput* out, std::vector<uint32_t>& buffer) {
+ auto encode = [&out, &buffer](size_t offset, size_t size, CodeMode mode) {
+ out->writeByte((char)mode);
+ out->writeVInt(size);
+ if (mode == CodeMode::kPfor) {
+ std::vector<uint8_t> compress(4 * size + PFOR_BLOCK_SIZE);
+ size_t compressSize = P4NZENC(buffer.data() + offset, size,
compress.data());
+ out->writeVInt(compressSize);
+ out->writeBytes(reinterpret_cast<const uint8_t*>(compress.data()),
compressSize);
+ } else if (mode == CodeMode::kDefault) {
+ for (size_t i = 0; i < size; i++) {
+ out->writeVInt(buffer[offset + i]);
+ }
+ }
+ };
+
+ size_t i = 0;
+ size_t totalSize = buffer.size();
+ while (i < totalSize) {
+ size_t remainingElements = totalSize - i;
+ if (remainingElements >= blockSize) {
+ encode(i, blockSize, CodeMode::kPfor);
+ i += blockSize;
+ } else {
+ encode(i, remainingElements, CodeMode::kDefault);
+ break;
+ }
+ }
+
+ buffer.resize(0);
+}
+
+uint32_t PforUtil::decodePos(IndexInput* in, std::vector<uint32_t>& buffer) {
+ CodeMode mode = static_cast<CodeMode>(in->readByte());
+ uint32_t size = in->readVInt();
+ if (mode == CodeMode::kPfor) {
+ uint32_t serializedSize = in->readVInt();
+ std::vector<uint8_t> buf(serializedSize + PFOR_BLOCK_SIZE);
+ in->readBytes(buf.data(), serializedSize);
+ P4NZDEC(buf.data(), size, buffer.data());
+ } else {
+ for (uint32_t i = 0; i < size; i++) {
+ buffer[i] = in->readVInt();
+ }
+ }
+ return size;
+}
\ No newline at end of file
diff --git a/src/core/CLucene/util/PFORUtil.h b/src/core/CLucene/util/PFORUtil.h
index 29acb7fe7a6..583f5c9a25e 100644
--- a/src/core/CLucene/util/PFORUtil.h
+++ b/src/core/CLucene/util/PFORUtil.h
@@ -18,9 +18,22 @@
#include <cstddef>
#include <cstdint>
+#include <vector>
+
+#include "CLucene.h"
+#include "CLucene/store/IndexOutput.h"
+
+CL_NS_USE(store)
size_t P4DEC(unsigned char *__restrict in, size_t n, uint32_t *__restrict out);
size_t P4NZDEC(unsigned char *__restrict in, size_t n, uint32_t *__restrict
out);
size_t P4ENC(uint32_t *__restrict in, size_t n, unsigned char *__restrict out);
size_t P4NZENC(uint32_t *__restrict in, size_t n, unsigned char *__restrict
out);
+class PforUtil {
+public:
+ static constexpr size_t blockSize = 128;
+
+ static void encodePos(IndexOutput* out, std::vector<uint32_t>& buffer);
+ static uint32_t decodePos(IndexInput* out, std::vector<uint32_t>& buffer);
+};
\ No newline at end of file
diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt
index fa8e4d3db03..f48b0ff76eb 100644
--- a/src/test/CMakeLists.txt
+++ b/src/test/CMakeLists.txt
@@ -86,6 +86,7 @@ SET(test_files ./tests.cpp
./search/spans/TestSpanExplanationsOfNonMatches.cpp
./search/spans/TestSpanExplanationsOfNonMatches.h
./index/TestIndexCompaction.cpp
+ ./index/TestIndexCompress.cpp
./index/TestIndexModifier.cpp
./index/TestIndexWriter.cpp
./index/TestIndexModifier.cpp
diff --git a/src/test/index/TestIndexCompress.cpp b/src/test/index/TestIndexCompress.cpp
new file mode 100644
index 00000000000..3105c4649c6
--- /dev/null
+++ b/src/test/index/TestIndexCompress.cpp
@@ -0,0 +1,309 @@
+#include <CLucene.h> // IWYU pragma: keep
+#include <CLucene/index/IndexReader.h>
+#include <CLucene/search/query/TermPositionIterator.h>
+#include <CLucene/util/stringUtil.h>
+
+#include <ctime>
+#include <exception>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "CLucene/analysis/Analyzers.h"
+#include "CLucene/index/IndexVersion.h"
+#include "CLucene/index/Term.h"
+#include "CLucene/store/FSDirectory.h"
+#include "test.h"
+
+CL_NS_USE(search)
+CL_NS_USE(store)
+CL_NS_USE(index)
+CL_NS_USE(util)
+
+static constexpr int32_t doc_count = 10000;
+
+#define FINALLY(eptr, finallyBlock) \
+ { \
+ finallyBlock; \
+ if (eptr) { \
+ std::rethrow_exception(eptr); \
+ } \
+ }
+
+int32_t getDaySeed() {
+ std::time_t now = std::time(nullptr);
+ std::tm* localTime = std::localtime(&now);
+ localTime->tm_sec = 0;
+ localTime->tm_min = 0;
+ localTime->tm_hour = 0;
+ return static_cast<int32_t>(std::mktime(localTime) / (60 * 60 * 24));
+}
+
+static std::string generateRandomIP() {
+ std::string ip_v4;
+ ip_v4.append(std::to_string(rand() % 256));
+ ip_v4.append(".");
+ ip_v4.append(std::to_string(rand() % 256));
+ ip_v4.append(".");
+ ip_v4.append(std::to_string(rand() % 256));
+ ip_v4.append(".");
+ ip_v4.append(std::to_string(rand() % 256));
+ return ip_v4;
+}
+
+static void write_index(const std::string& name, RAMDirectory* dir,
IndexVersion index_version,
+ const std::vector<std::string>& datas) {
+ auto* analyzer = _CLNEW lucene::analysis::SimpleAnalyzer<char>;
+ analyzer->set_stopwords(nullptr);
+ auto* indexwriter = _CLNEW lucene::index::IndexWriter(dir, analyzer, true);
+ indexwriter->setRAMBufferSizeMB(512);
+ indexwriter->setMaxBufferedDocs(-1);
+ indexwriter->setMaxFieldLength(0x7FFFFFFFL);
+ indexwriter->setMergeFactor(1000000000);
+ indexwriter->setUseCompoundFile(false);
+
+ auto* char_string_reader = _CLNEW lucene::util::SStringReader<char>;
+
+ auto* doc = _CLNEW lucene::document::Document();
+ int32_t field_config = lucene::document::Field::STORE_NO;
+ field_config |= lucene::document::Field::INDEX_NONORMS;
+ field_config |= lucene::document::Field::INDEX_TOKENIZED;
+ auto field_name = std::wstring(name.begin(), name.end());
+ auto* field = _CLNEW lucene::document::Field(field_name.c_str(),
field_config);
+ field->setOmitTermFreqAndPositions(false);
+ field->setIndexVersion(index_version);
+ doc->add(*field);
+
+ for (const auto& data : datas) {
+ char_string_reader->init(data.data(), data.size(), false);
+ auto* stream = analyzer->reusableTokenStream(field->name(),
char_string_reader);
+ field->setValue(stream);
+ indexwriter->addDocument(doc);
+ }
+
+ indexwriter->close();
+
+ _CLLDELETE(indexwriter);
+ _CLLDELETE(doc);
+ _CLLDELETE(analyzer);
+ _CLLDELETE(char_string_reader);
+}
+
+static void read_index(RAMDirectory* dir, int32_t doc_count) {
+ auto* reader = IndexReader::open(dir);
+
+ std::exception_ptr eptr;
+ try {
+ if (doc_count != reader->numDocs()) {
+ std::string msg = "doc_count: " + std::to_string(doc_count) +
+ ", numDocs: " +
std::to_string(reader->numDocs());
+ _CLTHROWA(CL_ERR_IllegalArgument, msg.c_str());
+ }
+
+ Term* term = nullptr;
+ TermEnum* enumerator = nullptr;
+ try {
+ enumerator = reader->terms();
+ while (enumerator->next()) {
+ term = enumerator->term();
+
+ auto* term_pos = reader->termPositions(term);
+
+ std::exception_ptr eptr;
+ try {
+ TermPositionIterator iter(term_pos);
+ int32_t doc = 0;
+ while ((doc = iter.nextDoc()) != INT32_MAX) {
+ for (int32_t i = 0; i < iter.freq(); i++) {
+ int32_t pos = iter.nextPosition();
+ if (pos < 0 || pos > 3) {
+ std::string msg = "pos: " +
std::to_string(pos);
+ _CLTHROWA(CL_ERR_IllegalArgument, msg.c_str());
+ }
+ }
+ }
+ } catch (...) {
+ eptr = std::current_exception();
+ }
+ FINALLY(eptr, { _CLDELETE(term_pos); })
+
+ _CLDECDELETE(term);
+ }
+ }
+ _CLFINALLY({
+ _CLDECDELETE(term);
+ enumerator->close();
+ _CLDELETE(enumerator);
+ })
+
+ } catch (...) {
+ eptr = std::current_exception();
+ }
+ FINALLY(eptr, {
+ reader->close();
+ _CLLDELETE(reader);
+ })
+}
+
+static void index_compaction(RAMDirectory* tmp_dir,
std::vector<lucene::store::Directory*> srcDirs,
+ std::vector<lucene::store::Directory*> destDirs,
int32_t count) {
+ auto* analyzer = _CLNEW lucene::analysis::SimpleAnalyzer<char>;
+ auto* indexwriter = _CLNEW lucene::index::IndexWriter(tmp_dir, analyzer,
true);
+
+ std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec(
+ srcDirs.size(), std::vector<std::pair<uint32_t, uint32_t>>(count));
+ int32_t idx = 0;
+ int32_t id = 0;
+ for (int32_t i = 0; i < count; i++) {
+ for (int32_t j = 0; j < srcDirs.size(); j++) {
+ if (id == count * destDirs.size()) {
+ idx++;
+ id = 0;
+ }
+ trans_vec[j][i] = std::make_pair(idx, id++);
+ }
+ }
+
+ std::vector<uint32_t> dest_index_docs(destDirs.size());
+ for (int32_t i = 0; i < destDirs.size(); i++) {
+ dest_index_docs[i] = count * destDirs.size();
+ }
+
+ std::exception_ptr eptr;
+ try {
+ indexwriter->indexCompaction(srcDirs, destDirs, trans_vec,
dest_index_docs);
+ } catch (...) {
+ eptr = std::current_exception();
+ }
+ FINALLY(eptr, {
+ indexwriter->close();
+ _CLDELETE(indexwriter);
+ _CLDELETE(analyzer);
+ })
+}
+
+void TestIndexCompressV2(CuTest* tc) {
+ std::srand(getDaySeed());
+
+ std::string name = "v2_field_name";
+ std::vector<std::string> datas;
+ for (int32_t i = 0; i < doc_count; i++) {
+ std::string ip_v4 = generateRandomIP();
+ datas.emplace_back(ip_v4);
+ }
+
+ RAMDirectory dir;
+ write_index(name, &dir, IndexVersion::kV2, datas);
+
+ try {
+ read_index(&dir, doc_count);
+ } catch (...) {
+ assertTrue(false);
+ }
+
+ std::cout << "\nTestIndexCompressV2 sucess" << std::endl;
+}
+
+void TestIndexCompactionV2(CuTest* tc) {
+ std::srand(getDaySeed());
+ std::string name = "field_name";
+
+ // index v2
+ RAMDirectory in_dir;
+ {
+ std::vector<std::string> datas;
+ for (int32_t i = 0; i < doc_count; i++) {
+ std::string ip_v4 = generateRandomIP();
+ datas.emplace_back(ip_v4);
+ }
+ write_index(name, &in_dir, IndexVersion::kV2, datas);
+ }
+
+ // index compaction v3
+ RAMDirectory outdir1;
+ RAMDirectory outdir2;
+ RAMDirectory outdir3;
+ {
+ std::vector<lucene::store::Directory*> srcDirs;
+ srcDirs.push_back(&in_dir);
+ srcDirs.push_back(&in_dir);
+ srcDirs.push_back(&in_dir);
+ srcDirs.push_back(&in_dir);
+ srcDirs.push_back(&in_dir);
+ srcDirs.push_back(&in_dir);
+ srcDirs.push_back(&in_dir);
+ srcDirs.push_back(&in_dir);
+ srcDirs.push_back(&in_dir);
+ std::vector<lucene::store::Directory*> destDirs;
+ destDirs.push_back(&outdir1);
+ destDirs.push_back(&outdir2);
+ destDirs.push_back(&outdir3);
+
+ try {
+ RAMDirectory empty_dir;
+ index_compaction(&empty_dir, srcDirs, destDirs, doc_count);
+ } catch (...) {
+ assertTrue(false);
+ }
+ }
+
+ std::cout << "TestIndexCompactionV2 sucess" << std::endl;
+}
+
+void TestIndexCompactionException(CuTest* tc) {
+ std::srand(getDaySeed());
+ std::string name = "field_name";
+
+ // index v1
+ RAMDirectory in_dir_v1;
+ {
+ std::vector<std::string> datas;
+ for (int32_t i = 0; i < 10; i++) {
+ std::string ip_v4 = generateRandomIP();
+ datas.emplace_back(ip_v4);
+ }
+ write_index(name, &in_dir_v1, IndexVersion::kV1, datas);
+ }
+
+ // index v2
+ RAMDirectory in_dir_v2;
+ {
+ std::vector<std::string> datas;
+ for (int32_t i = 0; i < 10; i++) {
+ std::string ip_v4 = generateRandomIP();
+ datas.emplace_back(ip_v4);
+ }
+ write_index(name, &in_dir_v2, IndexVersion::kV2, datas);
+ }
+
+ // index compaction exception 1
+ RAMDirectory out_dir;
+ {
+ std::vector<lucene::store::Directory*> srcDirs;
+ srcDirs.push_back(&in_dir_v1);
+ srcDirs.push_back(&in_dir_v2);
+ std::vector<lucene::store::Directory*> destDirs;
+ destDirs.push_back(&out_dir);
+
+ bool flag = false;
+ try {
+ RAMDirectory empty_dir;
+ index_compaction(&empty_dir, srcDirs, destDirs, 10);
+ } catch (...) {
+ flag = true;
+ }
+ assertTrue(flag);
+ }
+
+ std::cout << "TestIndexCompactionException sucess" << std::endl;
+}
+
+CuSuite* testIndexCompress() {
+ CuSuite* suite = CuSuiteNew(_T("CLucene Index Compress Test"));
+
+ SUITE_ADD_TEST(suite, TestIndexCompressV2);
+ SUITE_ADD_TEST(suite, TestIndexCompactionV2);
+ SUITE_ADD_TEST(suite, TestIndexCompactionException);
+
+ return suite;
+}
diff --git a/src/test/test.h b/src/test/test.h
index 39959327f63..4ec86cb6884 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -85,6 +85,7 @@ CuSuite *testSearchRange(void);
CuSuite *testMultiPhraseQuery(void);
CuSuite *testIndexCompaction(void);
CuSuite *testStringReader(void);
+CuSuite *testIndexCompress(void);
#ifdef TEST_CONTRIB_LIBS
//CuSuite *testGermanAnalyzer(void);
diff --git a/src/test/tests.cpp b/src/test/tests.cpp
index 5708ff62986..a570dad4663 100644
--- a/src/test/tests.cpp
+++ b/src/test/tests.cpp
@@ -19,6 +19,7 @@ unittest tests[] = {
{"MultiPhraseQuery", testMultiPhraseQuery},
{"IndexCompaction", testIndexCompaction},
{"testStringReader", testStringReader},
+ {"IndexCompress", testIndexCompress},
#ifdef TEST_CONTRIB_LIBS
{"chinese", testchinese},
#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]