gaodayue commented on a change in pull request #1409: Add dict page URL: https://github.com/apache/incubator-doris/pull/1409#discussion_r305250606
########## File path: be/src/olap/rowset/segment_v2/binary_dict_page.cpp ########## @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/binary_dict_page.h" + +#include "util/slice.h" // for Slice +#include "gutil/strings/substitute.h" // for Substitute +#include "runtime/mem_pool.h" + +#include "olap/rowset/segment_v2/bitshuffle_page.h" +#include "olap/rowset/segment_v2/rle_page.h" + +namespace doris { +namespace segment_v2 { + +using strings::Substitute; + +BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options) : + _options(options), + _finished(false), + _data_page_builder(nullptr), + _dict_builder(nullptr), + _encoding_type(DICT_ENCODING) { + // initially use DICT_ENCODING + // TODO: the data page builder type can be created by Factory according to user config + _data_page_builder.reset(new BitshufflePageBuilder<OLAP_FIELD_TYPE_INT>(options)); + PageBuilderOptions dict_builder_options; + dict_builder_options.data_page_size = _options.dict_page_size; + _dict_builder.reset(new BinaryPlainPageBuilder(dict_builder_options)); + reset(); +} + +bool BinaryDictPageBuilder::is_page_full() { + if (_data_page_builder->is_page_full()) { + return true; + } + if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) { + return true; + } + return false; +} + +Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) { + if (_encoding_type == DICT_ENCODING) { + DCHECK(!_finished); + DCHECK_GT(*count, 0); + const Slice* src = reinterpret_cast<const Slice*>(vals); + size_t num_added = 0; + for (int i = 0; i < *count; ++i, ++src) { + auto ret = _dictionary.find(*src); + size_t add_count = 1; + if (ret != _dictionary.end()) { + uint32_t value_code = ret->second; + RETURN_IF_ERROR(_data_page_builder->add(reinterpret_cast<const uint8_t*>(&value_code), &add_count)); + num_added += add_count; + if (add_count == 0) { + // current data page is full, stop processing remaining inputs + break; + } + } else { + if (_dict_builder->is_page_full()) { + break; + } + char* item_mem = _arena.Allocate(src->size); + if (item_mem == nullptr) { + return Status::Corruption(Substitute("memory allocate failed, size:$0", src->size)); + } + Slice dict_item(src->data, src->size); + dict_item.relocate(item_mem); + uint32_t value_code = _dictionary.size(); + _dictionary.insert({dict_item, value_code}); + _dict_items.push_back(dict_item); + _dict_builder->update_prepared_size(dict_item.size); + RETURN_IF_ERROR(_data_page_builder->add(reinterpret_cast<const uint8_t*>(&value_code), &add_count)); + if (add_count == 0) { + // current data page is full, stop processing remaining inputs + break; + } + num_added += 1; + } + } + *count = num_added; + return Status::OK(); + } else { + DCHECK_EQ(_encoding_type, PLAIN_ENCODING); + return _data_page_builder->add(vals, count); + } +} + +Slice BinaryDictPageBuilder::finish() { + _finished = true; + + Slice data_slice = _data_page_builder->finish(); + _buffer.append(data_slice.data, data_slice.size); + encode_fixed32_le(&_buffer[0], _encoding_type); + return Slice(_buffer.data(), _buffer.size()); +} + +void BinaryDictPageBuilder::reset() { + _finished = false; + _buffer.clear(); + _buffer.resize(BINARY_DICT_PAGE_HEADER_SIZE); + _buffer.reserve(_options.data_page_size); + + if (_encoding_type == DICT_ENCODING + && _dict_builder->is_page_full()) { + _data_page_builder.reset(new BinaryPlainPageBuilder(_options)); + _encoding_type = PLAIN_ENCODING; + } else { + _data_page_builder->reset(); + } + _finished = false; +} + +size_t BinaryDictPageBuilder::count() const { + return _data_page_builder->count(); +} + +Status BinaryDictPageBuilder::get_dictionary_page(Slice* dictionary_page) { + DCHECK(_finished) << "get dictionary page when the builder is not finished"; + _dictionary.clear(); + _dict_builder->reset(); + size_t add_count = 1; + // here do not check is_page_full of dict_builder + // because it is checked in add + for (auto& dict_item: _dict_items) { + RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), &add_count)); + } + *dictionary_page = _dict_builder->finish(); + _dict_builder->release(); + _dict_items.clear(); + return Status::OK(); +} + +BinaryDictPageDecoder::BinaryDictPageDecoder(Slice data, const PageDecoderOptions& options) : + _data(data), + _options(options), + _data_page_decoder(nullptr), + _dict_decoder(options.dict_decoder), + _parsed(false), + _encoding_type(UNKNOWN_ENCODING) { } + +Status BinaryDictPageDecoder::init() { + CHECK(!_parsed); + if (_data.size < BINARY_DICT_PAGE_HEADER_SIZE) { + LOG(WARNING) << "corrupted data, data size:" << _data.size; + return Status::Corruption(Substitute("invalid data size:$0, header size:$1", + _data.size, BINARY_DICT_PAGE_HEADER_SIZE)); + } + size_t type = decode_fixed32_le((const uint8_t*)&_data.data[0]); + _encoding_type = static_cast<EncodingTypePB>(type); + _data.remove_prefix(BINARY_DICT_PAGE_HEADER_SIZE); + if (_encoding_type == DICT_ENCODING) { + DCHECK(_dict_decoder != nullptr) << "dict decoder pointer is nullptr"; + _data_page_decoder.reset(new BitShufflePageDecoder<OLAP_FIELD_TYPE_INT>(_data, _options)); + } else if (_encoding_type == PLAIN_ENCODING) { + DCHECK_EQ(_encoding_type, PLAIN_ENCODING); + // use plain page decoder to decode data + _data_page_decoder.reset(new BinaryPlainPageDecoder(_data, _options)); + } else { + LOG(WARNING) << "invalide encoding type:" << _encoding_type; + return Status::Corruption(Substitute("invalid encoding type:$0", _encoding_type)); + } + + Status status = _data_page_decoder->init(); + if (!status.ok()) { + LOG(WARNING) << "status:" << status.to_string(); + return status; + } + _parsed = true; + return Status::OK(); +} + +Status BinaryDictPageDecoder::seek_to_position_in_page(size_t pos) { + return _data_page_decoder->seek_to_position_in_page(pos); +} + +Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) { + if (_encoding_type == PLAIN_ENCODING) { + return _data_page_decoder->next_batch(n, dst); + } else { + // dictionary encoding + DCHECK(_parsed); + if (PREDICT_FALSE(*n == 0)) { + *n = 0; + return Status::OK(); + } + Slice* out = reinterpret_cast<Slice*>(dst->data()); + _code_buf.resize((*n) * sizeof(int32_t)); + + // copy the codewords into a temporary buffer first + // And then copy the strings corresponding to the codewords to the destination buffer + BitShufflePageDecoder<OLAP_FIELD_TYPE_INT>* data_ptr = + down_cast<BitShufflePageDecoder<OLAP_FIELD_TYPE_INT>*>(_data_page_decoder.get()); Review comment: Why do we need the raw pointer here? Couldn't we simply use _data_page_decoder->next_batch in the following code? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org