yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893223618

@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//   http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "vec/runtime/vfile_result_writer.h"
+#include "common/consts.h"
+#include "exec/broker_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/raw_value.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/common/arena.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+namespace doris::vectorized {
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
+using doris::operator<<;
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type 
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& 
+                                     RuntimeProfile* parent_profile, 
BufferControlBlock* sinker,
+                                     Block* output_block, bool 
+                                     const RowDescriptor& 
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    _output_object_data = output_object_data;
+VFileResultWriter::~VFileResultWriter() {
+    _close_file_writer(true);
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    return _create_next_file_writer();
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* profile = 
_parent_profile->create_child("VFileResultWriter", true, true);
+    _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", 
+    _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", 
+    _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime");
+    _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", 
+    _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", 
+Status VFileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    return _close_file_writer(true, true);
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << _file_opts->success_file_name;
+    *file_name = ss.str();
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the 
file exists.
+        // Because the file path is currently arbitrarily specified by the 
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + 
+        }
+    }
+    return Status::OK();
+Status VFileResultWriter::_create_next_file_writer() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_next_file_name(&file_name));
+    return _create_file_writer(file_name);
+Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
+    } else if (_storage_type == TStorageBackendType::BROKER) {
+        _file_writer =
+                new BrokerWriter(_state->exec_env(), 
+                                 _file_opts->broker_properties, file_name, 0 
/*start offset*/);
+    } else if (_storage_type == TStorageBackendType::S3) {
+        _file_writer = new S3Writer(_file_opts->broker_properties, file_name, 
0 /* offset */);
+    } else if (_storage_type == TStorageBackendType::HDFS) {
+        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+                const_cast<std::map<std::string, 
+                file_name, &_file_writer));
+    }
+    RETURN_IF_ERROR(_file_writer->open());
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        // just use file writer is enough
+        break;
+    case TFileFormatType::FORMAT_PARQUET:
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+        break;
+    default:
+        return Status::InternalError(
+                strings::Substitute("unsupported file format: $0", 
+    }
+    LOG(INFO) << "create file for exporting query result. file name: " << 
+              << ". query id: " << print_id(_state->query_id())
+              << " format:" << _file_opts->file_format;
+    return Status::OK();
+// File name format: {file_path}{fragment_instance_id}_{file_idx}.{format},
+// e.g. my_prefix_{fragment_instance_id}_0.csv
+Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << 
(_file_idx++) << "."
+       << _file_format_to_name();
+    *file_name = ss.str();
+    _header_sent = false;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the 
file exists.
+        // Because the file path is currently arbitrarily specified by the 
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + 
+        }
+    }
+    return Status::OK();
+// file url format as:
+// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_
+// S3: {file_path}{fragment_instance_id}_
+// BROKER: {file_path}{fragment_instance_id}_
+Status VFileResultWriter::_get_file_url(std::string* file_url) {
+    std::stringstream ss;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        ss << "file:///" << BackendOptions::get_localhost();
+    }
+    ss << _file_opts->file_path;
+    ss << print_id(_fragment_instance_id) << "_";
+    *file_url = ss.str();
+    return Status::OK();
+std::string VFileResultWriter::_file_format_to_name() {
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        return "csv";
+    case TFileFormatType::FORMAT_PARQUET:
+        return "parquet";
+    default:
+        return "unknown";
+    }
+Status VFileResultWriter::append_block(Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(write_csv_header());
+    SCOPED_TIMER(_append_row_batch_timer);
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    } else {
+        RETURN_IF_ERROR(_write_csv_file(block));
+    }
+    _written_rows += block.rows();
+    return Status::OK();
+Status VFileResultWriter::_write_csv_file(const Block& block) {
+    for (size_t i = 0; i < block.rows(); i++) {
+        for (size_t col_id = 0; col_id < block.columns(); col_id++) {
+            auto col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                switch (_output_expr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    _plain_text_outstream << (int)*reinterpret_cast<const 
+                            col.column->get_data_at(i).data);
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const 
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const 
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const 
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const 
+                    break;
+                case TYPE_FLOAT: {
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const 
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, 
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" 
<< float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // To prevent loss of precision on float and double types,
+                    // they are converted to strings before output.
+                    // For example: For a double value 27361919854.929001,
+                    // the direct output of using std::stringstream is 
+                    // and after conversion to a string, it outputs 
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const 
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, 
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" 
<< double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const VecDateTimeValue* time_val =
+                            (const 
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL:
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const 
+                                    ->value);
+                    std::string decimal_str;
+                    int output_scale = 
+                    decimal_str = decimal_val.to_string(output_scale);
+                    _plain_text_outstream << decimal_str;
+                    break;
+                }
+                default: {
+                    // not supported type, like BITMAP, HLL, just export null
+                    _plain_text_outstream << NULL_IN_CSV;
+                }
+                }
+            }
+            if (col_id < block.columns() - 1) {
+                _plain_text_outstream << _file_opts->column_separator;
+            }
+        }
+        _plain_text_outstream << _file_opts->line_delimiter;
+    }
+    return _flush_plain_text_outstream(true);
+std::string VFileResultWriter::gen_types() {
+    std::string types = "";
+    int num_columns = _output_expr_ctxs.size();
+    for (int i = 0; i < num_columns; ++i) {
+        types += type_to_string(_output_expr_ctxs[i]->root()->type().type);
+        if (i < num_columns - 1) {
+            types += _file_opts->column_separator;
+        }
+    }
+    types += _file_opts->line_delimiter;
+    return types;
+Status VFileResultWriter::write_csv_header() {
+    if (!_header_sent && _header.size() > 0) {
+        std::string tmp_header = _header;
+        if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+            tmp_header += gen_types();
+        }
+        size_t written_len = 0;
+        RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const 
+                                            tmp_header.size(), &written_len));
+        _header_sent = true;
+    }
+    return Status::OK();
+Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
+    SCOPED_TIMER(_file_write_timer);
+    size_t pos = _plain_text_outstream.tellp();
+    if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) {
+        return Status::OK();
+    }
+    const std::string& buf = _plain_text_outstream.str();
+    size_t written_len = 0;
+    RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const 
uint8_t*>(buf.c_str()), buf.size(),
+                                        &written_len));
+    COUNTER_UPDATE(_written_data_bytes, written_len);
+    _current_written_bytes += written_len;
+    // clear the stream
+    _plain_text_outstream.str("");
+    _plain_text_outstream.clear();
+    // split file if exceed limit
+    return _create_new_file_if_exceed_size();
+Status VFileResultWriter::_create_new_file_if_exceed_size() {
+    if (_current_written_bytes < _file_opts->max_file_size_bytes) {
+        return Status::OK();
+    }
+    // current file size exceed the max file size. close this file
+    // and create new one
+    {
+        SCOPED_TIMER(_writer_close_timer);
+        RETURN_IF_ERROR(_close_file_writer(false));
+    }
+    _current_written_bytes = 0;
+    return Status::OK();
+Status VFileResultWriter::_close_file_writer(bool done, bool only_close) {

Review Comment:
   Maybe we should split this method into several smaller methods.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to