This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c50a310  [optimize] Optimize spark load/broker load reading parquet format file (#3878)
c50a310 is described below

commit c50a310f8fda2d727337984d0ea0552b6c23e697
Author: xy720 <[email protected]>
AuthorDate: Tue Jun 23 13:42:22 2020 +0800

    [optimize] Optimize spark load/broker load reading parquet format file (#3878)
    
    Add BufferedReader for reading parquet file via broker
---
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/broker_reader.cpp                      |   3 +
 be/src/exec/buffered_reader.cpp                    | 152 +++++++++++++++++
 be/src/exec/buffered_reader.h                      |  62 +++++++
 be/src/exec/parquet_reader.cpp                     |   6 +
 be/src/exec/parquet_scanner.cpp                    |   5 +-
 be/test/exec/CMakeLists.txt                        |   1 +
 be/test/exec/buffered_reader_test.cpp              | 182 +++++++++++++++++++++
 .../buffered_reader/buffered_reader_test_file      | Bin 0 -> 950 bytes
 .../buffered_reader/buffered_reader_test_file.txt  |   4 +
 run-ut.sh                                          |   1 +
 11 files changed, 415 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index c42afce..39ab4c8 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -30,6 +30,7 @@ set(EXEC_FILES
     blocking_join_node.cpp
     broker_scan_node.cpp
     broker_reader.cpp
+    buffered_reader.cpp
     base_scanner.cpp
     broker_scanner.cpp
     cross_join_node.cpp
diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp
index dac2a5b..2125bb3 100644
--- a/be/src/exec/broker_reader.cpp
+++ b/be/src/exec/broker_reader.cpp
@@ -155,6 +155,8 @@ Status BrokerReader::readat(int64_t position, int64_t 
nbytes, int64_t* bytes_rea
             return status;
         }
 
+        VLOG_RPC << "send pread request to broker:" << broker_addr << " 
position:" << position << ", read bytes length:" << nbytes;
+
         try {
             client->pread(response, request);
         } catch (apache::thrift::transport::TTransportException& e) {
@@ -253,3 +255,4 @@ void BrokerReader::close() {
 }
 
 } // namespace doris
+
diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp
new file mode 100644
index 0000000..696067f
--- /dev/null
+++ b/be/src/exec/buffered_reader.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// BufferedReader: serves reads of the wrapped FileReader out of a
+// fixed-size in-memory window of `buffer_size` bytes.
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer(new char[buffer_size]),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {}
+
+// Tear-down is delegated entirely to close().
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+// Opens the wrapped reader, then loads the first window at the current
+// buffer offset (0 for a freshly constructed reader).
+// Fails with InternalError when no wrapped reader was supplied.
+Status BufferedReader::open() {
+    if (_reader == nullptr) {
+        return Status::InternalError("Open buffered reader failed, reader is null");
+    }
+    RETURN_IF_ERROR(_reader->open());
+    return _fill();
+}
+
+// Message-oriented reads are not offered by the buffered wrapper; use
+// read() or readat() instead.
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) { return Status::NotSupported("Not support"); }
+
+// Reads up to *buf_len bytes starting at the current logical position
+// (_cur_offset) into `buf`.
+// `buf_len` is in/out: on entry it is the capacity of `buf` (must be non-zero);
+// on return it holds the number of bytes actually copied, which can be smaller
+// than requested near the end of the file.
+// *eof is set to true when no byte could be read.
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read = 0;
+    RETURN_IF_ERROR(readat(_cur_offset, static_cast<int64_t>(*buf_len), &bytes_read, buf));
+    // Report the real byte count; without this a short read is
+    // indistinguishable from a full one for the caller.
+    *buf_len = bytes_read > 0 ? static_cast<size_t>(bytes_read) : 0;
+    *eof = (bytes_read == 0);
+    return Status::OK();
+}
+
+// Reads up to `nbytes` bytes at file offset `position` into `out`, stitching
+// together as many window-sized chunks as needed.
+// *bytes_read receives the total copied; it is smaller than `nbytes` only
+// when EOF is reached, and 0 when `nbytes` is non-positive or nothing is left.
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // Nothing requested: report zero bytes without touching the window.
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    char* dst = reinterpret_cast<char*>(out);
+    // The first chunk is written straight into *bytes_read; a non-positive
+    // count means EOF and ends the request immediately.
+    RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, dst));
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    // Keep pulling chunks until the request is satisfied or EOF is hit.
+    for (int64_t chunk = 0; *bytes_read < nbytes; *bytes_read += chunk) {
+        RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &chunk,
+                                   dst + *bytes_read));
+        if (chunk <= 0) {
+            break;
+        }
+    }
+    return Status::OK();
+}
+
+// Serves a single chunk of a readat() request at file offset `position`.
+// - If `position` is inside the current window, bytes are copied from it.
+// - Otherwise, requests larger than the whole window bypass the buffer and go
+//   straight to the wrapped reader; smaller ones refill the window starting at
+//   `position` first.
+// *bytes_read is 0 at EOF. Whenever data is returned, the logical position
+// (_cur_offset) is moved to just past it, on both the buffered and the direct path.
+// NOTE(review): a negative `position` is not rejected here. _fill() skips
+// negative offsets, so such a request is answered from whatever the window
+// already holds. Callers should not pass negative positions.
+Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_size) {
+            RETURN_IF_ERROR(_reader->readat(position, nbytes, bytes_read, out));
+            // Keep tell()/read() consistent with the buffered path below.
+            _cur_offset = position + *bytes_read;
+            return Status::OK();
+        }
+        _buffer_offset = position;
+        RETURN_IF_ERROR(_fill());
+        if (position >= _buffer_limit) {
+            // EOF: nothing available at or after `position`.
+            *bytes_read = 0;
+            return Status::OK();
+        }
+    }
+    // Serve the (possibly truncated) request from the window.
+    const int64_t len = std::min(_buffer_limit - position, nbytes);
+    const int64_t off = position - _buffer_offset;
+    memcpy(out, _buffer + off, len);
+    *bytes_read = len;
+    _cur_offset = position + len;
+    return Status::OK();
+}
+
+// Loads up to _buffer_size bytes of the wrapped file starting at
+// _buffer_offset into the window and sets _buffer_limit to the end of the
+// loaded range. A read that returns no data is retried once.
+// A negative _buffer_offset is left alone: nothing is read and the window
+// state is not modified.
+Status BufferedReader::_fill() {
+    if (_buffer_offset < 0) {
+        return Status::OK();
+    }
+    int64_t loaded = 0;
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &loaded, _buffer));
+        if (loaded != 0) {
+            break;
+        }
+    }
+    _buffer_limit = _buffer_offset + loaded;
+    return Status::OK();
+}
+
+// File size as reported by the wrapped reader; the window plays no part.
+int64_t BufferedReader::size() { return _reader->size(); }
+
+// Moves the logical read position used by read() and tell(). No I/O happens
+// here; the window is refilled lazily by the next read that misses it.
+// Negative positions are clamped to 0. Left unclamped, they would reach
+// _read_once(), where _fill() skips negative offsets. The window would then be
+// re-labelled with a negative offset while still holding the old bytes, and
+// later reads would return data from the wrong place.
+Status BufferedReader::seek(int64_t position) {
+    _cur_offset = std::max<int64_t>(position, 0);
+    return Status::OK();
+}
+
+// Reports the logical read position, i.e. where the next read() starts.
+Status BufferedReader::tell(int64_t* position) {
+    const int64_t current = _cur_offset;
+    *position = current;
+    return Status::OK();
+}
+
+// Closes the wrapped reader and frees the window.
+// Idempotent: the destructor calls close() again after any explicit close,
+// and only the first call touches the wrapped reader. A null wrapped reader
+// (the case open() rejects) is tolerated instead of dereferenced.
+void BufferedReader::close() {
+    if (_buffer == nullptr) {
+        // Already closed.
+        return;
+    }
+    if (_reader != nullptr) {
+        _reader->close();
+    }
+    delete[] _buffer;
+    _buffer = nullptr;
+}
+
+// True when the wrapped reader reports itself closed. A missing wrapped
+// reader counts as closed, since nothing can be read through it; this also
+// avoids dereferencing null.
+bool BufferedReader::closed() {
+    return _reader == nullptr || _reader->closed();
+}
+
+} // namespace doris
+
diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h
new file mode 100644
index 0000000..d7f2fbd
--- /dev/null
+++ b/be/src/exec/buffered_reader.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader
+// Adds a cache layer between the caller and another FileReader. Reads are
+// served from an in-memory window, which reduces the number of calls to the
+// wrapped reader's read function and speeds up workloads made of many small
+// reads (e.g. parquet files read through a broker).
+//
+// Ownership: the wrapped reader is NOT owned. BufferedReader never deletes it,
+// so the caller must keep it alive for this object's lifetime and free it
+// afterwards. close() (also invoked by the destructor) closes the wrapped reader.
+class BufferedReader : public FileReader {
+public:
+    // `reader`: the reader to wrap (not owned).
+    // `buffer_size`: capacity of the in-memory window in bytes (1 MiB by default).
+    explicit BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024);
+    virtual ~BufferedReader();
+
+    // The window is a raw heap allocation released in close(), so a copy
+    // would free it twice.
+    BufferedReader(const BufferedReader&) = delete;
+    BufferedReader& operator=(const BufferedReader&) = delete;
+
+    virtual Status open() override;
+
+    // Read
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    // Not supported by this reader.
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    virtual int64_t size() override;
+    // Only moves the logical read position; no I/O is performed.
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    // Loads the window starting at _buffer_offset (no-op when it is negative).
+    Status _fill();
+    // Serves one chunk of a readat() request.
+    Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);
+
+private:
+    FileReader* _reader;     // wrapped reader, not owned
+    char* _buffer;           // window storage of _buffer_size bytes
+    int64_t _buffer_size;
+    int64_t _buffer_offset;  // file offset of _buffer[0]
+    int64_t _buffer_limit;   // file offset one past the last valid byte in the window
+    int64_t _cur_offset;     // logical read position used by read()/tell()
+};
+
+}
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index 34af303..b3e37d5 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -158,6 +158,9 @@ inline Status ParquetReaderWrap::set_field_null(Tuple* 
tuple, const SlotDescript
 
 Status ParquetReaderWrap::read_record_batch(const 
std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof) {
     if (_current_line_of_group >= _rows_of_group) {// read next row group
+        VLOG(7) << "read_record_batch, current group id:" << _current_group << 
" current line of group:" 
+        << _current_line_of_group << " is larger than rows group size:"
+        << _rows_of_group << ". start to read next row group";
         _current_group++;
         if (_current_group >= _total_groups) {// read completed.
             _parquet_column_ids.clear();
@@ -177,6 +180,9 @@ Status ParquetReaderWrap::read_record_batch(const 
std::vector<SlotDescriptor*>&
         }
         _current_line_of_batch = 0;
     } else if (_current_line_of_batch >= _batch->num_rows()) {
+        VLOG(7) << "read_record_batch, current group id:" << _current_group << 
" current line of batch:" 
+        << _current_line_of_batch << " is larger than batch size:"
+        << _batch->num_rows() << ". start to read next batch";
         arrow::Status status = _rb_batch->ReadNext(&_batch);
         if (!status.ok()) {
             return Status::InternalError("Read Batch Error With Libarrow.");
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 375d90f..cb22687 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -29,6 +29,7 @@
 #include "exec/text_converter.hpp"
 #include "exec/local_file_reader.h"
 #include "exec/broker_reader.h"
+#include "exec/buffered_reader.h"
 #include "exec/decompressor.h"
 #include "exec/parquet_reader.h"
 
@@ -119,8 +120,8 @@ Status ParquetScanner::open_next_reader() {
                 int64_t file_size = 0;
                 // for compatibility
                 if (range.__isset.file_size) { file_size = range.file_size; }
-                file_reader.reset(new BrokerReader(_state->exec_env(), 
_broker_addresses, _params.properties,
-                                               range.path, range.start_offset, 
file_size));
+                file_reader.reset(new BufferedReader(new 
BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
+                                               range.path, range.start_offset, 
file_size)));
                 break;
             }
 #if 0
diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt
index 2021480..9c49667 100644
--- a/be/test/exec/CMakeLists.txt
+++ b/be/test/exec/CMakeLists.txt
@@ -51,6 +51,7 @@ ADD_BE_TEST(broker_scanner_test)
 ADD_BE_TEST(broker_scan_node_test)
 ADD_BE_TEST(tablet_info_test)
 ADD_BE_TEST(tablet_sink_test)
+ADD_BE_TEST(buffered_reader_test)
 # ADD_BE_TEST(es_scan_node_test)
 ADD_BE_TEST(es_http_scan_node_test)
 ADD_BE_TEST(es_predicate_test)
diff --git a/be/test/exec/buffered_reader_test.cpp 
b/be/test/exec/buffered_reader_test.cpp
new file mode 100644
index 0000000..3c0bbeb
--- /dev/null
+++ b/be/test/exec/buffered_reader_test.cpp
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "exec/local_file_reader.h"
+#include "exec/buffered_reader.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+// Fixture for BufferedReader tests; no shared state is needed.
+class BufferedReaderTest : public testing::Test {
+public:
+    BufferedReaderTest() = default;
+
+protected:
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+// A window larger than the file returns the whole file from a single readat().
+TEST_F(BufferedReaderTest, normal_use) {
+    // buffered_reader_test_file 950 bytes
+    LocalFileReader local_reader("./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
+    BufferedReader buffered(&local_reader, 1024);
+    auto status = buffered.open();
+    ASSERT_TRUE(status.ok());
+
+    uint8_t out[1024];
+    int64_t total = 0;
+    MonotonicStopWatch timer;
+    timer.start();
+    status = buffered.readat(0, 1024, &total, out);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(950, total);
+    LOG(INFO) << "read bytes " << total << " using time " << timer.elapsed_time();
+}
+
+// Successive read() calls walk the 45-byte text file in 10-byte steps and
+// report EOF only once nothing is left.
+TEST_F(BufferedReaderTest, test_validity) {
+    // buffered_reader_test_file.txt 45 bytes
+    LocalFileReader local_reader("./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader buffered(&local_reader, 64);
+    auto status = buffered.open();
+    ASSERT_TRUE(status.ok());
+
+    uint8_t out[10];
+    size_t out_len = 10;
+    bool eof = false;
+
+    // Four full 10-byte chunks.
+    const char* const full_chunks[] = {"bdfhjlnprt", "vxzAbCdEfG", "hIj\n\nMnOpQ", "rStUvWxYz\n"};
+    for (const char* expected : full_chunks) {
+        status = buffered.read(out, &out_len, &eof);
+        ASSERT_TRUE(status.ok());
+        ASSERT_STREQ(expected, std::string((char*)out, out_len).c_str());
+        ASSERT_FALSE(eof);
+    }
+
+    // The remaining tail is shorter than a full chunk; its first four bytes are checked.
+    status = buffered.read(out, &out_len, &eof);
+    ASSERT_TRUE(status.ok());
+    ASSERT_STREQ("IjKl", std::string((char*)out, 4).c_str());
+    ASSERT_FALSE(eof);
+
+    // Nothing is left to read.
+    status = buffered.read(out, &out_len, &eof);
+    ASSERT_TRUE(status.ok());
+    ASSERT_TRUE(eof);
+}
+
+// seek() moves the logical position; the following read() must start there.
+// `buf_len` is an in/out parameter of read(), so it is reset to the buffer
+// capacity before every call. The test no longer relies on read() leaving it
+// untouched, e.g. after a 0-byte read at EOF.
+TEST_F(BufferedReaderTest, test_seek) {
+    // buffered_reader_test_file.txt 45 bytes
+    LocalFileReader file_reader("./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader reader(&file_reader, 64);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[10];
+    bool eof = false;
+    size_t buf_len = sizeof(buf);
+
+    // Seek to the end of the file
+    st = reader.seek(45);
+    ASSERT_TRUE(st.ok());
+    buf_len = sizeof(buf);
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(eof);
+
+    // Seek to the beginning of the file
+    st = reader.seek(0);
+    ASSERT_TRUE(st.ok());
+    buf_len = sizeof(buf);
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    // Seek to a wrong (negative) position: the read is expected to yield the file's first bytes
+    st = reader.seek(-1);
+    ASSERT_TRUE(st.ok());
+    buf_len = sizeof(buf);
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    // Seek to a wrong (negative) position: the read is expected to yield the file's first bytes
+    st = reader.seek(-1000);
+    ASSERT_TRUE(st.ok());
+    buf_len = sizeof(buf);
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    // Seek past the end of the file
+    st = reader.seek(1000);
+    ASSERT_TRUE(st.ok());
+    buf_len = sizeof(buf);
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(eof);
+}
+
+// readat() at arbitrary offsets returns the right bytes regardless of the
+// order in which ranges are requested.
+TEST_F(BufferedReaderTest, test_miss) {
+    // buffered_reader_test_file.txt 45 bytes
+    LocalFileReader local_reader("./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader buffered(&local_reader, 64);
+    auto status = buffered.open();
+    ASSERT_TRUE(status.ok());
+    uint8_t out[128];
+    int64_t got = 0;
+
+    // A range in the middle of the file.
+    status = buffered.readat(20, 10, &got, out);
+    ASSERT_TRUE(status.ok());
+    ASSERT_STREQ("hIj\n\nMnOpQ", std::string((char*)out, (size_t)got).c_str());
+    ASSERT_EQ(10, got);
+
+    // Back to the very start.
+    status = buffered.readat(0, 5, &got, out);
+    ASSERT_TRUE(status.ok());
+    ASSERT_STREQ("bdfhj", std::string((char*)out, (size_t)got).c_str());
+    ASSERT_EQ(5, got);
+
+    // The range directly following the previous one.
+    status = buffered.readat(5, 10, &got, out);
+    ASSERT_TRUE(status.ok());
+    ASSERT_STREQ("lnprtvxzAb", std::string((char*)out, (size_t)got).c_str());
+    ASSERT_EQ(10, got);
+
+    // A request larger than the 64-byte window. Bytes already in the window
+    // are returned first. The part that misses the window is too large to
+    // buffer and goes to the wrapped reader directly (here it is past EOF).
+    status = buffered.readat(0, 128, &got, out);
+    ASSERT_TRUE(status.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)out, 10).c_str());
+    ASSERT_EQ(45, got);
+}
+
+} // end namespace doris
+
+// Test entry point: initialise Google Test and run every registered case.
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    const int result = RUN_ALL_TESTS();
+    return result;
+}
diff --git a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file 
b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file
new file mode 100644
index 0000000..88f4883
Binary files /dev/null and 
b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file differ
diff --git 
a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt 
b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
new file mode 100644
index 0000000..e0e5fb6
--- /dev/null
+++ b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
@@ -0,0 +1,4 @@
+bdfhjlnprtvxzAbCdEfGhIj
+
+MnOpQrStUvWxYz
+IjKl
diff --git a/run-ut.sh b/run-ut.sh
index db5dd6d..42b361b 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -212,6 +212,7 @@ ${DORIS_TEST_BINARY_DIR}/exec/es_scan_reader_test
 ${DORIS_TEST_BINARY_DIR}/exec/es_query_builder_test
 ${DORIS_TEST_BINARY_DIR}/exec/tablet_info_test
 ${DORIS_TEST_BINARY_DIR}/exec/tablet_sink_test
+${DORIS_TEST_BINARY_DIR}/exec/buffered_reader_test
 
 # Running runtime Unittest
 ${DORIS_TEST_BINARY_DIR}/runtime/external_scan_context_mgr_test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to