morningman commented on code in PR #13867:
URL: https://github.com/apache/doris/pull/13867#discussion_r1021123018


##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java:
##########
@@ -79,6 +84,43 @@ public void createScanRangeLocations(ParamCreateContext 
context, BackendPolicy b
 
             // set hdfs params for hdfs file type.
             Map<String, String> locationProperties = getLocationProperties();

Review Comment:
   Move this line after L123



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -257,6 +257,24 @@ struct TFileAttributes {
     9: optional string header_type;
 }
 
+struct TIcebergDeleteFileDesc {
+    1: optional string path;
+    2: optional i64 position_lower_bound;
+    3: optional i64 position_upper_bound;
+    4: optional list<i32> field_ids;
+}
+
+struct TIcebergFileDesc {
+    1: optional i32 format_version;
+    2: optional i32 content;

Review Comment:
   Add a comment to explain the `content` field.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java:
##########
@@ -79,6 +84,43 @@ public void createScanRangeLocations(ParamCreateContext 
context, BackendPolicy b
 
             // set hdfs params for hdfs file type.
             Map<String, String> locationProperties = getLocationProperties();
+            if (inputSplit instanceof IcebergSplit) {
+                TTableFormatFileDesc tableFormatFileDesc = new 
TTableFormatFileDesc();
+                IcebergSplit icebergSplit = (IcebergSplit) inputSplit;
+                
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
+                TIcebergFileDesc fileDesc = new TIcebergFileDesc();
+                int formatVersion = icebergSplit.getFormatVersion();
+                fileDesc.setFormatVersion(formatVersion);
+                if (formatVersion < 2) {

Review Comment:
   The magic number `2` should be defined somewhere.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java:
##########
@@ -79,6 +84,43 @@ public void createScanRangeLocations(ParamCreateContext 
context, BackendPolicy b
 
             // set hdfs params for hdfs file type.
             Map<String, String> locationProperties = getLocationProperties();
+            if (inputSplit instanceof IcebergSplit) {

Review Comment:
   The whole `if (inputSplit instanceof IcebergSplit)` should be extracted to a 
method.



##########
be/src/vec/exec/format/table/iceberg_reader.h:
##########
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vec/exec/format/parquet/parquet_common.h>
+
+#include <queue>
+
+#include "table_format_reader.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris::vectorized {
+
+class IcebergTableReader : public TableFormatReader {
+public:
+    IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* 
profile,
+                       RuntimeState* state, const TFileScanRangeParams& params)
+            : TableFormatReader(file_format_reader),
+              _profile(profile),
+              _state(state),
+              _params(params) {}
+    Status init_row_filters();
+    void filter_rows() override;
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+public:
+    enum { DATA, POSITON_DELELE, EQULITY_DELELE };
+    struct PositionDeleteParams {
+        int64_t low_bound_index = -1;
+        int64_t upper_bound_index = -1;
+        int64_t last_delete_row_index = -1;
+        int64_t total_file_rows = 0;
+    };
+
+private:
+    RuntimeProfile* _profile;
+    RuntimeState* _state;
+    const TFileScanRangeParams& _params;
+    std::vector<const FieldSchema*> _column_schemas;
+    std::queue<GenericReader*> _delete_file_readers;
+    GenericReader* _cur_delete_file_reader;
+    PositionDeleteParams _position_delete_params;
+    //    int64_t _read_rows = 0;

Review Comment:
   Remove unused code.



##########
be/src/vec/exec/format/table/iceberg_reader.cpp:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "iceberg_reader.h"
+
+#include <vec/core/column_with_type_and_name.h>
+#include <vec/exec/format/parquet/vparquet_reader.h>
+
+#include <vec/data_types/data_type_factory.hpp>
+
+#include "vec/common/assert_cast.h"
+
+namespace doris::vectorized {
+
+const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
+
+Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
+    Status status = _file_format_reader->get_next_block(block, read_rows, eof);
+    //    _read_rows += read_rows;

Review Comment:
   Delete unused code.



##########
be/src/vec/exec/format/table/iceberg_reader.h:
##########
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vec/exec/format/parquet/parquet_common.h>
+
+#include <queue>
+
+#include "table_format_reader.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris::vectorized {
+
+class IcebergTableReader : public TableFormatReader {
+public:
+    IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* 
profile,
+                       RuntimeState* state, const TFileScanRangeParams& params)
+            : TableFormatReader(file_format_reader),
+              _profile(profile),
+              _state(state),
+              _params(params) {}
+    Status init_row_filters();
+    void filter_rows() override;
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+public:
+    enum { DATA, POSITON_DELELE, EQULITY_DELELE };
+    struct PositionDeleteParams {
+        int64_t low_bound_index = -1;
+        int64_t upper_bound_index = -1;
+        int64_t last_delete_row_index = -1;
+        int64_t total_file_rows = 0;
+    };
+
+private:
+    RuntimeProfile* _profile;
+    RuntimeState* _state;
+    const TFileScanRangeParams& _params;
+    std::vector<const FieldSchema*> _column_schemas;
+    std::queue<GenericReader*> _delete_file_readers;

Review Comment:
   ```suggestion
       std::deque<std::unique_ptr<GenericReader>> _delete_file_readers;
   ```



##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -468,18 +471,29 @@ Status VFileScanner::_get_next_reader() {
         // create reader for specific format
         // TODO: add json, avro
         Status init_status;
+        // TODO: use data lake type
         switch (_params.format_type) {
         case TFileFormatType::FORMAT_PARQUET: {
-            _cur_reader.reset(new ParquetReader(
-                    _profile, _params, range, _file_col_names, 
_state->query_options().batch_size,
-                    const_cast<cctz::time_zone*>(&_state->timezone_obj())));
+            ParquetReader* parquet_reader =
+                    new ParquetReader(_profile, _params, range, 
_state->query_options().batch_size,
+                                      
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
             if (_push_down_expr == nullptr && _vconjunct_ctx != nullptr &&
                 _partition_slot_descs.empty()) { // TODO: support partition 
columns
                 RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, 
&_push_down_expr));
                 _discard_conjuncts();
             }
-            init_status = ((ParquetReader*)(_cur_reader.get()))
-                                  ->init_reader(_colname_to_value_range, 
_push_down_expr);
+            init_status = parquet_reader->init_reader(_file_col_names, 
_colname_to_value_range,
+                                                      _push_down_expr);
+            if (_params.__isset.table_format_params &&
+                _params.table_format_params.table_format_type == "iceberg") {
+                IcebergTableReader* iceberg_reader = new IcebergTableReader(

Review Comment:
   You pass the `parquet_reader` to `iceberg_reader`, but nothing ever deletes it, 
which causes a memory leak.
   Here are my suggestions:
   1. If this is an Iceberg table reader, pass the `parquet_reader` to 
`iceberg_reader`, and have `iceberg_reader`
   hold the `parquet_reader` in a `unique_ptr`, so that it is deleted when 
`iceberg_reader` is destroyed.



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -299,6 +312,72 @@ Status 
ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
+void ParquetReader::merge_delete_row_ranges(const std::vector<RowRange>& 
delete_row_ranges) {
+    //    std::vector<RowRange> group_delete_row_ranges;

Review Comment:
   Remove unused code



##########
be/src/vec/exec/scan/vfile_scanner.cpp:
##########
@@ -468,18 +471,29 @@ Status VFileScanner::_get_next_reader() {
         // create reader for specific format
         // TODO: add json, avro
         Status init_status;
+        // TODO: use data lake type
         switch (_params.format_type) {
         case TFileFormatType::FORMAT_PARQUET: {
-            _cur_reader.reset(new ParquetReader(
-                    _profile, _params, range, _file_col_names, 
_state->query_options().batch_size,
-                    const_cast<cctz::time_zone*>(&_state->timezone_obj())));
+            ParquetReader* parquet_reader =
+                    new ParquetReader(_profile, _params, range, 
_state->query_options().batch_size,
+                                      
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
             if (_push_down_expr == nullptr && _vconjunct_ctx != nullptr &&
                 _partition_slot_descs.empty()) { // TODO: support partition 
columns
                 RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, 
&_push_down_expr));
                 _discard_conjuncts();
             }
-            init_status = ((ParquetReader*)(_cur_reader.get()))
-                                  ->init_reader(_colname_to_value_range, 
_push_down_expr);
+            init_status = parquet_reader->init_reader(_file_col_names, 
_colname_to_value_range,
+                                                      _push_down_expr);
+            if (_params.__isset.table_format_params &&
+                _params.table_format_params.table_format_type == "iceberg") {
+                IcebergTableReader* iceberg_reader = new IcebergTableReader(
+                        (GenericReader*)parquet_reader, _profile, _state, 
_params);
+                iceberg_reader->init_row_filters();
+                iceberg_reader->filter_rows();

Review Comment:
   The `filter_rows()` call should be moved into `init_row_filters`.



##########
be/src/vec/exec/format/parquet/vparquet_column_reader.h:
##########
@@ -106,12 +106,12 @@ class ParquetColumnReader {
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                     ColumnSelectVector& select_vector, size_t 
batch_size,
                                     size_t* read_rows, bool* eof) = 0;
-    static Status create(FileReader* file, FieldSchema* field, const 
ParquetReadColumn& column,
-                         const tparquet::RowGroup& row_group, 
std::vector<RowRange>& row_ranges,
+    static Status create(FileReader* file, FieldSchema* field, const 
tparquet::RowGroup& row_group,
                          cctz::time_zone* ctz, 
std::unique_ptr<ParquetColumnReader>& reader,
                          size_t max_buf_size);
     void init_column_metadata(const tparquet::ColumnChunk& chunk);
     void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index 
= offset_index; }
+    void add_row_ranges(const std::vector<RowRange>* row_ranges) { _row_ranges 
= row_ranges; };

Review Comment:
   ```suggestion
       void set_row_ranges(const std::vector<RowRange>* row_ranges) { 
_row_ranges = row_ranges; };
   ```



##########
be/src/vec/exec/format/parquet/vparquet_reader.h:
##########
@@ -53,24 +53,33 @@ class ParquetReader : public GenericReader {
     };
 
     ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
-                  const TFileRangeDesc& range, const std::vector<std::string>& 
column_names,
-                  size_t batch_size, cctz::time_zone* ctz);
+                  const TFileRangeDesc& range, size_t batch_size, 
cctz::time_zone* ctz);
 
-    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
-                  const std::vector<std::string>& column_names);
+    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range);
 
     ~ParquetReader() override;
     // for test
     void set_file_reader(FileReader* file_reader) { 
_file_reader.reset(file_reader); }
 
+    Status init_reader(const std::vector<std::string>& column_names,
+                       const bool& filter_groups = true) {
+        // without predicate
+        return init_reader(column_names, nullptr, nullptr, filter_groups);
+    }
+
     Status init_reader(
+            const std::vector<std::string>& column_names,
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range,
-            VExprContext* vconjunct_ctx);
+            VExprContext* vconjunct_ctx, const bool& filter_groups = true);

Review Comment:
   ```suggestion
               VExprContext* vconjunct_ctx, bool filter_groups = true);
   ```



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -355,25 +438,34 @@ Status ParquetReader::_init_row_group_readers() {
     return Status::OK();
 }
 
-Status ParquetReader::_filter_row_groups() {
-    if (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0) {
+Status ParquetReader::_filter_row_groups(const bool& enabled,
+                                         std::vector<RowGroupIndex>& 
group_indexes) {
+    if (enabled && (_total_groups == 0 || _t_metadata->num_rows == 0 || 
_range_size < 0)) {
         return Status::EndOfFile("No row group need read");
     }
+    int32_t start_row_id = 0;

Review Comment:
   if `enabled` is false, you can return immediately.



##########
be/src/vec/exec/format/table/table_format_reader.h:
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vec/exec/format/parquet/parquet_common.h>
+
+#include <string>
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris::vectorized {
+
+class TableFormatReader : public GenericReader {
+public:
+    TableFormatReader(GenericReader* file_format_reader);
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override 
{
+        return _file_format_reader->get_next_block(block, read_rows, eof);
+    }
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override 
{
+        return _file_format_reader->get_columns(name_to_type, missing_cols);
+    }
+
+    virtual void filter_rows() = 0;
+
+protected:
+    std::string _table_format;          // hudi, iceberg
+    GenericReader* _file_format_reader; // parquet, orc

Review Comment:
   ```suggestion
       std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
   ```



##########
be/src/vec/exec/format/table/iceberg_reader.cpp:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "iceberg_reader.h"
+
+#include <vec/core/column_with_type_and_name.h>
+#include <vec/exec/format/parquet/vparquet_reader.h>
+
+#include <vec/data_types/data_type_factory.hpp>
+
+#include "vec/common/assert_cast.h"
+
+namespace doris::vectorized {
+
+const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
+
+Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
+    Status status = _file_format_reader->get_next_block(block, read_rows, eof);
+    //    _read_rows += read_rows;
+    //    filter delete row here
+    //    filter_rows()
+
+    // todo: align the delete file and range start
+    //    compare _read_rows
+    //    _position_delete_params.low_bound_index
+    //    _position_delete_params.upper_bound_index
+
+    //    compare _read_rows and delete_range start, delete_range end
+
+    return status;
+}
+
+Status IcebergTableReader::get_columns(
+        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+        std::unordered_set<std::string>* missing_cols) {
+    return _file_format_reader->get_columns(name_to_type, missing_cols);
+}
+
+void IcebergTableReader::filter_rows() {
+    if (_cur_delete_file_reader == nullptr) {
+        return;
+    }
+    auto& table_desc = _params.table_format_params.iceberg_params;
+    auto& version = table_desc.format_version;
+    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
+        return;
+    }
+    bool eof = false;
+    std::vector<RowRange> delete_row_ranges;
+    while (!eof) {
+        size_t read_rows = 0;
+        Block block = Block();
+        for (const FieldSchema* field : _column_schemas) {
+            DataTypePtr data_type = 
DataTypeFactory::instance().create_data_type(field->type, true);
+            MutableColumnPtr data_column = data_type->create_column();
+            block.insert(ColumnWithTypeAndName(std::move(data_column), 
data_type, field->name));
+        }
+        Status st = _cur_delete_file_reader->get_next_block(&block, 
&read_rows, &eof);
+        if (!st.ok() || eof) {
+            if (!_delete_file_readers.empty()) {
+                _cur_delete_file_reader = _delete_file_readers.front();
+                _delete_file_readers.pop();
+            }
+        }
+        if (read_rows != 0) {
+            auto& pos_type_column = block.get_by_name("pos");

Review Comment:
   "pos" should be defined somewhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to