This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c715209a7e [refactor](dpp) remove original dpp writer (#11838)
c715209a7e is described below

commit c715209a7e6de5c50bfe96f00597ca8eaef882e9
Author: yiguolei <[email protected]>
AuthorDate: Wed Aug 17 10:42:29 2022 +0800

    [refactor](dpp) remove original dpp writer (#11838)
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/runtime/CMakeLists.txt |   1 -
 be/src/runtime/dpp_writer.cpp | 266 ------------------------------------------
 be/src/runtime/dpp_writer.h   |  73 ------------
 gensrc/proto/olap_file.proto  |  15 ---
 4 files changed, 355 deletions(-)

diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 50ab78eda6..5ff765f137 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -54,7 +54,6 @@ set(RUNTIME_FILES
     collection_value.cpp
     tuple.cpp
     tuple_row.cpp
-    dpp_writer.cpp
     qsorter.cpp
     fragment_mgr.cpp
     dpp_sink_internal.cpp
diff --git a/be/src/runtime/dpp_writer.cpp b/be/src/runtime/dpp_writer.cpp
deleted file mode 100644
index 56971c2bf4..0000000000
--- a/be/src/runtime/dpp_writer.cpp
+++ /dev/null
@@ -1,266 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/dpp_writer.h"
-
-#include <stdio.h>
-
-#include <vector>
-
-#include "exprs/expr_context.h"
-#include "olap/utils.h"
-#include "runtime/primitive_type.h"
-#include "runtime/row_batch.h"
-#include "runtime/tuple_row.h"
-#include "util/debug_util.h"
-#include "util/types.h"
-
-namespace doris {
-
-DppWriter::DppWriter(int32_t schema_hash, const std::vector<ExprContext*>& output_expr_ctxs,
-                     FileHandler* fp)
-        : _schema_hash(schema_hash),
-          _output_expr_ctxs(output_expr_ctxs),
-          _fp(fp),
-          _buf(nullptr),
-          _end(nullptr),
-          _pos(nullptr),
-          _write_len(0),
-          _content_adler32(1) {
-    _num_null_slots = 0;
-    for (int i = 0; i < _output_expr_ctxs.size(); ++i) {
-        if (true == _output_expr_ctxs[i]->is_nullable()) {
-            _num_null_slots += 1;
-        }
-    }
-    _num_null_bytes = (_num_null_slots + 7) / 8;
-}
-
-DppWriter::~DppWriter() {
-    if (_buf) {
-        delete[] _buf;
-    }
-}
-
-Status DppWriter::open() {
-    // Write header
-    _header.mutable_message()->set_schema_hash(_schema_hash);
-    _header.prepare(_fp);
-    _content_adler32 = 1;
-    // seek to size()
-    _fp->seek(_header.size(), SEEK_SET);
-
-    // new buf
-    const int k_buf_len = 16 * 1024;
-    _buf = new char[k_buf_len];
-    _pos = _buf;
-    _end = _buf + k_buf_len;
-    return Status::OK();
-}
-
-void DppWriter::reset_buf() {
-    _pos = _buf;
-}
-
-void DppWriter::append_to_buf(const void* ptr, int len) {
-    if (_pos + len > _end) {
-        // enlarge
-        int cur_len = _pos - _buf;
-        int old_buf_len = _end - _buf;
-        int new_len = std::max(2 * old_buf_len, old_buf_len + len);
-        char* new_buf = new char[new_len];
-        memcpy(new_buf, _buf, cur_len);
-        delete[] _buf;
-        _buf = new_buf;
-        _pos = _buf + cur_len;
-        _end = _buf + new_len;
-    }
-
-    memcpy(_pos, ptr, len);
-    _pos += len;
-}
-
-void DppWriter::increase_buf(int len) {
-    //increase buf to store nullptr bytes
-    //len is the bytes of nullptr
-    if (_pos + len > _end) {
-        int cur_len = _pos - _buf;
-        int old_buf_len = _end - _buf;
-        int new_len = std::max(2 * old_buf_len, old_buf_len + len);
-        char* new_buf = new char[new_len];
-        memcpy(new_buf, _buf, cur_len);
-        delete[] _buf;
-        _buf = new_buf;
-        _pos = _buf + cur_len;
-        _end = _buf + new_len;
-    }
-
-    memset(_pos, 0, len);
-    _pos += len;
-}
-
-Status DppWriter::append_one_row(TupleRow* row) {
-    int num_columns = _output_expr_ctxs.size();
-    int off = 0;
-    int pos = _pos - _buf;
-    increase_buf(_num_null_bytes);
-    for (int i = 0; i < num_columns; ++i) {
-        char* position = _buf + pos;
-        void* item = _output_expr_ctxs[i]->get_value(row);
-        // What happened failed???
-        if (true == _output_expr_ctxs[i]->is_nullable()) {
-            int index = off % 8;
-            if (item == nullptr) {
-                //store nullptr bytes
-                position[off / 8] |= 1 << (7 - index);
-                off += 1;
-                continue;
-            } else {
-                position[off / 8] &= ~(1 << (7 - index));
-                off += 1;
-            }
-        }
-        switch (_output_expr_ctxs[i]->root()->type().type) {
-        case TYPE_TINYINT:
-            append_to_buf(item, 1);
-            break;
-        case TYPE_SMALLINT:
-            append_to_buf(item, 2);
-            break;
-        case TYPE_INT:
-            append_to_buf(item, 4);
-            break;
-        case TYPE_BIGINT:
-            append_to_buf(item, 8);
-            break;
-        case TYPE_LARGEINT:
-            append_to_buf(item, 16);
-            break;
-        case TYPE_FLOAT:
-            append_to_buf(item, 4);
-            break;
-        case TYPE_DOUBLE:
-            append_to_buf(item, 8);
-            break;
-        case TYPE_DATE: {
-            const DateTimeValue* time_val = (const DateTimeValue*)(item);
-            uint64_t val = time_val->to_olap_date();
-            uint8_t char_val = val & 0xff;
-            append_to_buf(&char_val, 1);
-            val >>= 8;
-            char_val = val & 0xff;
-            append_to_buf(&char_val, 1);
-            val >>= 8;
-            char_val = val & 0xff;
-            append_to_buf(&char_val, 1);
-            break;
-        }
-        case TYPE_DATETIME: {
-            const DateTimeValue* time_val = (const DateTimeValue*)(item);
-            uint64_t val = time_val->to_olap_datetime();
-            append_to_buf(&val, 8);
-            break;
-        }
-        case TYPE_VARCHAR: {
-        case TYPE_HLL:
-        case TYPE_STRING:
-            const StringValue* str_val = (const StringValue*)(item);
-            if (UNLIKELY(str_val->ptr == nullptr && str_val->len != 0)) {
-                return Status::InternalError("String value ptr is null");
-            }
-
-            // write len first
-            uint16_t len = str_val->len;
-            if (len != str_val->len) {
-                return Status::InternalError("length of string is overflow.len={}", str_val->len);
-            }
-            append_to_buf(&len, 2);
-            // passing a nullptr pointer to memcpy may be core/
-            if (len == 0) {
-                break;
-            }
-            append_to_buf(str_val->ptr, len);
-            break;
-        }
-        case TYPE_CHAR: {
-            const StringValue* str_val = (const StringValue*)(item);
-            if (UNLIKELY(str_val->ptr == nullptr || str_val->len == 0)) {
-                return Status::InternalError("String value ptr is null");
-            }
-            append_to_buf(str_val->ptr, str_val->len);
-            break;
-        }
-        case TYPE_DECIMALV2: {
-            const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(item)->value);
-            int64_t int_val = decimal_val.int_value();
-            int32_t frac_val = decimal_val.frac_value();
-            append_to_buf(&int_val, sizeof(int_val));
-            append_to_buf(&frac_val, sizeof(frac_val));
-            break;
-        }
-        default: {
-            std::stringstream ss;
-            ss << "Unknown column type " << _output_expr_ctxs[i]->root()->type();
-            return Status::InternalError(ss.str());
-        }
-        }
-    }
-
-    return Status::OK();
-}
-
-Status DppWriter::add_batch(RowBatch* batch) {
-    int num_rows = batch->num_rows();
-    if (num_rows <= 0) {
-        return Status::OK();
-    }
-
-    Status status;
-    for (int i = 0; i < num_rows; ++i) {
-        reset_buf();
-        TupleRow* row = batch->get_row(i);
-        status = append_one_row(row);
-        if (!status.ok()) {
-            LOG(WARNING) << "convert row to dpp output failed. reason: " << status.get_error_msg();
-
-            return status;
-        }
-        int len = _pos - _buf;
-        Status olap_status = _fp->write(_buf, len);
-        if (!olap_status.ok()) {
-            return Status::InternalError("write to file failed.");
-        }
-        _content_adler32 = olap_adler32(_content_adler32, _buf, len);
-        _write_len += len;
-    }
-
-    return status;
-}
-
-Status DppWriter::write_header() {
-    _header.set_file_length(_header.size() + _write_len);
-    _header.set_checksum(_content_adler32);
-    _header.serialize(_fp);
-    return Status::OK();
-}
-
-Status DppWriter::close() {
-    // Write header
-    return write_header();
-}
-
-} // namespace doris
diff --git a/be/src/runtime/dpp_writer.h b/be/src/runtime/dpp_writer.h
deleted file mode 100644
index cdd0feb64d..0000000000
--- a/be/src/runtime/dpp_writer.h
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/status.h"
-#include "gen_cpp/olap_file.pb.h"
-#include "olap/file_helper.h"
-
-namespace doris {
-
-class ExprContext;
-class TupleRow;
-class RowBatch;
-
-// Class used to convert to DPP output
-// this is used for don't change code in storage.
-class DppWriter {
-public:
-    DppWriter(int32_t schema_hash, const std::vector<ExprContext*>& output_expr, FileHandler* fp);
-
-    ~DppWriter();
-
-    // Some prepare work complete here.
-    Status open();
-
-    // Add one batch to this writer
-    Status add_batch(RowBatch* row_batch);
-
-    // Close this writer
-    // Write needed header to this file.
-    Status close();
-
-private:
-    Status write_header();
-    void reset_buf();
-    void append_to_buf(const void* ptr, int len);
-    void increase_buf(int len);
-    Status append_one_row(TupleRow* row);
-
-    // pass by client, data write to this file
-    // owned by client, doesn't care about this
-    int32_t _schema_hash;
-    int _num_null_slots;
-    int _num_null_bytes;
-    const std::vector<ExprContext*>& _output_expr_ctxs;
-    FileHandler* _fp;
-
-    char* _buf;
-    char* _end;
-    char* _pos;
-
-    int64_t _write_len;
-    uint32_t _content_adler32;
-
-    FileHeader<OLAPRawDeltaHeaderMessage, int32_t, FileHandler> _header;
-};
-
-} // namespace doris
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 14bb5ecb5e..19bc56823a 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -287,21 +287,6 @@ message TabletMetaPB {
     optional bool enable_unique_key_merge_on_write = 24 [default = false];
 }
 
-message OLAPIndexHeaderMessage {
-    required int32 start_version = 1;
-    required int32 end_version = 2;
-    required int64 cumulative_version_hash = 3;
-
-    required uint32 segment = 4;
-    required uint32 num_rows_per_block = 5;
-    optional bool null_supported = 6;
-    optional bool delete_flag = 7;
-}
-
-message OLAPDataHeaderMessage {
-    required uint32 segment = 2;
-}
-
 message OLAPRawDeltaHeaderMessage {
     required int32 schema_hash = 2;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to