morningman commented on code in PR #12899:
URL: https://github.com/apache/doris/pull/12899#discussion_r979237838


##########
be/src/vec/exec/scan/new_odbc_scanner.cpp:
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_odbc_scanner.h"
+
+#include "common/status.h"
+#include "exec/text_converter.hpp"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner";
+
+namespace doris::vectorized {
+NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, 
int64_t limit,
+                               MemTracker* mem_tracker, const TOdbcScanNode& 
odbc_scan_node)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+          _is_init(false),
+          _table_name(odbc_scan_node.table_name),
+          _connect_string(odbc_scan_node.connect_string),
+          _query_string(odbc_scan_node.query_string),
+          _tuple_id(odbc_scan_node.tuple_id),
+          _tuple_desc(nullptr) {}
+
+Status NewOdbcScanner::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
+
+    if (_is_init) {
+        return Status::OK();
+    }
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    // get tuple desc
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+
+    if (nullptr == _tuple_desc) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+
+    _odbc_param.connect_string = std::move(_connect_string);
+    _odbc_param.query_string = std::move(_query_string);
+    _odbc_param.tuple_desc = _tuple_desc;
+
+    _odbc_scanner.reset(new (std::nothrow) ODBCConnector(_odbc_param));
+
+    if (_odbc_scanner == nullptr) {
+        return Status::InternalError("new a odbc scanner failed.");
+    }
+
+    _text_converter.reset(new (std::nothrow) TextConverter('\\'));
+
+    if (_text_converter == nullptr) {
+        return Status::InternalError("new a text convertor failed.");
+    }
+
+    _is_init = true;
+
+    return Status::OK();
+}
+
+Status NewOdbcScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open";
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    RETURN_IF_ERROR(_odbc_scanner->open());
+    RETURN_IF_ERROR(_odbc_scanner->query());
+    // check materialize slot num
+
+    return Status::OK();
+}
+
+Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, 
bool* eof) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl";
+
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+    RETURN_IF_CANCELLED(state);
+
+    auto column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+
+    bool mem_reuse = block->mem_reuse();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    // Indicates whether there are more rows to process. Set in 
_odbc_scanner.next().
+    bool odbc_eof = false;
+
+    do {
+        RETURN_IF_CANCELLED(state);
+
+        columns.resize(column_size);
+        for (auto i = 0; i < column_size; i++) {
+            if (mem_reuse) {
+                columns[i] = 
std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = 
_tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+
+        for (int row_index = 0; true; row_index++) {
+            // block is full, break
+            if (state->batch_size() <= columns[0]->size()) {
+                break;
+            }
+
+            RETURN_IF_ERROR(_odbc_scanner->get_next_row(&odbc_eof));
+
+            if (odbc_eof) {
+                *eof = true;
+                break;
+            }
+
+            // Read one row from reader
+            for (int column_index = 0, materialized_column_index = 0; 
column_index < column_size;
+                 ++column_index) {
+                auto slot_desc = _tuple_desc->slots()[column_index];
+                // because the fe planner filter the non_materialize column
+                if (!slot_desc->is_materialized()) {
+                    continue;
+                }
+                const auto& column_data = 
_odbc_scanner->get_column_data(materialized_column_index);
+
+                char* value_data = 
static_cast<char*>(column_data.target_value_ptr);
+                int value_len = column_data.strlen_or_ind;
+
+                if (value_len == SQL_NULL_DATA) {
+                    if (slot_desc->is_nullable()) {
+                        columns[column_index]->insert_default();
+                    } else {
+                        return Status::InternalError(
+                                "nonnull column contains nullptr. table={}, 
column={}", _table_name,
+                                slot_desc->col_name());
+                    }
+                } else if (value_len > column_data.buffer_length) {
+                    return Status::InternalError(
+                            "column value length longer than buffer length. "
+                            "table={}, column={}, buffer_length",
+                            _table_name, slot_desc->col_name(), 
column_data.buffer_length);
+                } else {
+                    if (!_text_converter->write_column(slot_desc, 
&columns[column_index],
+                                                       value_data, value_len, 
true, false)) {
+                        std::stringstream ss;
+                        ss << "Fail to convert odbc value:'" << value_data << 
"' to "
+                           << slot_desc->type() << " on column:`" << 
slot_desc->col_name() + "`";
+                        return Status::InternalError(ss.str());
+                    }
+                }
+                materialized_column_index++;
+            }
+        }
+
+        // Before really use the Block, muse clear other ptr of column in block

Review Comment:
   ```suggestion
           // Before actually using the Block, we must clear the other column pointers held by the block
   ```



##########
be/src/vec/exec/scan/new_odbc_scanner.cpp:
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_odbc_scanner.h"
+
+#include "common/status.h"
+#include "exec/text_converter.hpp"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner";
+
+namespace doris::vectorized {
+NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, 
int64_t limit,
+                               MemTracker* mem_tracker, const TOdbcScanNode& 
odbc_scan_node)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+          _is_init(false),
+          _table_name(odbc_scan_node.table_name),
+          _connect_string(odbc_scan_node.connect_string),
+          _query_string(odbc_scan_node.query_string),
+          _tuple_id(odbc_scan_node.tuple_id),
+          _tuple_desc(nullptr) {}
+
+Status NewOdbcScanner::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
+
+    if (_is_init) {
+        return Status::OK();
+    }
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    // get tuple desc
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+
+    if (nullptr == _tuple_desc) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+
+    _odbc_param.connect_string = std::move(_connect_string);
+    _odbc_param.query_string = std::move(_query_string);
+    _odbc_param.tuple_desc = _tuple_desc;
+
+    _odbc_scanner.reset(new (std::nothrow) ODBCConnector(_odbc_param));
+
+    if (_odbc_scanner == nullptr) {
+        return Status::InternalError("new a odbc scanner failed.");
+    }
+
+    _text_converter.reset(new (std::nothrow) TextConverter('\\'));
+
+    if (_text_converter == nullptr) {
+        return Status::InternalError("new a text convertor failed.");
+    }
+
+    _is_init = true;
+
+    return Status::OK();
+}
+
+Status NewOdbcScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open";
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    RETURN_IF_ERROR(_odbc_scanner->open());
+    RETURN_IF_ERROR(_odbc_scanner->query());
+    // check materialize slot num
+
+    return Status::OK();
+}
+
+Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, 
bool* eof) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl";
+
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+    RETURN_IF_CANCELLED(state);
+
+    auto column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+
+    bool mem_reuse = block->mem_reuse();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    // Indicates whether there are more rows to process. Set in 
_odbc_scanner.next().
+    bool odbc_eof = false;
+
+    do {
+        RETURN_IF_CANCELLED(state);
+
+        columns.resize(column_size);
+        for (auto i = 0; i < column_size; i++) {
+            if (mem_reuse) {
+                columns[i] = 
std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = 
_tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+
+        for (int row_index = 0; true; row_index++) {
+            // block is full, break
+            if (state->batch_size() <= columns[0]->size()) {
+                break;
+            }
+
+            RETURN_IF_ERROR(_odbc_scanner->get_next_row(&odbc_eof));
+
+            if (odbc_eof) {
+                *eof = true;

Review Comment:
   You should make sure that whenever `eof` is set to true, the returned block is empty.
   Here, if `odbc_eof` is true, there may still be data in `block`.
   
   You can make `odbc_eof` a member of this ODBC scanner, and set `eof` to true iff:
   1. `odbc_eof` is true, and
   2. block->rows() == 0
   
   Each time `_get_block_impl()` is called, check `odbc_eof` first; if it is true, there is no need to read any data, so just return immediately with `eof` set to true.



##########
be/src/vec/exec/scan/new_odbc_scanner.h:
##########
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/odbc_connector.h"
+#include "exec/text_converter.h"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris::vectorized {
+class NewOdbcScanner : public VScanner {
+public:
+    NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
+                   MemTracker* mem_tracker, const TOdbcScanNode& 
odbc_scan_node);
+
+    Status open(RuntimeState* state) override;
+
+    // Close the odbc_scanner, and report errors.
+    Status close(RuntimeState* state) override;
+
+public:
+    Status prepare(RuntimeState* state);
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
+
+private:
+    bool _is_init;
+
+    std::string _table_name;
+
+    std::string _connect_string;
+
+    std::string _query_string;
+    // Tuple id resolved in prepare() to set _tuple_desc;
+    TupleId _tuple_id;
+
+    // Descriptor of tuples read from ODBC table.
+    const TupleDescriptor* _tuple_desc;
+
+    // Scanner of ODBC.
+    std::unique_ptr<ODBCConnector> _odbc_scanner;

Review Comment:
   ```suggestion
       std::unique_ptr<ODBCConnector> _odbc_connector;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to