yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893223618


##########
be/src/vec/runtime/vfile_result_writer.cpp:
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "common/consts.h"
+#include "exec/broker_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/raw_value.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/common/arena.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+// Buffer threshold of 1 MiB. _flush_plain_text_outstream() compares the number
+// of buffered bytes against it to decide whether a non-final flush is due.
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = static_cast<size_t>(1024) * 1024;
+// Make the operator<< overloads declared in namespace doris visible in this file.
+using doris::operator<<;
+
+// Constructs a file result writer.
+// Each argument is only stored in the member of the same name; this constructor
+// does not create a file writer itself (init() does that through
+// _create_next_file_writer()).
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    // Kept as an assignment in the body (not in the initializer list) so the
+    // member initialization order stays untouched.
+    _output_object_data = output_object_data;
+}
+
+// Closes the file writer with done == true when the writer is destroyed.
+// A destructor cannot report the returned Status, so it is dropped explicitly.
+VFileResultWriter::~VFileResultWriter() {
+    static_cast<void>(_close_file_writer(true));
+}
+
+// Stores the runtime state, registers the profile counters and creates the
+// writer for the first output file.
+// Returns the status of creating that first file writer.
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    Status first_writer_status = _create_next_file_writer();
+    return first_writer_status;
+}
+
+// Creates the "VFileResultWriter" child of the parent profile and registers
+// this writer's timers and counters on it.
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* writer_profile = _parent_profile->create_child("VFileResultWriter", true, true);
+
+    // Time spent appending data, with the conversion and file-write times as children.
+    _append_row_batch_timer = ADD_TIMER(writer_profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(writer_profile, "TupleConvertTime", "AppendBatchTime");
+    _file_write_timer = ADD_CHILD_TIMER(writer_profile, "FileWriteTime", "AppendBatchTime");
+
+    // Time spent closing file writers.
+    _writer_close_timer = ADD_TIMER(writer_profile, "FileWriterCloseTime");
+
+    // Volume counters: rows and bytes written.
+    _written_rows_counter = ADD_COUNTER(writer_profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(writer_profile, "WrittenDataBytes", TUnit::BYTES);
+}
+
+// Creates the success file: resolves its name, creates a file writer for it
+// and closes that writer again without writing any data in between.
+// Returns the first error encountered, otherwise the status of the close.
+Status VFileResultWriter::_create_success_file() {
+    std::string success_file;
+    RETURN_IF_ERROR(_get_success_file_name(&success_file));
+    RETURN_IF_ERROR(_create_file_writer(success_file));
+    // Arguments: done = true, only_close = true.
+    return _close_file_writer(true, true);
+}
+
+// Writes "{file_path}{success_file_name}" into *file_name.
+// For the LOCAL backend an InternalError is returned when that file already
+// exists; *file_name has been set in that case too.
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
+    std::stringstream name_builder;
+    name_builder << _file_opts->file_path << _file_opts->success_file_name;
+    *file_name = name_builder.str();
+
+    // Only local paths are checked for existence.
+    if (_storage_type != TStorageBackendType::LOCAL) {
+        return Status::OK();
+    }
+
+    // For the local file writer, file_path is a local directory.
+    // The path is specified arbitrarily by the user and Doris does not
+    // guarantee its correctness; this simple check only prevents overwriting
+    // an existing file.
+    if (!FileUtils::check_exist(*file_name)) {
+        return Status::OK();
+    }
+    return Status::InternalError("File already exists: " + *file_name +
+                                 ". Host: " + BackendOptions::get_localhost());
+}
+
+// Resolves the name of the next data file and creates the file writer for it.
+// Returns the name-resolution error, if any, otherwise the creation status.
+Status VFileResultWriter::_create_next_file_writer() {
+    std::string next_file;
+    RETURN_IF_ERROR(_get_next_file_name(&next_file));
+    Status create_status = _create_file_writer(next_file);
+    return create_status;
+}
+
+// Creates the file writer that matches _storage_type for `file_name`, opens it
+// and then checks that the requested file format can be written.
+//
+// Returns:
+//   - InternalError when _storage_type is none of LOCAL, BROKER, S3 or HDFS,
+//   - the error of HdfsReaderWriter::create_writer() or of open(),
+//   - NotSupported for Parquet and InternalError for any other non-CSV format
+//     (the writer has already been opened when the format is rejected),
+//   - OK otherwise.
+Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
+    } else if (_storage_type == TStorageBackendType::BROKER) {
+        _file_writer =
+                new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses,
+                                 _file_opts->broker_properties, file_name, 0 /*start offset*/);
+    } else if (_storage_type == TStorageBackendType::S3) {
+        _file_writer = new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */);
+    } else if (_storage_type == TStorageBackendType::HDFS) {
+        // NOTE(review): the const_cast suggests create_writer() takes a
+        // non-const map reference - confirm against its declaration.
+        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+                const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties),
+                file_name, &_file_writer));
+    } else {
+        // No writer is assigned on this path, so the open() call below would
+        // run on whatever _file_writer held before. Fail explicitly instead.
+        return Status::InternalError(
+                strings::Substitute("unsupported storage type: $0", _storage_type));
+    }
+    RETURN_IF_ERROR(_file_writer->open());
+
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        // just use file writer is enough
+        break;
+    case TFileFormatType::FORMAT_PARQUET:
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    default:
+        return Status::InternalError(
+                strings::Substitute("unsupported file format: $0", _file_opts->file_format));
+    }
+    LOG(INFO) << "create file for exporting query result. file name: " << file_name
+              << ". query id: " << print_id(_state->query_id())
+              << " format:" << _file_opts->file_format;
+    return Status::OK();
+}
+
+// Builds the name of the next data file and stores it in *file_name.
+// file name format as: my_prefix_{fragment_instance_id}_0.csv
+//
+// Side effects: increments _file_idx and resets _header_sent to false, both
+// before the existence check, so they also happen when an error is returned.
+// For the LOCAL backend an InternalError is returned if the file already exists.
+Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
+    std::stringstream name_builder;
+    name_builder << _file_opts->file_path << print_id(_fragment_instance_id);
+    name_builder << "_" << (_file_idx++) << "." << _file_format_to_name();
+    *file_name = name_builder.str();
+    _header_sent = false;
+
+    // Only local paths are checked for existence.
+    if (_storage_type != TStorageBackendType::LOCAL) {
+        return Status::OK();
+    }
+
+    // For the local file writer, file_path is a local directory.
+    // The path is specified arbitrarily by the user and Doris does not
+    // guarantee its correctness; this simple check only prevents overwriting
+    // an existing file.
+    if (!FileUtils::check_exist(*file_name)) {
+        return Status::OK();
+    }
+    return Status::InternalError("File already exists: " + *file_name +
+                                 ". Host: " + BackendOptions::get_localhost());
+}
+
+// Builds the URL prefix shared by the files of this fragment instance and
+// stores it in *file_url. Always returns OK.
+// file url format as:
+// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_
+// S3: {file_path}{fragment_instance_id}_
+// BROKER: {file_path}{fragment_instance_id}_
+Status VFileResultWriter::_get_file_url(std::string* file_url) {
+    std::stringstream url_builder;
+    const bool is_local = (_storage_type == TStorageBackendType::LOCAL);
+    if (is_local) {
+        // Only local files carry the scheme and the host of this backend.
+        url_builder << "file:///" << BackendOptions::get_localhost();
+    }
+    url_builder << _file_opts->file_path << print_id(_fragment_instance_id) << "_";
+    *file_url = url_builder.str();
+    return Status::OK();
+}
+
+// Maps the configured file format to the name used as file extension:
+// "csv" for plain CSV, "parquet" for Parquet and "unknown" for anything else.
+std::string VFileResultWriter::_file_format_to_name() {
+    const auto format = _file_opts->file_format;
+    if (format == TFileFormatType::FORMAT_CSV_PLAIN) {
+        return "csv";
+    }
+    if (format == TFileFormatType::FORMAT_PARQUET) {
+        return "parquet";
+    }
+    return "unknown";
+}
+
+// Appends all rows of `block` to the current output file.
+// An empty block is a no-op. Otherwise the CSV header is written first (if it
+// is still pending), then the rows are written as CSV and _written_rows is
+// increased by the number of rows in the block.
+// Returns NotSupported if a Parquet writer is set, or the first error hit.
+Status VFileResultWriter::append_block(Block& block) {
+    const size_t num_rows = block.rows();
+    if (num_rows == 0) {
+        return Status::OK();
+    }
+
+    // The header is written before the append timer starts.
+    RETURN_IF_ERROR(write_csv_header());
+    SCOPED_TIMER(_append_row_batch_timer);
+
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    }
+    RETURN_IF_ERROR(_write_csv_file(block));
+
+    _written_rows += num_rows;
+    return Status::OK();
+}
+
+// Serializes every row of `block` as one CSV line into _plain_text_outstream
+// and then flushes the stream through _flush_plain_text_outstream(true).
+//
+// Cells are separated by _file_opts->column_separator and every row ends with
+// _file_opts->line_delimiter. A null cell, or a cell whose type has no case
+// below, is written as NULL_IN_CSV. The type of column N is taken from
+// _output_expr_ctxs[N].
+//
+// Returns the status of the final flush.
+Status VFileResultWriter::_write_csv_file(const Block& block) {
+    // The block is const, so its dimensions can be read once up front.
+    const size_t num_rows = block.rows();
+    const size_t num_columns = block.columns();
+
+    for (size_t i = 0; i < num_rows; ++i) {
+        for (size_t col_id = 0; col_id < num_columns; ++col_id) {
+            // Bind by const reference: taking the entry by value would copy it
+            // once per cell for no benefit.
+            const auto& col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                // get_data_at() is deliberately called inside the individual
+                // cases only, so it is never invoked for unhandled types.
+                switch (_output_expr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    // Widen to int so the value is printed as a number, not a char.
+                    _plain_text_outstream << static_cast<int>(
+                            *reinterpret_cast<const int8_t*>(col.column->get_data_at(i).data));
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_FLOAT: {
+                    // Converted through a char buffer for the same reason as
+                    // TYPE_DOUBLE below.
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const float*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // To prevent loss of precision on float and double types,
+                    // they are converted to strings before output.
+                    // For example: For a double value 27361919854.929001,
+                    // the direct output of using std::stringstream is 2.73619e+10,
+                    // and after conversion to a string, it outputs 27361919854.929001
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const double*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const VecDateTimeValue* time_val = reinterpret_cast<const VecDateTimeValue*>(
+                            col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL:
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    // These types are streamed as the data reference returned
+                    // by get_data_at().
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
+                                    ->value);
+                    const int output_scale = _output_expr_ctxs[col_id]->root()->output_scale();
+                    _plain_text_outstream << decimal_val.to_string(output_scale);
+                    break;
+                }
+                default: {
+                    // Types without a case above are exported as NULL.
+                    _plain_text_outstream << NULL_IN_CSV;
+                }
+                }
+            }
+            // Separator between cells, but not after the last one.
+            if (col_id + 1 < num_columns) {
+                _plain_text_outstream << _file_opts->column_separator;
+            }
+        }
+        _plain_text_outstream << _file_opts->line_delimiter;
+    }
+
+    return _flush_plain_text_outstream(true);
+}
+
+// Builds the header line holding the type name of every output column.
+// Names are joined with the column separator and the line ends with the line
+// delimiter; with no output columns the result is just the line delimiter.
+std::string VFileResultWriter::gen_types() {
+    std::string type_line;
+    const size_t num_columns = _output_expr_ctxs.size();
+    for (size_t idx = 0; idx < num_columns; ++idx) {
+        // A separator goes in front of every entry except the first one.
+        if (idx > 0) {
+            type_line += _file_opts->column_separator;
+        }
+        type_line += type_to_string(_output_expr_ctxs[idx]->root()->type().type);
+    }
+    type_line += _file_opts->line_delimiter;
+    return type_line;
+}
+
+// Writes the CSV header to the current file if it has not been sent yet and a
+// header is configured. For CSV_WITH_NAMES_AND_TYPES the type line produced by
+// gen_types() is appended to the header first.
+// _header_sent is set only after the write succeeded.
+Status VFileResultWriter::write_csv_header() {
+    // Nothing to do: header already written, or no header configured.
+    if (_header_sent || _header.size() == 0) {
+        return Status::OK();
+    }
+
+    std::string full_header = _header;
+    if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+        full_header += gen_types();
+    }
+
+    size_t bytes_written = 0;
+    const auto* header_bytes = reinterpret_cast<const uint8_t*>(full_header.c_str());
+    RETURN_IF_ERROR(_file_writer->write(header_bytes, full_header.size(), &bytes_written));
+    _header_sent = true;
+    return Status::OK();
+}
+
+// Writes the buffered CSV text to the file writer and resets the buffer.
+//
+// Nothing is written when the buffer is empty, or when `eos` is false and
+// fewer than OUTSTREAM_BUFFER_SIZE_BYTES bytes are buffered. After a write the
+// byte counters are updated and _create_new_file_if_exceed_size() is called;
+// its status is the return value.
+Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
+    SCOPED_TIMER(_file_write_timer);
+
+    const size_t buffered_bytes = _plain_text_outstream.tellp();
+    if (buffered_bytes == 0) {
+        return Status::OK();
+    }
+    if (!eos && buffered_bytes < OUTSTREAM_BUFFER_SIZE_BYTES) {
+        // Keep buffering until the threshold is reached.
+        return Status::OK();
+    }
+
+    const std::string pending_text = _plain_text_outstream.str();
+    size_t bytes_written = 0;
+    const auto* raw_bytes = reinterpret_cast<const uint8_t*>(pending_text.c_str());
+    RETURN_IF_ERROR(_file_writer->write(raw_bytes, pending_text.size(), &bytes_written));
+    COUNTER_UPDATE(_written_data_bytes, bytes_written);
+    _current_written_bytes += bytes_written;
+
+    // Empty the stream's content and reset its state flags.
+    _plain_text_outstream.str("");
+    _plain_text_outstream.clear();
+
+    // Split the file if the size limit has been exceeded.
+    return _create_new_file_if_exceed_size();
+}
+
+// Closes the current file once the bytes written to it reach
+// _file_opts->max_file_size_bytes, then resets the per-file byte counter.
+// Below the limit this is a no-op that returns OK.
+Status VFileResultWriter::_create_new_file_if_exceed_size() {
+    const bool below_limit = _current_written_bytes < _file_opts->max_file_size_bytes;
+    if (below_limit) {
+        return Status::OK();
+    }
+
+    // The current file has reached the maximum size: close it with done == false.
+    // NOTE(review): presumably _close_file_writer(false) also opens the next
+    // file - confirm against its definition.
+    {
+        SCOPED_TIMER(_writer_close_timer);
+        RETURN_IF_ERROR(_close_file_writer(false));
+    }
+    _current_written_bytes = 0;
+    return Status::OK();
+}
+
+Status VFileResultWriter::_close_file_writer(bool done, bool only_close) {

Review Comment:
   Maybe we should split this method into several smaller methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to