This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch variant-sparse in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push: new 75689e5011a [enhance](variant) add inverted index (#48062) 75689e5011a is described below commit 75689e5011a656eb862bfd52a8b4d9625b2b66c2 Author: Sun Chenyang <suncheny...@selectdb.com> AuthorDate: Thu Feb 20 14:22:02 2025 +0800 [enhance](variant) add inverted index (#48062) --- be/src/olap/rowset/segment_v2/column_reader.cpp | 17 ++ be/src/olap/rowset/segment_v2/column_reader.h | 5 + be/src/olap/rowset/segment_v2/segment.cpp | 1 + be/src/olap/rowset/segment_v2/segment_iterator.cpp | 31 +++- be/src/olap/rowset/segment_v2/segment_writer.cpp | 4 +- .../segment_v2/variant_column_writer_impl.cpp | 17 +- .../rowset/segment_v2/variant_column_writer_impl.h | 2 + .../rowset/segment_v2/vertical_segment_writer.cpp | 3 +- .../data/variant_p0/update/inverted_index/load.out | Bin 0 -> 375 bytes .../variant_p0/update/inverted_index/query.out | Bin 0 -> 655 bytes .../data/variant_p0/with_index/var_index.out | Bin 745 -> 740 bytes .../variant_p0/update/inverted_index/load.groovy | 77 ++++++++ .../variant_p0/update/inverted_index/query.groovy | 201 +++++++++++++++++++++ 13 files changed, 338 insertions(+), 20 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index e0ec447cb1c..c71b45b82c1 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -478,6 +478,18 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF // check the root is already a leaf node _subcolumn_readers->add(relative_path, SubcolumnReader {std::move(reader), get_data_type_fn()}); + // init TabletIndex for subcolumns + if (opts.inverted_index) { + const auto& suffix_path = path.get_path(); + auto it = _variant_subcolumns_indexes.find(suffix_path); + if (it == _variant_subcolumns_indexes.end()) { + auto subcolumn_index = std::make_unique<TabletIndex>(*opts.inverted_index); + subcolumn_index->set_escaped_escaped_index_suffix_path(suffix_path); + _variant_subcolumns_indexes.emplace(suffix_path, std::move(subcolumn_index)); + } else { + DCHECK(false); + } + } } } @@ -495,6 +507,11 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF return Status::OK(); } +TabletIndex* VariantColumnReader::find_subcolumn_tablet_index(const std::string& path) { + auto it = _variant_subcolumns_indexes.find(path); + return it == _variant_subcolumns_indexes.end() ? nullptr : it->second.get(); +} + Status ColumnReader::create_variant(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, uint32_t column_id, uint64_t num_rows, const io::FileReaderSPtr& file_reader, diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 50b171c8d63..49058ef511f 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -88,6 +88,8 @@ struct ColumnReaderOptions { bool kept_in_memory = false; int be_exec_version = -1; + + const TabletIndex* inverted_index = nullptr; }; struct ColumnIteratorOptions { @@ -325,6 +327,8 @@ public: int64_t get_metadata_size() const override; + TabletIndex* find_subcolumn_tablet_index(const std::string&); + private: bool _read_flat_leaves(ReaderType type, const TabletColumn& target_col); // init for compaction read @@ -337,6 +341,7 @@ private: std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers; std::unique_ptr<ColumnReader> _sparse_column_reader; std::unique_ptr<VariantStatistics> _statistics; + std::map<std::string, std::unique_ptr<TabletIndex>> _variant_subcolumns_indexes; }; // Base iterator to read one column data diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 81b0a37e345..d5d49241e25 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -629,6 +629,7 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) { ColumnReaderOptions opts { .kept_in_memory = _tablet_schema->is_in_memory(), .be_exec_version = _be_exec_version, + .inverted_index = _tablet_schema->inverted_index(column.unique_id()), }; std::unique_ptr<ColumnReader> reader; RETURN_IF_ERROR(ColumnReader::create(opts, footer, iter->second, footer.num_rows(), diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 8ef6f06f896..5e2f6dcbed7 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -536,6 +536,13 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() { } } } + DBUG_EXECUTE_IF("segment_iterator.apply_inverted_index", { + LOG(INFO) << "Debug Point: segment_iterator.apply_inverted_index"; + if (!_common_expr_ctxs_push_down.empty() || !_col_predicates.empty()) { + return Status::Error<ErrorCode::INTERNAL_ERROR>("it is failed to apply inverted index"); + } + }) + if (!_row_bitmap.isEmpty() && (!_opts.topn_filter_source_node_ids.empty() || !_opts.col_id_to_predicates.empty() || _opts.delete_condition_predicates->num_of_column_predicate() > 0)) { @@ -1068,12 +1075,24 @@ Status SegmentIterator::_init_inverted_index_iterators() { // This is because the sub-column is created in create_materialized_variant_column. // We use this column to locate the metadata for the inverted index, which requires a unique_id and path. const auto& column = _opts.tablet_schema->column(cid); - int32_t col_unique_id = - column.is_extracted_column() ? column.parent_unique_id() : column.unique_id(); - RETURN_IF_ERROR(_segment->new_inverted_index_iterator( - column, - _segment->_tablet_schema->inverted_index(col_unique_id, column.suffix_path()), - _opts, &_inverted_index_iterators[cid])); + const TabletIndex* index_meta = nullptr; + if (column.is_extracted_column()) { + const TabletIndex* parent_index_meta = + _segment->_tablet_schema->inverted_index(column.parent_unique_id()); + + // variant column has no inverted index + if (parent_index_meta == nullptr) { + continue; + } + + auto* column_reader = _segment->_column_readers.at(column.parent_unique_id()).get(); + index_meta = assert_cast<VariantColumnReader*>(column_reader) + ->find_subcolumn_tablet_index(column.suffix_path()); + } else { + index_meta = _segment->_tablet_schema->inverted_index(column.unique_id()); + } + RETURN_IF_ERROR(_segment->new_inverted_index_iterator(column, index_meta, _opts, + &_inverted_index_iterators[cid])); } } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 3529b9d4cb5..d82ec538e98 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -228,9 +228,9 @@ Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& co opts.inverted_index = index; opts.need_inverted_index = true; DCHECK(_inverted_index_file_writer != nullptr); - opts.inverted_index_file_writer = _inverted_index_file_writer; - // TODO support multiple inverted index } + opts.inverted_index_file_writer = _inverted_index_file_writer; + #define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name) \ if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \ opts.need_zone_map = false; \ diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp index bb8558ee3d4..344273fdf31 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -539,19 +539,16 @@ Status VariantColumnWriterImpl::_create_column_writer(uint32_t cid, const Tablet // } opts.need_bitmap_index = parent_column.has_bitmap_index(); - bool skip_inverted_index = false; - if (_opts.rowset_ctx != nullptr) { - // skip write inverted index for index compaction column - skip_inverted_index = _opts.rowset_ctx->columns_to_do_index_compaction.contains( - parent_column.unique_id()); - } - if (const auto& index = tablet_schema->inverted_index(parent_column); - index != nullptr && !skip_inverted_index) { - opts.inverted_index = index; + const auto& index = tablet_schema->inverted_index(parent_column.unique_id()); + if (index != nullptr && + segment_v2::InvertedIndexColumnWriter::check_support_inverted_index(column)) { + auto subcolumn_index = std::make_unique<TabletIndex>(*index); + subcolumn_index->set_escaped_escaped_index_suffix_path(column.path_info_ptr()->get_path()); + opts.inverted_index = subcolumn_index.get(); opts.need_inverted_index = true; DCHECK(_opts.inverted_index_file_writer != nullptr); opts.inverted_index_file_writer = _opts.inverted_index_file_writer; - // TODO support multiple inverted index + _subcolumns_indexes.emplace_back(std::move(subcolumn_index)); } #define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name) \ diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h index 1f9d5d3191a..fe00999dc38 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h @@ -98,6 +98,8 @@ private: // staticstics which will be persisted in the footer VariantStatistics _statistics; + + std::vector<std::unique_ptr<TabletIndex>> _subcolumns_indexes; }; } // namespace segment_v2 } // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 3ae8f69e0a1..2b4e8286822 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -214,9 +214,8 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo opts.inverted_index = index; opts.need_inverted_index = true; DCHECK(_inverted_index_file_writer != nullptr); - opts.inverted_index_file_writer = _inverted_index_file_writer; - // TODO support multiple inverted index } + opts.inverted_index_file_writer = _inverted_index_file_writer; #define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name) \ if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \ diff --git a/regression-test/data/variant_p0/update/inverted_index/load.out b/regression-test/data/variant_p0/update/inverted_index/load.out new file mode 100644 index 00000000000..771bc652d02 Binary files /dev/null and b/regression-test/data/variant_p0/update/inverted_index/load.out differ diff --git a/regression-test/data/variant_p0/update/inverted_index/query.out b/regression-test/data/variant_p0/update/inverted_index/query.out new file mode 100644 index 00000000000..d71b7a8eca8 Binary files /dev/null and b/regression-test/data/variant_p0/update/inverted_index/query.out differ diff --git a/regression-test/data/variant_p0/with_index/var_index.out b/regression-test/data/variant_p0/with_index/var_index.out index 100892e5a06..79439b6617e 100644 Binary files a/regression-test/data/variant_p0/with_index/var_index.out and b/regression-test/data/variant_p0/with_index/var_index.out differ diff --git a/regression-test/suites/variant_p0/update/inverted_index/load.groovy b/regression-test/suites/variant_p0/update/inverted_index/load.groovy new file mode 100644 index 00000000000..bc7a93e8f9a --- /dev/null +++ b/regression-test/suites/variant_p0/update/inverted_index/load.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("update_test_index_load", "p0") { + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + set 'memtable_on_sink_node', 'true' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + + def create_table_load_data = {create_table_name-> + sql "DROP TABLE IF EXISTS ${create_table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${create_table_name} ( + k bigint, + v variant NOT NULL, + INDEX idx(v) USING INVERTED PROPERTIES("parser"="standard") + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 6 + properties("replication_num" = "1", "disable_auto_compaction" = "true", "variant_max_subcolumns_count" = "0"); + """ + + for (int i = 0; i < 10; i++) { + load_json_data.call(create_table_name, """${getS3Url() + '/regression/load/ghdata_sample.json'}""") + } + sql """set enable_match_without_inverted_index = false""" + sql """ set inverted_index_skip_threshold = 0 """ + qt_sql """ select count() from ${create_table_name} """ + qt_sql """ select count() from ${create_table_name} where cast (v['repo']['name'] as string) match 'github'""" + qt_sql """ select count() from ${create_table_name} where cast (v['actor']['id'] as int) > 1575592 """ + qt_sql """ select count() from ${create_table_name} where cast (v['actor']['id'] as int) > 1575592 and cast (v['repo']['name'] as string) match 'github'""" + } + + create_table_load_data.call("test_update_index_sc") + create_table_load_data.call("test_update_index_sc2") + create_table_load_data.call("test_update_index_compact") + create_table_load_data.call("test_update_index_compact2") +} diff --git a/regression-test/suites/variant_p0/update/inverted_index/query.groovy b/regression-test/suites/variant_p0/update/inverted_index/query.groovy new file mode 100644 index 00000000000..a156a560fcd --- /dev/null +++ b/regression-test/suites/variant_p0/update/inverted_index/query.groovy @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("update_test_index_query", "p0") { + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + set 'memtable_on_sink_node', 'true' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def compaction = {compact_table_name -> + + def tablets = sql_return_maparray """ show tablets from ${compact_table_name}; """ + + // trigger compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def backend_id = tablet.BackendId + def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertEquals("success", compactJson.status.toLowerCase()) + } + + // wait for all compactions done + for (def tablet in tablets) { + Awaitility.await().atMost(30, TimeUnit.MINUTES).untilAsserted(() -> { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + def backend_id = tablet.BackendId + def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("compaction task for this tablet is not running", compactionStatus.msg.toLowerCase()) + }); + } + + + for (def tablet in tablets) { + int afterSegmentCount = 0 + String tablet_id = tablet.TabletId + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List<String>) tabletJson.rowsets) { + logger.info("rowset is: " + rowset) + afterSegmentCount += Integer.parseInt(rowset.split(" ")[1]) + } + assertEquals(afterSegmentCount, 1) + } + } + + def normal_check = {check_table_name-> + sql """set enable_match_without_inverted_index = false """ + sql """ set inverted_index_skip_threshold = 0 """ + qt_sql """ select count() from ${check_table_name} """ + qt_sql """ select count() from ${check_table_name} where cast (v['repo']['name'] as string) match 'github'""" + qt_sql """ select count() from ${check_table_name} where cast (v['actor']['id'] as int) > 1575592 """ + qt_sql """ select count() from ${check_table_name} where cast (v['actor']['id'] as int) > 1575592 and cast (v['repo']['name'] as string) match 'github'""" + } + + def table_name = "test_update_index_compact" + + for (int i = 0; i < 2; i++) { + load_json_data.call(table_name, """${getS3Url() + '/regression/load/ghdata_sample.json'}""") + } + + GetDebugPoint().enableDebugPointForAllBEs("segment_iterator.apply_inverted_index") + + normal_check.call(table_name) + + compaction.call(table_name) + + normal_check.call(table_name) + + table_name = "test_update_index_compact2" + + normal_check.call(table_name) + + compaction.call(table_name) + + normal_check.call(table_name) + + + def schema_change = {schema_change_table_name -> + def tablets = sql_return_maparray """ show tablets from ${schema_change_table_name}; """ + Set<String> rowsetids = new HashSet<>(); + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List<String>) tabletJson.rowsets) { + int segmentCount = Integer.parseInt(rowset.split(" ")[1]) + if (segmentCount == 0) { + continue; + } + String rowsetid = rowset.split(" ")[4]; + rowsetids.add(rowsetid) + logger.info("rowsetid: " + rowsetid) + } + } + sql """ alter table ${schema_change_table_name} modify column v variant null""" + Awaitility.await().atMost(30, TimeUnit.MINUTES).untilAsserted(() -> { + Thread.sleep(1000) + tablets = sql_return_maparray """ show tablets from ${schema_change_table_name}; """ + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List<String>) tabletJson.rowsets) { + int segmentCount = Integer.parseInt(rowset.split(" ")[1]) + if (segmentCount == 0) { + continue; + } + String rowsetid = rowset.split(" ")[4]; + logger.info("rowsetid: " + rowsetid) + assertTrue(!rowsetids.contains(rowsetid)) + } + } + }); + } + + + def table_name_sc = "test_update_index_sc" + + for (int i = 0; i < 2; i++) { + load_json_data.call(table_name_sc, """${getS3Url() + '/regression/load/ghdata_sample.json'}""") + } + + normal_check.call(table_name_sc) + + schema_change.call(table_name_sc) + + normal_check.call(table_name_sc) + + table_name_sc = "test_update_index_sc2" + + normal_check.call(table_name_sc) + + schema_change.call(table_name_sc) + + normal_check.call(table_name_sc) + + GetDebugPoint().disableDebugPointForAllBEs("segment_iterator.apply_inverted_index") +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org