This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch clucene in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/clucene by this push: new 0f33e06d [improvement]optimize reduce index write time (#87) 0f33e06d is described below commit 0f33e06d5c18a073b5858179a0a7d4daebee01b8 Author: zzzxl <33418555+zzzxl1...@users.noreply.github.com> AuthorDate: Wed Jun 14 11:09:33 2023 +0800 [improvement]optimize reduce index write time (#87) --- src/core/CLucene/analysis/AnalysisHeader.cpp | 7 --- src/core/CLucene/analysis/AnalysisHeader.h | 33 +++++++++- src/core/CLucene/analysis/Analyzers.cpp | 2 +- src/core/CLucene/analysis/Analyzers.h | 23 ++++--- src/core/CLucene/index/SDocumentWriter.cpp | 50 ++++++++++----- src/core/CLucene/index/SDocumentWriter.h | 3 +- src/core/CLucene/store/IndexOutput.h | 2 +- src/core/CLucene/util/stringUtil.h | 94 ++++++++++++++++++++++++++++ 8 files changed, 178 insertions(+), 36 deletions(-) diff --git a/src/core/CLucene/analysis/AnalysisHeader.cpp b/src/core/CLucene/analysis/AnalysisHeader.cpp index a7fd819e..f90085c0 100644 --- a/src/core/CLucene/analysis/AnalysisHeader.cpp +++ b/src/core/CLucene/analysis/AnalysisHeader.cpp @@ -43,13 +43,6 @@ size_t Token::termLength<TCHAR>(){ return _termTextLen; }; -template <> -size_t Token::termLength<char>(){ - if ( _termTextLen == -1 ) //it was invalidated by growBuffer - _termTextLen = strlen((char*)_buffer); - return _termTextLen; -}; - ///Compares the Token for their order class OrderCompare:LUCENE_BASE, public CL_NS(util)::Compare::_base //<Token*> { diff --git a/src/core/CLucene/analysis/AnalysisHeader.h b/src/core/CLucene/analysis/AnalysisHeader.h index cbe0453d..07beb9e4 100644 --- a/src/core/CLucene/analysis/AnalysisHeader.h +++ b/src/core/CLucene/analysis/AnalysisHeader.h @@ -68,6 +68,7 @@ private: size_t bufferTextLen; void *_buffer; ///< the text of the term int32_t _termTextLen;///< the length of termText. Internal use only + bool isNoCopy = false; CL_NS(index)::Payload *payload; @@ -84,7 +85,10 @@ public: bufferTextLen = 0; }; virtual ~Token() { - free(_buffer); + if (!isNoCopy) { + free(_buffer); + _buffer = nullptr; + } _CLLDELETE(payload); }; @@ -99,6 +103,15 @@ public: setText(text, end - start); }; + template<typename T> + void setNoCopy(const T *text, const int32_t start, const int32_t end, const TCHAR *typ = NULL) { + _startOffset = start; + _endOffset = end; + _type = (typ == NULL ? getDefaultType() : typ); + positionIncrement = 1; + setTextNoCopy(text, end - start); + }; + size_t bufferLength() const { return bufferTextLen; } @@ -132,12 +145,12 @@ public: int32_t getPositionIncrement() const { return positionIncrement; } template<typename T> - T *termBuffer() const { + inline T *termBuffer() const { return (T *) _buffer; } template<typename T> - size_t termLength();//< Length of the the termBuffer. See #termBuffer + inline size_t termLength();//< Length of the the termBuffer. See #termBuffer void resetTermTextLen() { _termTextLen = -1; @@ -152,6 +165,13 @@ public: ((T *) _buffer)[_termTextLen] = 0;//make sure null terminated }; + template<typename T> + void setTextNoCopy(const T *text, int32_t l) { + _termTextLen = l; + _buffer = (void*)text; + isNoCopy = true; + }; + int32_t startOffset() const { return _startOffset; } @@ -196,6 +216,13 @@ public: } }; +template <> +inline size_t Token::termLength<char>(){ + if ( _termTextLen == -1 ) //it was invalidated by growBuffer + _termTextLen = strlen((char*)_buffer); + return _termTextLen; +}; + class CLUCENE_EXPORT TokenStream { public: /** Returns the next token in the stream, or null at EOS. diff --git a/src/core/CLucene/analysis/Analyzers.cpp b/src/core/CLucene/analysis/Analyzers.cpp index ebaa82b6..3ea3e8b0 100644 --- a/src/core/CLucene/analysis/Analyzers.cpp +++ b/src/core/CLucene/analysis/Analyzers.cpp @@ -94,7 +94,7 @@ Token *SimpleTokenizer<char>::next(Token *token) { break; // return 'em } buffer[length] = 0; - token->set(buffer, start, start + length); + token->setNoCopy(buffer, start, start + length); return token; }; diff --git a/src/core/CLucene/analysis/Analyzers.h b/src/core/CLucene/analysis/Analyzers.h index f5da43c6..17f88cff 100644 --- a/src/core/CLucene/analysis/Analyzers.h +++ b/src/core/CLucene/analysis/Analyzers.h @@ -184,16 +184,23 @@ public: return _CLNEW SimpleTokenizer<T>(reader); } TokenStream* reusableTokenStream(const TCHAR* fieldName, CL_NS(util)::Reader* reader) override{ - auto* tokenizer = static_cast<Tokenizer*>(getPreviousTokenStream()); - if (tokenizer == nullptr) { - tokenizer = _CLNEW SimpleTokenizer<T>(reader); - setPreviousTokenStream(tokenizer); - } else - tokenizer->reset(reader); - return tokenizer; + if (tokenizer_ == nullptr) { + tokenizer_ = new SimpleTokenizer<T>(reader); + } else { + tokenizer_->reset(reader); + } + return tokenizer_; }; - virtual ~SimpleAnalyzer(){} + virtual ~SimpleAnalyzer() { + if (tokenizer_) { + delete tokenizer_; + tokenizer_ = nullptr; + } + } + +private: + SimpleTokenizer<T>* tokenizer_ = nullptr; }; /** diff --git a/src/core/CLucene/index/SDocumentWriter.cpp b/src/core/CLucene/index/SDocumentWriter.cpp index 6c06519c..3b22fdad 100644 --- a/src/core/CLucene/index/SDocumentWriter.cpp +++ b/src/core/CLucene/index/SDocumentWriter.cpp @@ -140,15 +140,18 @@ typename SDocumentsWriter<T>::ThreadState *SDocumentsWriter<T>::getThreadState(D threadState = _CLNEW ThreadState(this); } - ThreadState *state = threadState; - if (segment.empty()) + if (segment.empty()) { segment = writer->newSegmentName(); - state->init(doc, nextDocID); + threadState->init(doc, nextDocID); + } + + threadState->docID = nextDocID; + // Only increment nextDocID & numDocsInRAM on successful init nextDocID++; numDocsInRAM++; - return state; + return threadState; } template<typename T> @@ -244,6 +247,7 @@ void SDocumentsWriter<T>::ThreadState::init(Document *doc, int32_t doc_id) { fp->docFields.values[fp->fieldCount++] = field; } + _parent->hasProx_ = _parent->fieldInfos->hasProx(); } template<typename T> @@ -313,7 +317,7 @@ void SDocumentsWriter<T>::ThreadState::FieldData::processField(Analyzer *sanalyz threadState->numStoredFields++; } - docFieldsFinal.values[j] = NULL; + // docFieldsFinal.values[j] = NULL; } } catch (exception &ae) { throw ae; @@ -440,10 +444,27 @@ void SDocumentsWriter<T>::ThreadState::writeProxBytes(const uint8_t *b, int32_t } } +template <typename T> +inline bool eq(const std::basic_string_view<T>& a, const std::basic_string_view<T>& b) { + if constexpr (std::is_same_v<T, char>) { +#if defined(__SSE2__) + if (a.size() != b.size()) { + return false; + } + return StringUtil::memequalSSE2Wide(a.data(), b.data(), a.size()); +#endif + } + return a == b; +} + template<typename T> void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) { const T *tokenText = token->termBuffer<T>(); const int32_t tokenTextLen = token->termLength<T>(); + std::basic_string_view<T> term(tokenText, tokenTextLen); + // if constexpr (std::is_same_v<T, char>) { + // std::cout << term << std::endl; + // } uint32_t code = 0; // Compute hashcode @@ -459,16 +480,14 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) { // Locate Posting in hash threadState->p = postingsHash[hashPos]; - if (threadState->p != nullptr && !threadState->postingEquals(tokenText, tokenTextLen)) { - // Conflict: keep searching different locations in - // the hash table. + if (threadState->p != nullptr && !eq(threadState->p->term_, term)) { const uint32_t inc = ((code >> 8) + code) | 1; do { postingsHashConflicts++; code += inc; hashPos = code & postingsHashMask; threadState->p = postingsHash[hashPos]; - } while (threadState->p != nullptr && !threadState->postingEquals(tokenText, tokenTextLen)); + } while (threadState->p != nullptr && !eq(threadState->p->term_, term)); } int32_t proxCode = 0; @@ -530,6 +549,7 @@ void SDocumentsWriter<T>::ThreadState::FieldData::addPosition(Token *token) { threadState->scharPool->tUpto += textLen1; memcpy(textUpto, tokenText, tokenTextLen * sizeof(T)); + threadState->p->term_ = std::basic_string_view<T>(textUpto, term.size()); textUpto[tokenTextLen] = CLUCENE_END_OF_WORD; assert(postingsHash[hashPos] == NULL); @@ -947,7 +967,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) { IndexOutput *freqOut = directory->createOutput((segmentName + ".frq").c_str()); // TODO:add options in field index IndexOutput *proxOut = nullptr; - if (fieldInfos->hasProx()) { + if (hasProx_) { proxOut = directory->createOutput((segmentName + ".prx").c_str()); } @@ -1007,7 +1027,7 @@ void SDocumentsWriter<T>::writeSegment(std::vector<std::string> &flushedFiles) { // Record all files we have flushed flushedFiles.push_back(segmentFileName(IndexFileNames::FIELD_INFOS_EXTENSION)); flushedFiles.push_back(segmentFileName(IndexFileNames::FREQ_EXTENSION)); - if (fieldInfos->hasProx()) { + if (hasProx_) { flushedFiles.push_back(segmentFileName(IndexFileNames::PROX_EXTENSION)); } flushedFiles.push_back(segmentFileName(IndexFileNames::TERMS_EXTENSION)); @@ -1129,7 +1149,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa int64_t freqPointer = freqOut->getFilePointer(); int64_t proxPointer = 0; - if (fieldInfos->hasProx()) { + if (hasProx_) { proxPointer = proxOut->getFilePointer(); } @@ -1154,7 +1174,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa while (numToMerge > 0) { if ((++df % skipInterval) == 0) { - if (fieldInfos->hasProx()) { + if (hasProx_) { freqOut->writeByte((char)CodeMode::kPfor); freqOut->writeVInt(docDeltaBuffer.size()); // doc @@ -1187,7 +1207,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa // changing the format to match Lucene's segment // format. - if (fieldInfos->hasProx()) { + if (hasProx_) { // position for (int32_t j = 0; j < termDocFreq; j++) { const int32_t code = prox.readVInt(); @@ -1235,7 +1255,7 @@ void SDocumentsWriter<T>::appendPostings(ArrayBase<typename ThreadState::FieldDa // Done merging this term { - if (fieldInfos->hasProx()) { + if (hasProx_) { freqOut->writeByte((char)CodeMode::kDefault); freqOut->writeVInt(docDeltaBuffer.size()); uint32_t lastDoc = 0; diff --git a/src/core/CLucene/index/SDocumentWriter.h b/src/core/CLucene/index/SDocumentWriter.h index f1da76fa..5163c2a1 100644 --- a/src/core/CLucene/index/SDocumentWriter.h +++ b/src/core/CLucene/index/SDocumentWriter.h @@ -53,6 +53,7 @@ private: std::vector<uint32_t> freqBuffer; std::ostream* infoStream{}; int64_t ramBufferSize; + bool hasProx_ = false; public: class FieldMergeState; @@ -68,6 +69,7 @@ public: int32_t lastDocID; // Last docID where this term occurred int32_t lastDocCode; // Code for prior doc int32_t lastPosition;// Last position where this term occurred + std::basic_string_view<T> term_; }; /* Stores norms, buffered in RAM, until they are flushed @@ -672,7 +674,6 @@ public: this->infoStream = nullptr; fieldInfos = _CLNEW FieldInfos(); - this->closed = this->flushPending = false; postingsFreeCountDW = postingsAllocCountDW = 0; docStoreOffset = nextDocID = numDocsInRAM = numDocsInStore = nextWriteDocID = 0; diff --git a/src/core/CLucene/store/IndexOutput.h b/src/core/CLucene/store/IndexOutput.h index d6887f58..6b6ca321 100644 --- a/src/core/CLucene/store/IndexOutput.h +++ b/src/core/CLucene/store/IndexOutput.h @@ -117,7 +117,7 @@ public: /** Base implementation class for buffered {@link IndexOutput}. */ class CLUCENE_EXPORT BufferedIndexOutput : public IndexOutput{ public: - LUCENE_STATIC_CONSTANT(int32_t, BUFFER_SIZE=16384); + LUCENE_STATIC_CONSTANT(int32_t, BUFFER_SIZE=65536); private: uint8_t* buffer; int64_t bufferStart; // position in file of buffer diff --git a/src/core/CLucene/util/stringUtil.h b/src/core/CLucene/util/stringUtil.h index 01bbb7fa..da0547d6 100644 --- a/src/core/CLucene/util/stringUtil.h +++ b/src/core/CLucene/util/stringUtil.h @@ -111,4 +111,98 @@ public: return (char)LUT[c]; } +class StringUtil { +public: + template <typename T> + static inline T unaligned_load(const void* address) { + T res {}; + memcpy(&res, address, sizeof(res)); + return res; + } + +#if defined(__SSE2__) + + static inline bool compareSSE2(const char* p1, const char* p2) { + return 0xFFFF == _mm_movemask_epi8(_mm_cmpeq_epi8( + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1)), + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2)))); + } + + static inline bool compareSSE2x4(const char* p1, const char* p2) { + return 0xFFFF == + _mm_movemask_epi8(_mm_and_si128( + _mm_and_si128( + _mm_cmpeq_epi8( + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1)), + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2))), + _mm_cmpeq_epi8( + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 1), + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) + 1))), + _mm_and_si128( + _mm_cmpeq_epi8( + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 2), + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) + 2)), + _mm_cmpeq_epi8( + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p1) + 3), + _mm_loadu_si128(reinterpret_cast<const __m128i*>(p2) + + 3))))); + } + + static inline bool memequalSSE2Wide(const char* p1, const char* p2, size_t size) { + if (size <= 16) { + if (size >= 8) { + /// Chunks of [8,16] bytes. + return unaligned_load<uint64_t>(p1) == unaligned_load<uint64_t>(p2) && + unaligned_load<uint64_t>(p1 + size - 8) == + unaligned_load<uint64_t>(p2 + size - 8); + } else if (size >= 4) { + /// Chunks of [4,7] bytes. + return unaligned_load<uint32_t>(p1) == unaligned_load<uint32_t>(p2) && + unaligned_load<uint32_t>(p1 + size - 4) == + unaligned_load<uint32_t>(p2 + size - 4); + } else if (size >= 2) { + /// Chunks of [2,3] bytes. + return unaligned_load<uint16_t>(p1) == unaligned_load<uint16_t>(p2) && + unaligned_load<uint16_t>(p1 + size - 2) == + unaligned_load<uint16_t>(p2 + size - 2); + } else if (size >= 1) { + /// A single byte. + return *p1 == *p2; + } + return true; + } + + while (size >= 64) { + if (compareSSE2x4(p1, p2)) { + p1 += 64; + p2 += 64; + size -= 64; + } else { + return false; + } + } + + switch (size / 16) { + case 3: + if (!compareSSE2(p1 + 32, p2 + 32)) { + return false; + } + [[fallthrough]]; + case 2: + if (!compareSSE2(p1 + 16, p2 + 16)) { + return false; + } + [[fallthrough]]; + case 1: + if (!compareSSE2(p1, p2)) { + return false; + } + } + + return compareSSE2(p1 + size - 16, p2 + size - 16); + } + +#endif +}; + #endif//_lucene_util__stringutil_H --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org