This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 94089b9192 [Refactor] Use file factory to replace create file reader/writer (#9505) 94089b9192 is described below commit 94089b919211403de7e6d8e664715d44ab4fd4f3 Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed Jun 8 15:07:39 2022 +0800 [Refactor] Use file factory to replace create file reader/writer (#9505) 1. Simplify code logic and improve abstraction 2. Fix the mem leak of raw pointer Co-authored-by: lihaopeng <lihaop...@baidu.com> --- be/src/exec/CMakeLists.txt | 22 ++-- be/src/exec/arrow/arrow_reader.cpp | 2 +- be/src/exec/arrow/orc_reader.cpp | 2 +- be/src/exec/arrow/parquet_reader.cpp | 6 +- be/src/exec/broker_scanner.cpp | 94 +++------------ be/src/exec/broker_scanner.h | 4 +- be/src/exec/json_scanner.cpp | 73 ++---------- be/src/exec/json_scanner.h | 5 +- be/src/exec/orc_scanner.cpp | 54 ++------- be/src/exec/parquet_scanner.cpp | 49 ++------ be/src/exec/parquet_writer.cpp | 11 +- be/src/exec/plain_binary_line_reader.cpp | 2 +- be/src/exec/plain_text_line_reader.cpp | 2 +- be/src/io/CMakeLists.txt | 46 +++++++ be/src/{exec => io}/broker_reader.cpp | 2 +- be/src/{exec => io}/broker_reader.h | 2 +- be/src/{exec => io}/broker_writer.cpp | 2 +- be/src/{exec => io}/broker_writer.h | 2 +- be/src/{exec => io}/buffered_reader.cpp | 2 +- be/src/{exec => io}/buffered_reader.h | 2 +- be/src/io/file_factory.cpp | 132 +++++++++++++++++++++ be/src/io/file_factory.h | 77 ++++++++++++ be/src/{exec => io}/file_reader.h | 0 be/src/{exec => io}/file_writer.h | 0 be/src/{exec => io}/hdfs_file_reader.cpp | 2 +- be/src/{exec => io}/hdfs_file_reader.h | 2 +- be/src/{exec => io}/hdfs_reader_writer.cpp | 11 +- be/src/{exec => io}/hdfs_reader_writer.h | 6 +- be/src/{exec => io}/hdfs_writer.cpp | 2 +- be/src/{exec => io}/hdfs_writer.h | 2 +- be/src/{exec => io}/local_file_reader.cpp | 2 +- be/src/{exec => io}/local_file_reader.h | 2 +- be/src/{exec => io}/local_file_writer.cpp | 20 +++- be/src/{exec => io}/local_file_writer.h | 7 +- be/src/{exec => io}/s3_reader.cpp | 2 +- be/src/{exec => io}/s3_reader.h | 2 +- be/src/{exec => io}/s3_writer.cpp | 2 +- be/src/{exec => io}/s3_writer.h | 2 +- be/src/runtime/export_sink.cpp | 53 ++------- be/src/runtime/file_result_writer.cpp | 40 ++----- be/src/runtime/file_result_writer.h | 2 +- be/src/runtime/routine_load/kafka_consumer_pipe.h | 2 +- be/src/runtime/stream_load/stream_load_pipe.h | 2 +- be/src/util/broker_load_error_hub.cpp | 2 +- be/src/util/broker_storage_backend.cpp | 4 +- be/src/vec/exec/varrow_scanner.cpp | 44 +------ be/src/vec/exec/vjson_scanner.cpp | 2 +- be/test/exec/broker_reader_test.cpp | 2 +- be/test/exec/broker_scan_node_test.cpp | 2 +- be/test/exec/broker_scanner_test.cpp | 2 +- be/test/exec/buffered_reader_test.cpp | 4 +- be/test/exec/hdfs_file_reader_test.cpp | 4 +- be/test/exec/json_scanner_test.cpp | 2 +- be/test/exec/json_scanner_with_jsonpath_test.cpp | 2 +- be/test/exec/multi_bytes_separator_test.cpp | 2 +- be/test/exec/orc_scanner_test.cpp | 2 +- be/test/exec/parquet_scanner_test.cpp | 2 +- be/test/exec/plain_text_line_reader_bzip_test.cpp | 2 +- be/test/exec/plain_text_line_reader_gzip_test.cpp | 2 +- .../exec/plain_text_line_reader_lz4frame_test.cpp | 2 +- be/test/exec/plain_text_line_reader_lzop_test.cpp | 2 +- .../plain_text_line_reader_uncompressed_test.cpp | 2 +- be/test/exec/s3_reader_test.cpp | 4 +- be/test/vec/exec/vbroker_scan_node_test.cpp | 2 +- be/test/vec/exec/vbroker_scanner_test.cpp | 2 +- be/test/vec/exec/vjson_scanner_test.cpp | 2 +- be/test/vec/exec/vorc_scanner_test.cpp | 2 +- be/test/vec/exec/vparquet_scanner_test.cpp | 2 +- 68 files changed, 427 insertions(+), 429 deletions(-) diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 7ce4c4caf3..5708eb0465 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -28,8 +28,7 @@ set(EXEC_FILES analytic_eval_node.cpp blocking_join_node.cpp broker_scan_node.cpp - broker_reader.cpp - buffered_reader.cpp + ../io/buffered_reader.cpp base_scanner.cpp broker_scanner.cpp cross_join_node.cpp @@ -42,7 +41,6 @@ set(EXEC_FILES exchange_node.cpp hash_join_node.cpp hash_table.cpp - local_file_reader.cpp merge_node.cpp scan_node.cpp select_node.cpp @@ -93,24 +91,28 @@ set(EXEC_FILES partitioned_hash_table.cc partitioned_aggregation_node.cc odbc_scan_node.cpp - local_file_writer.cpp - broker_writer.cpp parquet_scanner.cpp parquet_writer.cpp orc_scanner.cpp odbc_connector.cpp json_scanner.cpp assert_num_rows_node.cpp - s3_reader.cpp - s3_writer.cpp - hdfs_reader_writer.cpp + + ../io/local_file_reader.cpp + ../io/local_file_writer.cpp + ../io/broker_reader.cpp + ../io/broker_writer.cpp + ../io/s3_reader.cpp + ../io/s3_writer.cpp + ../io/hdfs_reader_writer.cpp + ../io/file_factory.cpp ) if (ARCH_AMD64) set(EXEC_FILES ${EXEC_FILES} - hdfs_file_reader.cpp - hdfs_writer.cpp + ../io/hdfs_file_reader.cpp + ../io/hdfs_writer.cpp ) endif() diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp index 789c66be2c..94289a990b 100644 --- a/be/src/exec/arrow/arrow_reader.cpp +++ b/be/src/exec/arrow/arrow_reader.cpp @@ -21,9 +21,9 @@ #include <time.h> #include "common/logging.h" -#include "exec/file_reader.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/TPaloBrokerService.h" +#include "io/file_reader.h" #include "runtime/broker_mgr.h" #include "runtime/client_cache.h" #include "runtime/descriptors.h" diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp index 5815f008df..536b852ad6 100644 --- a/be/src/exec/arrow/orc_reader.cpp +++ b/be/src/exec/arrow/orc_reader.cpp @@ -21,7 +21,7 @@ #include <time.h> #include "common/logging.h" -#include "exec/file_reader.h" +#include "io/file_reader.h" #include "runtime/mem_pool.h" #include "runtime/tuple.h" diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp index 330d983c36..dad3e9e5e8 100644 --- a/be/src/exec/arrow/parquet_reader.cpp +++ b/be/src/exec/arrow/parquet_reader.cpp @@ -27,7 +27,11 @@ #include "common/logging.h" #include "common/status.h" -#include "exec/file_reader.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "io/file_reader.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" #include "runtime/descriptors.h" #include "runtime/mem_pool.h" #include "runtime/string_value.h" diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 2c21fd3d54..8942c4fd67 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -23,22 +23,20 @@ #include <sstream> #include "common/consts.h" -#include "exec/broker_reader.h" -#include "exec/buffered_reader.h" #include "exec/decompressor.h" #include "exec/exec_node.h" -#include "exec/hdfs_reader_writer.h" -#include "exec/local_file_reader.h" #include "exec/plain_binary_line_reader.h" #include "exec/plain_text_line_reader.h" -#include "exec/s3_reader.h" #include "exprs/expr.h" +#include "io/buffered_reader.h" +#include "io/file_factory.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/raw_value.h" #include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/stream_load_pipe.h" #include "runtime/tuple.h" +#include "util/string_util.h" #include "util/utf8_check.h" namespace doris { @@ -109,11 +107,8 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo break; // break always } } - if (_scanner_eof) { - *eof = true; - } else { - *eof = false; - } + + *eof = _scanner_eof; return Status::OK(); } @@ -131,16 +126,6 @@ Status BrokerScanner::open_next_reader() { } Status BrokerScanner::open_file_reader() { - if (_cur_file_reader != nullptr) { - if (_stream_load_pipe != nullptr) { - _stream_load_pipe.reset(); - _cur_file_reader = nullptr; - } else { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } - } - const TBrokerRangeDesc& range = _ranges[_next_range]; int64_t start_offset = range.start_offset; if (start_offset != 0) { @@ -155,53 +140,11 @@ Status BrokerScanner::open_file_reader() { _skip_lines = 2; } } - switch (range.file_type) { - case TFileType::FILE_LOCAL: { - LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); - RETURN_IF_ERROR(file_reader->open()); - _cur_file_reader = file_reader; - break; - } - case TFileType::FILE_HDFS: { - FileReader* hdfs_file_reader; - RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, start_offset, - &hdfs_file_reader)); - BufferedReader* file_reader = new BufferedReader(_profile, hdfs_file_reader); - RETURN_IF_ERROR(file_reader->open()); - _cur_file_reader = file_reader; - break; - } - case TFileType::FILE_BROKER: { - BrokerReader* broker_reader = - new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, - range.path, start_offset); - RETURN_IF_ERROR(broker_reader->open()); - _cur_file_reader = broker_reader; - break; - } - case TFileType::FILE_S3: { - BufferedReader* s3_reader = new BufferedReader( - _profile, new S3Reader(_params.properties, range.path, start_offset)); - RETURN_IF_ERROR(s3_reader->open()); - _cur_file_reader = s3_reader; - break; - } - case TFileType::FILE_STREAM: { - _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); - if (_stream_load_pipe == nullptr) { - VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id); - return Status::InternalError("unknown stream load id"); - } - _cur_file_reader = _stream_load_pipe.get(); - break; - } - default: { - std::stringstream ss; - ss << "Unknown file type, type=" << range.file_type; - return Status::InternalError(ss.str()); - } - } - return Status::OK(); + + RETURN_IF_ERROR(FileFactory::create_file_reader(range.file_type, _state->exec_env(), _profile, + _broker_addresses, _params.properties, range, + start_offset, _cur_file_reader)); + return _cur_file_reader->open(); } Status BrokerScanner::create_decompressor(TFileFormatType::type type) { @@ -280,11 +223,12 @@ Status BrokerScanner::open_line_reader() { case TFileFormatType::FORMAT_CSV_LZ4FRAME: case TFileFormatType::FORMAT_CSV_LZOP: case TFileFormatType::FORMAT_CSV_DEFLATE: - _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor, - size, _line_delimiter, _line_delimiter_length); + _cur_line_reader = + new PlainTextLineReader(_profile, _cur_file_reader.get(), _cur_decompressor, size, + _line_delimiter, _line_delimiter_length); break; case TFileFormatType::FORMAT_PROTO: - _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader); + _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader.get()); break; default: { std::stringstream ss; @@ -309,16 +253,6 @@ void BrokerScanner::close() { delete _cur_line_reader; _cur_line_reader = nullptr; } - - if (_cur_file_reader != nullptr) { - if (_stream_load_pipe != nullptr) { - _stream_load_pipe.reset(); - _cur_file_reader = nullptr; - } else { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } - } } void BrokerScanner::split_line(const Slice& line) { diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index f10ce68518..379b1630cd 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -107,7 +107,7 @@ protected: int _line_delimiter_length; // Reader - FileReader* _cur_file_reader; + std::shared_ptr<FileReader> _cur_file_reader; LineReader* _cur_line_reader; Decompressor* _cur_decompressor; bool _cur_line_reader_eof; @@ -117,8 +117,6 @@ protected: // When we fetch range doesn't start from 0 will always skip the first line int _skip_lines; - // used to hold current StreamLoadPipe - std::shared_ptr<StreamLoadPipe> _stream_load_pipe; std::vector<Slice> _split_values; }; diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 0be3d4c089..5e7b8ba309 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -22,14 +22,12 @@ #include <algorithm> #include "env/env.h" -#include "exec/broker_reader.h" -#include "exec/buffered_reader.h" -#include "exec/local_file_reader.h" #include "exec/plain_text_line_reader.h" -#include "exec/s3_reader.h" #include "exprs/expr.h" #include "exprs/json_functions.h" #include "gutil/strings/split.h" +#include "io/buffered_reader.h" +#include "io/file_factory.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -67,7 +65,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* SCOPED_TIMER(_read_timer); // Get one line while (!_scanner_eof) { - if (_cur_file_reader == nullptr || _cur_reader_eof) { + if (!_cur_file_reader || _cur_reader_eof) { RETURN_IF_ERROR(open_next_reader()); // If there isn't any more reader, break this if (_scanner_eof) { @@ -124,16 +122,6 @@ Status JsonScanner::open_based_reader() { } Status JsonScanner::open_file_reader() { - if (_cur_file_reader != nullptr) { - if (_stream_load_pipe != nullptr) { - _stream_load_pipe.reset(); - _cur_file_reader = nullptr; - } else { - delete _cur_file_reader; - _cur_file_reader = nullptr; - } - } - const TBrokerRangeDesc& range = _ranges[_next_range]; int64_t start_offset = range.start_offset; if (start_offset != 0) { @@ -142,45 +130,12 @@ Status JsonScanner::open_file_reader() { if (range.__isset.read_json_by_line) { _read_json_by_line = range.read_json_by_line; } - switch (range.file_type) { - case TFileType::FILE_LOCAL: { - LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset); - RETURN_IF_ERROR(file_reader->open()); - _cur_file_reader = file_reader; - break; - } - case TFileType::FILE_BROKER: { - BrokerReader* broker_reader = - new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, - range.path, start_offset); - RETURN_IF_ERROR(broker_reader->open()); - _cur_file_reader = broker_reader; - break; - } - case TFileType::FILE_S3: { - BufferedReader* s3_reader = new BufferedReader( - _profile, new S3Reader(_params.properties, range.path, start_offset)); - RETURN_IF_ERROR(s3_reader->open()); - _cur_file_reader = s3_reader; - break; - } - case TFileType::FILE_STREAM: { - _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id); - if (_stream_load_pipe == nullptr) { - VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id); - return Status::InternalError("unknown stream load id"); - } - _cur_file_reader = _stream_load_pipe.get(); - break; - } - default: { - std::stringstream ss; - ss << "Unknown file type, type=" << range.file_type; - return Status::InternalError(ss.str()); - } - } + + RETURN_IF_ERROR(FileFactory::create_file_reader(range.file_type, _state->exec_env(), _profile, + _broker_addresses, _params.properties, range, + start_offset, _cur_file_reader)); _cur_reader_eof = false; - return Status::OK(); + return _cur_file_reader->open(); } Status JsonScanner::open_line_reader() { @@ -197,7 +152,7 @@ Status JsonScanner::open_line_reader() { } else { _skip_next_line = false; } - _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, nullptr, size, + _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size, _line_delimiter, _line_delimiter_length); _cur_reader_eof = false; return Status::OK(); @@ -224,7 +179,7 @@ Status JsonScanner::open_json_reader() { } else { _cur_json_reader = new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, _cur_file_reader); + fuzzy_parse, &_scanner_eof, _cur_file_reader.get()); } RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root)); @@ -264,14 +219,6 @@ void JsonScanner::close() { delete _cur_line_reader; _cur_line_reader = nullptr; } - if (_cur_file_reader != nullptr) { - if (_stream_load_pipe != nullptr) { - _stream_load_pipe.reset(); - } else { - delete _cur_file_reader; - } - _cur_file_reader = nullptr; - } } ////// class JsonDataInternal diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index 5dc61c18cb..981e242d96 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -89,7 +89,7 @@ protected: int _line_delimiter_length; // Reader - FileReader* _cur_file_reader; + std::shared_ptr<FileReader> _cur_file_reader; LineReader* _cur_line_reader; JsonReader* _cur_json_reader; bool _cur_reader_eof; @@ -98,9 +98,6 @@ protected: // When we fetch range doesn't start from 0, // we will read to one ahead, and skip the first line bool _skip_next_line; - - // used to hold current StreamLoadPipe - std::shared_ptr<StreamLoadPipe> _stream_load_pipe; }; class JsonDataInternal { diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 138eb729ef..c20aef7d05 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -17,21 +17,14 @@ #include "exec/orc_scanner.h" -#include "exec/broker_reader.h" -#include "exec/buffered_reader.h" -#include "exec/local_file_reader.h" -#include "exec/s3_reader.h" -#include "exprs/expr.h" -#include "runtime/descriptors.h" +#include "io/buffered_reader.h" +#include "io/file_factory.h" +#include "io/local_file_reader.h" #include "runtime/exec_env.h" #include "runtime/raw_value.h" #include "runtime/runtime_state.h" #include "runtime/tuple.h" -#if defined(__x86_64__) -#include "exec/hdfs_file_reader.h" -#endif - // orc include file didn't expose orc::TimezoneError // we have to declare it by hand, following is the source code in orc link // https://github.com/apache/orc/blob/84353fbfc447b06e0924024a8e03c1aaebd3e7a5/c%2B%2B/src/Timezone.hh#L104-L109 @@ -387,44 +380,11 @@ Status ORCScanner::open_next_reader() { } const TBrokerRangeDesc& range = _ranges[_next_range++]; std::unique_ptr<FileReader> file_reader; - switch (range.file_type) { - case TFileType::FILE_LOCAL: { - file_reader.reset(new LocalFileReader(range.path, range.start_offset)); - break; - } - case TFileType::FILE_BROKER: { - int64_t file_size = 0; - // for compatibility - if (range.__isset.file_size) { - file_size = range.file_size; - } - file_reader.reset(new BufferedReader( - _profile, - new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, - range.path, range.start_offset, file_size))); - break; - } - case TFileType::FILE_S3: { - file_reader.reset(new BufferedReader( - _profile, new S3Reader(_params.properties, range.path, range.start_offset))); - break; - } - case TFileType::FILE_HDFS: { -#if defined(__x86_64__) - file_reader.reset( - new HdfsFileReader(range.hdfs_params, range.path, range.start_offset)); - break; -#else - return Status::InternalError("HdfsFileReader do not support on non x86 platform"); -#endif - } - default: { - std::stringstream ss; - ss << "Unknown file type, type=" << range.file_type; - return Status::InternalError(ss.str()); - } - } + RETURN_IF_ERROR(FileFactory::create_file_reader( + range.file_type, _state->exec_env(), _profile, _broker_addresses, + _params.properties, range, range.start_offset, file_reader)); RETURN_IF_ERROR(file_reader->open()); + if (file_reader->size() == 0) { file_reader->close(); continue; diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 880313b6f0..a47e965de5 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -18,13 +18,13 @@ #include "exec/parquet_scanner.h" #include "exec/arrow/parquet_reader.h" -#include "exec/broker_reader.h" -#include "exec/buffered_reader.h" #include "exec/decompressor.h" -#include "exec/hdfs_reader_writer.h" -#include "exec/local_file_reader.h" -#include "exec/s3_reader.h" #include "exec/text_converter.h" +#include "exec/text_converter.hpp" +#include "exprs/expr.h" +#include "io/buffered_reader.h" +#include "io/file_factory.h" +#include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/raw_value.h" #include "runtime/stream_load/load_stream_mgr.h" @@ -101,42 +101,11 @@ Status ParquetScanner::open_next_reader() { } const TBrokerRangeDesc& range = _ranges[_next_range++]; std::unique_ptr<FileReader> file_reader; - switch (range.file_type) { - case TFileType::FILE_LOCAL: { - file_reader.reset(new LocalFileReader(range.path, range.start_offset)); - break; - } - case TFileType::FILE_HDFS: { - FileReader* reader; - RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, - range.start_offset, &reader)); - file_reader.reset(reader); - break; - } - case TFileType::FILE_BROKER: { - int64_t file_size = 0; - // for compatibility - if (range.__isset.file_size) { - file_size = range.file_size; - } - file_reader.reset(new BufferedReader( - _profile, - new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, - range.path, range.start_offset, file_size))); - break; - } - case TFileType::FILE_S3: { - file_reader.reset(new BufferedReader( - _profile, new S3Reader(_params.properties, range.path, range.start_offset))); - break; - } - default: { - std::stringstream ss; - ss << "Unknown file type, type=" << range.file_type; - return Status::InternalError(ss.str()); - } - } + RETURN_IF_ERROR(FileFactory::create_file_reader( + range.file_type, _state->exec_env(), _profile, _broker_addresses, + _params.properties, range, range.start_offset, file_reader)); RETURN_IF_ERROR(file_reader->open()); + if (file_reader->size() == 0) { file_reader->close(); continue; diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp index befdb1cc54..5f1da82469 100644 --- a/be/src/exec/parquet_writer.cpp +++ b/be/src/exec/parquet_writer.cpp @@ -21,8 +21,17 @@ #include <arrow/status.h> #include <time.h> -#include "exec/file_writer.h" +#include "common/logging.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "io/file_writer.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/mem_pool.h" #include "util/mysql_global.h" +#include "util/thrift_util.h" #include "util/types.h" namespace doris { diff --git a/be/src/exec/plain_binary_line_reader.cpp b/be/src/exec/plain_binary_line_reader.cpp index 9cf1ff473f..f63671c622 100644 --- a/be/src/exec/plain_binary_line_reader.cpp +++ b/be/src/exec/plain_binary_line_reader.cpp @@ -18,7 +18,7 @@ #include "exec/plain_binary_line_reader.h" #include "common/status.h" -#include "exec/file_reader.h" +#include "io/file_reader.h" namespace doris { diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp index 8932e1c492..825982c5b1 100644 --- a/be/src/exec/plain_text_line_reader.cpp +++ b/be/src/exec/plain_text_line_reader.cpp @@ -19,7 +19,7 @@ #include "common/status.h" #include "exec/decompressor.h" -#include "exec/file_reader.h" +#include "io/file_reader.h" // INPUT_CHUNK must // larger than 15B for correct lz4 file decompressing diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt new file mode 100644 index 0000000000..1be0e05176 --- /dev/null +++ b/be/src/io/CMakeLists.txt @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/io") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/io") + +set(EXEC_FILES + buffered_reader.cpp + local_file_reader.cpp + local_file_writer.cpp + broker_reader.cpp + broker_writer.cpp + s3_reader.cpp + s3_writer.cpp + hdfs_reader_writer.cpp + file_factory.cpp +) + +if (ARCH_AMD64) + set(EXEC_FILES + ${EXEC_FILES} + hdfs_file_reader.cpp + hdfs_writer.cpp + ) +endif() + +add_library(IO STATIC + ${EXEC_FILES} +) diff --git a/be/src/exec/broker_reader.cpp b/be/src/io/broker_reader.cpp similarity index 99% rename from be/src/exec/broker_reader.cpp rename to be/src/io/broker_reader.cpp index 0d65bfc6f5..1745a7bbc2 100644 --- a/be/src/exec/broker_reader.cpp +++ b/be/src/io/broker_reader.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/broker_reader.h" +#include "broker_reader.h" #include <sstream> diff --git a/be/src/exec/broker_reader.h b/be/src/io/broker_reader.h similarity index 98% rename from be/src/exec/broker_reader.h rename to be/src/io/broker_reader.h index 05a4b24c79..94f86ca8df 100644 --- a/be/src/exec/broker_reader.h +++ b/be/src/io/broker_reader.h @@ -23,7 +23,7 @@ #include <string> #include "common/status.h" -#include "exec/file_reader.h" +#include "file_reader.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/Types_types.h" diff --git a/be/src/exec/broker_writer.cpp b/be/src/io/broker_writer.cpp similarity index 99% rename from be/src/exec/broker_writer.cpp rename to be/src/io/broker_writer.cpp index 58f9dd4931..3975abe664 100644 --- a/be/src/exec/broker_writer.cpp +++ b/be/src/io/broker_writer.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/broker_writer.h" +#include "broker_writer.h" #include <sstream> diff --git a/be/src/exec/broker_writer.h b/be/src/io/broker_writer.h similarity index 98% rename from be/src/exec/broker_writer.h rename to be/src/io/broker_writer.h index 9bb8c4c5a5..c4b8bac17c 100644 --- a/be/src/exec/broker_writer.h +++ b/be/src/io/broker_writer.h @@ -23,7 +23,7 @@ #include <string> #include "common/status.h" -#include "exec/file_writer.h" +#include "file_writer.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/Types_types.h" diff --git a/be/src/exec/buffered_reader.cpp b/be/src/io/buffered_reader.cpp similarity index 99% rename from be/src/exec/buffered_reader.cpp rename to be/src/io/buffered_reader.cpp index 5c599d41b7..5a80c4f842 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/io/buffered_reader.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/buffered_reader.h" +#include "buffered_reader.h" #include <algorithm> #include <sstream> diff --git a/be/src/exec/buffered_reader.h b/be/src/io/buffered_reader.h similarity index 98% rename from be/src/exec/buffered_reader.h rename to be/src/io/buffered_reader.h index 937e154ca7..8ffd5cd0ab 100644 --- a/be/src/exec/buffered_reader.h +++ b/be/src/io/buffered_reader.h @@ -22,7 +22,7 @@ #include <memory> #include "common/status.h" -#include "exec/file_reader.h" +#include "file_reader.h" #include "olap/olap_define.h" #include "util/runtime_profile.h" diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp new file mode 100644 index 0000000000..f8c86a9146 --- /dev/null +++ b/be/src/io/file_factory.cpp @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "file_factory.h" + +#include "broker_reader.h" +#include "broker_writer.h" +#include "buffered_reader.h" +#include "hdfs_reader_writer.h" +#include "local_file_reader.h" +#include "local_file_writer.h" +#include "runtime/exec_env.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "s3_reader.h" +#include "s3_writer.h" + +doris::Status doris::FileFactory::create_file_writer( + TFileType::type type, doris::ExecEnv* env, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, const std::string& path, + int64_t start_offset, std::unique_ptr<FileWriter>& file_writer) { + switch (type) { + case TFileType::FILE_LOCAL: { + file_writer.reset(new LocalFileWriter(path, start_offset)); + break; + } + case TFileType::FILE_BROKER: { + file_writer.reset(new BrokerWriter(env, broker_addresses, properties, path, start_offset)); + break; + } + case TFileType::FILE_S3: { + file_writer.reset(new S3Writer(properties, path, start_offset)); + break; + } + case TFileType::FILE_HDFS: { + RETURN_IF_ERROR(HdfsReaderWriter::create_writer( + const_cast<std::map<std::string, std::string>&>(properties), path, file_writer)); + break; + } + default: + return Status::InternalError("UnSupport File Writer Type: " + std::to_string(type)); + } + + return Status::OK(); +} + +doris::Status doris::FileFactory::_new_file_reader( + doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, const TBrokerRangeDesc& range, + int64_t start_offset, FileReader*& file_reader) { + switch (type) { + case TFileType::FILE_LOCAL: { + file_reader = new LocalFileReader(range.path, start_offset); + break; + } + case TFileType::FILE_BROKER: { + file_reader = new BufferedReader( + profile, + new BrokerReader(env, broker_addresses, properties, range.path, start_offset, + range.__isset.file_size ? range.file_size : 0)); + break; + } + case TFileType::FILE_S3: { + file_reader = + new BufferedReader(profile, new S3Reader(properties, range.path, start_offset)); + break; + } + case TFileType::FILE_HDFS: { + FileReader* hdfs_reader = nullptr; + RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, start_offset, + &hdfs_reader)); + file_reader = new BufferedReader(profile, hdfs_reader); + break; + } + default: + return Status::InternalError("UnSupport File Reader Type: " + std::to_string(type)); + } + + return Status::OK(); +} + +doris::Status doris::FileFactory::create_file_reader( + doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, const doris::TBrokerRangeDesc& range, + int64_t start_offset, std::unique_ptr<FileReader>& file_reader) { + if (type == TFileType::FILE_STREAM) { + return Status::InternalError("UnSupport UniquePtr For FileStream type"); + } + + FileReader* file_reader_ptr; + RETURN_IF_ERROR(_new_file_reader(type, env, profile, broker_addresses, properties, range, + start_offset, file_reader_ptr)); + file_reader.reset(file_reader_ptr); + + return Status::OK(); +} + +doris::Status doris::FileFactory::create_file_reader( + doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, const doris::TBrokerRangeDesc& range, + int64_t start_offset, std::shared_ptr<FileReader>& file_reader) { + if (type == TFileType::FILE_STREAM) { + file_reader = env->load_stream_mgr()->get(range.load_id); + if (!file_reader) { + VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id); + return Status::InternalError("unknown stream load id"); + } + } else { + FileReader* file_reader_ptr; + RETURN_IF_ERROR(_new_file_reader(type, env, profile, broker_addresses, properties, range, + start_offset, file_reader_ptr)); + file_reader.reset(file_reader_ptr); + } + return Status::OK(); +} diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h new file mode 100644 index 0000000000..36c2871599 --- /dev/null +++ b/be/src/io/file_factory.h @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "file_reader.h" +#include "file_writer.h" +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" + +namespace doris { +class ExecEnv; +class TNetworkAddress; +class RuntimeProfile; + +class FileFactory { +public: + static Status create_file_writer(TFileType::type type, ExecEnv* env, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const std::string& path, int64_t start_offset, + std::unique_ptr<FileWriter>& file_writer); + + // Because StreamLoadPipe use std::shared_ptr, here we have to support both unique_ptr + // and shared_ptr create_file_reader + static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const TBrokerRangeDesc& range, int64_t start_offset, + std::unique_ptr<FileReader>& file_reader); + + static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const TBrokerRangeDesc& range, int64_t start_offset, + std::shared_ptr<FileReader>& file_reader); + + static TFileType::type convert_storage_type(TStorageBackendType::type type) { + switch (type) { + case TStorageBackendType::LOCAL: + return TFileType::FILE_LOCAL; + case TStorageBackendType::S3: + return TFileType::FILE_S3; + case TStorageBackendType::BROKER: + return TFileType::FILE_BROKER; + case TStorageBackendType::HDFS: + return TFileType::FILE_HDFS; + default: + LOG(FATAL) << "not match type to convert, from type:" << type; + } + __builtin_unreachable(); + } + +private: + // Note: if the function return Status::OK() means new the file_reader. the caller + // should delete the memory of file_reader or use the smart_ptr to hold the own of file_reader + static Status _new_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile, + const std::vector<TNetworkAddress>& broker_addresses, + const std::map<std::string, std::string>& properties, + const TBrokerRangeDesc& range, int64_t start_offset, + FileReader*& file_reader); +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/file_reader.h b/be/src/io/file_reader.h similarity index 100% rename from be/src/exec/file_reader.h rename to be/src/io/file_reader.h diff --git a/be/src/exec/file_writer.h b/be/src/io/file_writer.h similarity index 100% rename from be/src/exec/file_writer.h rename to be/src/io/file_writer.h diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp similarity index 99% rename from be/src/exec/hdfs_file_reader.cpp rename to be/src/io/hdfs_file_reader.cpp index 25e3a09638..5d4dc2cc5e 100644 --- a/be/src/exec/hdfs_file_reader.cpp +++ b/be/src/io/hdfs_file_reader.cpp @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#include "exec/hdfs_file_reader.h" +#include "hdfs_file_reader.h" #include <sys/stat.h> #include <unistd.h> diff --git a/be/src/exec/hdfs_file_reader.h b/be/src/io/hdfs_file_reader.h similarity index 98% rename from be/src/exec/hdfs_file_reader.h rename to be/src/io/hdfs_file_reader.h index d4430de3e2..83c8efb7dc 100644 --- a/be/src/exec/hdfs_file_reader.h +++ b/be/src/io/hdfs_file_reader.h @@ -19,7 +19,7 @@ #include <hdfs/hdfs.h> -#include "exec/file_reader.h" +#include "file_reader.h" #include "gen_cpp/PlanNodes_types.h" namespace doris { diff --git a/be/src/exec/hdfs_reader_writer.cpp b/be/src/io/hdfs_reader_writer.cpp similarity index 84% rename from be/src/exec/hdfs_reader_writer.cpp rename to be/src/io/hdfs_reader_writer.cpp index 956fc9e40e..9a072512c0 100644 --- a/be/src/exec/hdfs_reader_writer.cpp +++ b/be/src/io/hdfs_reader_writer.cpp @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "exec/hdfs_reader_writer.h" +#include "hdfs_reader_writer.h" #if defined(__x86_64__) -#include "exec/hdfs_file_reader.h" -#include "exec/hdfs_writer.h" +#include "hdfs_file_reader.h" +#include "hdfs_writer.h" #endif namespace doris { @@ -35,9 +35,10 @@ Status HdfsReaderWriter::create_reader(const THdfsParams& hdfs_params, const std } Status HdfsReaderWriter::create_writer(std::map<std::string, std::string>& properties, - const std::string& path, FileWriter** writer) { + const std::string& path, + std::unique_ptr<FileWriter>& writer) { #if defined(__x86_64__) - *writer = new HDFSWriter(properties, path); + writer.reset(new HDFSWriter(properties, path)); return Status::OK(); #else return Status::InternalError("HdfsWriter do not support on non x86 platform"); diff --git a/be/src/exec/hdfs_reader_writer.h b/be/src/io/hdfs_reader_writer.h similarity index 91% rename from be/src/exec/hdfs_reader_writer.h rename to be/src/io/hdfs_reader_writer.h index e0decf2035..160306ffce 100644 --- a/be/src/exec/hdfs_reader_writer.h +++ b/be/src/io/hdfs_reader_writer.h @@ -17,8 +17,8 @@ #pragma once -#include "exec/file_reader.h" -#include "exec/file_writer.h" +#include "file_reader.h" +#include "file_writer.h" #include "gen_cpp/PlanNodes_types.h" namespace doris { @@ -35,7 +35,7 @@ public: int64_t start_offset, FileReader** reader); static Status create_writer(std::map<std::string, std::string>& properties, - const std::string& path, FileWriter** writer); + const std::string& path, std::unique_ptr<FileWriter>& writer); }; } // namespace doris diff --git a/be/src/exec/hdfs_writer.cpp b/be/src/io/hdfs_writer.cpp similarity index 99% rename from be/src/exec/hdfs_writer.cpp rename to be/src/io/hdfs_writer.cpp index a362488d8b..16b2516ca8 100644 --- a/be/src/exec/hdfs_writer.cpp +++ b/be/src/io/hdfs_writer.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/hdfs_writer.h" +#include "hdfs_writer.h" #include <filesystem> diff --git a/be/src/exec/hdfs_writer.h b/be/src/io/hdfs_writer.h similarity index 98% rename from be/src/exec/hdfs_writer.h rename to be/src/io/hdfs_writer.h index 8bc9060c2a..15d9a1fde8 100644 --- a/be/src/exec/hdfs_writer.h +++ b/be/src/io/hdfs_writer.h @@ -22,7 +22,7 @@ #include <map> #include <string> -#include "exec/file_writer.h" +#include "file_writer.h" namespace doris { class HDFSWriter : public FileWriter { diff --git a/be/src/exec/local_file_reader.cpp b/be/src/io/local_file_reader.cpp similarity index 99% rename from be/src/exec/local_file_reader.cpp rename to be/src/io/local_file_reader.cpp index d5c8454532..e8e2d38ea2 100644 --- a/be/src/exec/local_file_reader.cpp +++ b/be/src/io/local_file_reader.cpp @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#include "exec/local_file_reader.h" +#include "local_file_reader.h" #include <sys/stat.h> #include <unistd.h> diff --git a/be/src/exec/local_file_reader.h b/be/src/io/local_file_reader.h similarity index 98% rename from be/src/exec/local_file_reader.h rename to be/src/io/local_file_reader.h index 3224f94562..c525804395 100644 --- a/be/src/exec/local_file_reader.h +++ b/be/src/io/local_file_reader.h @@ -20,7 +20,7 @@ #define _FILE_OFFSET_BITS 64 #include <stdio.h> -#include "exec/file_reader.h" +#include "file_reader.h" namespace doris { diff --git a/be/src/exec/local_file_writer.cpp b/be/src/io/local_file_writer.cpp similarity index 75% rename from be/src/exec/local_file_writer.cpp rename to be/src/io/local_file_writer.cpp index 056d4b0cea..3c425a46aa 100644 --- a/be/src/exec/local_file_writer.cpp +++ b/be/src/io/local_file_writer.cpp @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "exec/local_file_writer.h" +#include "local_file_writer.h" +#include "service/backend_options.h" #include "util/error_util.h" +#include "util/file_utils.h" namespace doris { @@ -29,6 +31,8 @@ LocalFileWriter::~LocalFileWriter() { } Status LocalFileWriter::open() { + RETURN_IF_ERROR(_check_file_path(_path)); + _fp = fopen(_path.c_str(), "w+"); if (_fp == nullptr) { std::stringstream ss; @@ -72,4 +76,18 @@ Status LocalFileWriter::close() { return Status::OK(); } +Status LocalFileWriter::_check_file_path(const std::string& file_path) { + // For local file writer, the file_path is a local dir. + // Here we do a simple security verification by checking whether the file exists. + // Because the file path is currently arbitrarily specified by the user, + // Doris is not responsible for ensuring the correctness of the path. + // This is just to prevent overwriting the existing file. + if (FileUtils::check_exist(file_path)) { + return Status::InternalError("File already exists: " + file_path + + ". Host: " + BackendOptions::get_localhost()); + } + + return Status::OK(); +} + } // end namespace doris diff --git a/be/src/exec/local_file_writer.h b/be/src/io/local_file_writer.h similarity index 90% rename from be/src/exec/local_file_writer.h rename to be/src/io/local_file_writer.h index 63be5e9d79..7d9da485c3 100644 --- a/be/src/exec/local_file_writer.h +++ b/be/src/io/local_file_writer.h @@ -19,7 +19,7 @@ #include <stdio.h> -#include "exec/file_writer.h" +#include "file_writer.h" namespace doris { @@ -28,7 +28,8 @@ class RuntimeState; class LocalFileWriter : public FileWriter { public: LocalFileWriter(const std::string& path, int64_t start_offset); - virtual ~LocalFileWriter(); + + ~LocalFileWriter() override; Status open() override; @@ -37,6 +38,8 @@ public: virtual Status close() override; private: + static Status _check_file_path(const std::string& file_path); + std::string _path; int64_t _start_offset; FILE* _fp; diff --git a/be/src/exec/s3_reader.cpp b/be/src/io/s3_reader.cpp similarity index 99% rename from be/src/exec/s3_reader.cpp rename to be/src/io/s3_reader.cpp index 30e6daaa59..c932e2d886 100644 --- a/be/src/exec/s3_reader.cpp +++ b/be/src/io/s3_reader.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/s3_reader.h" +#include "s3_reader.h" #include <aws/s3/S3Client.h> #include <aws/s3/model/GetObjectRequest.h> diff --git a/be/src/exec/s3_reader.h b/be/src/io/s3_reader.h similarity index 98% rename from be/src/exec/s3_reader.h rename to be/src/io/s3_reader.h index 0de0b0944e..a1464324df 100644 --- a/be/src/exec/s3_reader.h +++ b/be/src/io/s3_reader.h @@ -20,7 +20,7 @@ #include <map> #include <string> -#include "exec/file_reader.h" +#include "file_reader.h" #include "util/s3_uri.h" namespace Aws { diff --git a/be/src/exec/s3_writer.cpp b/be/src/io/s3_writer.cpp similarity index 99% rename from be/src/exec/s3_writer.cpp rename to be/src/io/s3_writer.cpp index 8b44c621d5..df37e7d820 100644 --- a/be/src/exec/s3_writer.cpp +++ b/be/src/io/s3_writer.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/s3_writer.h" +#include "s3_writer.h" #include <aws/core/utils/FileSystemUtils.h> #include <aws/s3/S3Client.h> diff --git a/be/src/exec/s3_writer.h b/be/src/io/s3_writer.h similarity index 98% rename from be/src/exec/s3_writer.h rename to be/src/io/s3_writer.h index 09084ac67b..ae2756da08 100644 --- a/be/src/exec/s3_writer.h +++ b/be/src/io/s3_writer.h @@ -20,7 +20,7 @@ #include <map> #include <string> -#include "exec/file_writer.h" +#include "file_writer.h" #include "util/s3_uri.h" namespace Aws { diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp index 8b60d0f5f9..9f72c365de 100644 --- a/be/src/runtime/export_sink.cpp +++ b/be/src/runtime/export_sink.cpp @@ -21,13 +21,10 @@ #include <sstream> -#include "exec/broker_writer.h" -#include "exec/hdfs_reader_writer.h" -#include "exec/local_file_writer.h" -#include "exec/s3_writer.h" #include "exprs/expr.h" #include "exprs/expr_context.h" #include "gutil/strings/numbers.h" +#include "io/file_factory.h" #include "runtime/large_int_value.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" @@ -247,50 +244,14 @@ Status ExportSink::open_file_writer() { } std::string file_name = gen_file_name(); - // TODO(lingbin): gen file path - switch (_t_export_sink.file_type) { - case TFileType::FILE_LOCAL: { - LocalFileWriter* file_writer = - new LocalFileWriter(_t_export_sink.export_path + "/" + file_name, 0); - RETURN_IF_ERROR(file_writer->open()); - _file_writer.reset(file_writer); - break; - } - case TFileType::FILE_BROKER: { - BrokerWriter* broker_writer = new BrokerWriter( - _state->exec_env(), _t_export_sink.broker_addresses, _t_export_sink.properties, - _t_export_sink.export_path + "/" + file_name, 0 /* offset */); - RETURN_IF_ERROR(broker_writer->open()); - _file_writer.reset(broker_writer); - break; - } - case TFileType::FILE_S3: { - S3Writer* s3_writer = - new S3Writer(_t_export_sink.properties, - _t_export_sink.export_path + "/" + file_name, 0 /* offset */); - RETURN_IF_ERROR(s3_writer->open()); - _file_writer.reset(s3_writer); - break; - } - case TFileType::FILE_HDFS: { - FileWriter* hdfs_writer; - RETURN_IF_ERROR(HdfsReaderWriter::create_writer( - const_cast<std::map<std::string, std::string>&>(_t_export_sink.properties), - _t_export_sink.export_path + "/" + file_name, &hdfs_writer)); - RETURN_IF_ERROR(hdfs_writer->open()); - _file_writer.reset(hdfs_writer); - break; - } - default: { - std::stringstream ss; - ss << "Unknown file type, type=" << _t_export_sink.file_type; - return Status::InternalError(ss.str()); - } - } - + RETURN_IF_ERROR(FileFactory::create_file_writer( + _t_export_sink.file_type, _state->exec_env(), _t_export_sink.broker_addresses, + _t_export_sink.properties, _t_export_sink.export_path + "/" + file_name, 0, + _file_writer)); _state->add_export_output_file(_t_export_sink.export_path + "/" + file_name); - return Status::OK(); + + return _file_writer->open(); } // TODO(lingbin): add some other info to file name, like partition diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp index f2351476a3..8f4e0c0d03 100644 --- a/be/src/runtime/file_result_writer.cpp +++ b/be/src/runtime/file_result_writer.cpp @@ -18,15 +18,12 @@ #include "runtime/file_result_writer.h" #include "common/consts.h" -#include "exec/broker_writer.h" -#include "exec/hdfs_reader_writer.h" -#include "exec/local_file_writer.h" #include "exec/parquet_writer.h" -#include "exec/s3_writer.h" #include "exprs/expr_context.h" #include "gen_cpp/PaloInternalService_types.h" #include "gutil/strings/numbers.h" #include "gutil/strings/substitute.h" +#include "io/file_factory.h" #include "runtime/buffer_control_block.h" #include "runtime/large_int_value.h" #include "runtime/primitive_type.h" @@ -95,15 +92,6 @@ Status FileResultWriter::_get_success_file_name(std::string* file_name) { ss << _file_opts->file_path << _file_opts->success_file_name; *file_name = ss.str(); if (_storage_type == TStorageBackendType::LOCAL) { - // For local file writer, the file_path is a local dir. - // Here we do a simple security verification by checking whether the file exists. - // Because the file path is currently arbitrarily specified by the user, - // Doris is not responsible for ensuring the correctness of the path. - // This is just to prevent overwriting the existing file. - if (FileUtils::check_exist(*file_name)) { - return Status::InternalError("File already exists: " + *file_name + - ". Host: " + BackendOptions::get_localhost()); - } } return Status::OK(); @@ -116,26 +104,18 @@ Status FileResultWriter::_create_next_file_writer() { } Status FileResultWriter::_create_file_writer(const std::string& file_name) { - if (_storage_type == TStorageBackendType::LOCAL) { - _file_writer = new LocalFileWriter(file_name, 0 /* start offset */); - } else if (_storage_type == TStorageBackendType::BROKER) { - _file_writer = - new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses, - _file_opts->broker_properties, file_name, 0 /*start offset*/); - } else if (_storage_type == TStorageBackendType::S3) { - _file_writer = new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */); - } else if (_storage_type == TStorageBackendType::HDFS) { - RETURN_IF_ERROR(HdfsReaderWriter::create_writer( - const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties), - file_name, &_file_writer)); - } + RETURN_IF_ERROR(FileFactory::create_file_writer( + FileFactory::convert_storage_type(_storage_type), _state->exec_env(), + _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0, + _file_writer)); RETURN_IF_ERROR(_file_writer->open()); + switch (_file_opts->file_format) { case TFileFormatType::FORMAT_CSV_PLAIN: // just use file writer is enough break; case TFileFormatType::FORMAT_PARQUET: - _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs, + _parquet_writer = new ParquetWriterWrapper(_file_writer.get(), _output_expr_ctxs, _file_opts->file_properties, _file_opts->schema, _output_object_data); break; @@ -417,12 +397,8 @@ Status FileResultWriter::_close_file_writer(bool done, bool only_close) { COUNTER_UPDATE(_written_data_bytes, _current_written_bytes); delete _parquet_writer; _parquet_writer = nullptr; - delete _file_writer; - _file_writer = nullptr; - } else if (_file_writer != nullptr) { + } else if (_file_writer) { _file_writer->close(); - delete _file_writer; - _file_writer = nullptr; } if (only_close) { diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h index b8d9f71d77..03f53bd165 100644 --- a/be/src/runtime/file_result_writer.h +++ b/be/src/runtime/file_result_writer.h @@ -132,7 +132,7 @@ private: // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. - FileWriter* _file_writer = nullptr; + std::unique_ptr<FileWriter> _file_writer; // parquet file writer ParquetWriterWrapper* _parquet_writer = nullptr; // Used to buffer the export data of plain text diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.h b/be/src/runtime/routine_load/kafka_consumer_pipe.h index 6e7fc43eec..6d01bbe091 100644 --- a/be/src/runtime/routine_load/kafka_consumer_pipe.h +++ b/be/src/runtime/routine_load/kafka_consumer_pipe.h @@ -23,7 +23,7 @@ #include <string> #include <vector> -#include "exec/file_reader.h" +#include "io/file_reader.h" #include "librdkafka/rdkafka.h" #include "runtime/message_body_sink.h" #include "runtime/stream_load/stream_load_pipe.h" diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index d793d1cbd6..716a145f61 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -21,8 +21,8 @@ #include <deque> #include <mutex> -#include "exec/file_reader.h" #include "gen_cpp/internal_service.pb.h" +#include "io/file_reader.h" #include "runtime/message_body_sink.h" #include "util/bit_util.h" #include "util/byte_buffer.h" diff --git a/be/src/util/broker_load_error_hub.cpp b/be/src/util/broker_load_error_hub.cpp index 84b75d51f9..a8c0441b27 100644 --- a/be/src/util/broker_load_error_hub.cpp +++ b/be/src/util/broker_load_error_hub.cpp @@ -17,7 +17,7 @@ #include "util/broker_load_error_hub.h" -#include "exec/broker_writer.h" +#include "io/broker_writer.h" #include "util/defer_op.h" namespace doris { diff --git a/be/src/util/broker_storage_backend.cpp b/be/src/util/broker_storage_backend.cpp index c812e98f48..fc653bcb2c 100644 --- a/be/src/util/broker_storage_backend.cpp +++ b/be/src/util/broker_storage_backend.cpp @@ -18,13 +18,13 @@ #include "util/broker_storage_backend.h" #include "env/env.h" -#include "exec/broker_reader.h" -#include "exec/broker_writer.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/TPaloBrokerService.h" +#include "io/broker_reader.h" +#include "io/broker_writer.h" #include "olap/file_helper.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp index b44475ce19..971f1b45bc 100644 --- a/be/src/vec/exec/varrow_scanner.cpp +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -16,12 +16,8 @@ // under the License. #include "exec/arrow/parquet_reader.h" -#include "exec/broker_reader.h" -#include "exec/buffered_reader.h" -#include "exec/hdfs_reader_writer.h" -#include "exec/local_file_reader.h" -#include "exec/s3_reader.h" #include "exprs/expr.h" +#include "io/file_factory.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "vec/data_types/data_type_factory.hpp" @@ -61,41 +57,9 @@ Status VArrowScanner::_open_next_reader() { } const TBrokerRangeDesc& range = _ranges[_next_range++]; std::unique_ptr<FileReader> file_reader; - switch (range.file_type) { - case TFileType::FILE_LOCAL: { - file_reader.reset(new LocalFileReader(range.path, range.start_offset)); - break; - } - case TFileType::FILE_HDFS: { - FileReader* reader; - RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, - range.start_offset, &reader)); - file_reader.reset(reader); - break; - } - case TFileType::FILE_BROKER: { - int64_t file_size = 0; - // for compatibility - if (range.__isset.file_size) { - file_size = range.file_size; - } - file_reader.reset(new BufferedReader( - _profile, - new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, - range.path, range.start_offset, file_size))); - break; - } - case TFileType::FILE_S3: { - file_reader.reset(new BufferedReader( - _profile, new S3Reader(_params.properties, range.path, range.start_offset))); - break; - } - default: { - std::stringstream ss; - ss << "Unknown file type, type=" << range.file_type; - return Status::InternalError(ss.str()); - } - } + RETURN_IF_ERROR(FileFactory::create_file_reader( + range.file_type, _state->exec_env(), _profile, _broker_addresses, + _params.properties, range, range.start_offset, file_reader)); RETURN_IF_ERROR(file_reader->open()); if (file_reader->size() == 0) { file_reader->close(); diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index 837b3ec289..1acd304e77 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -105,7 +105,7 @@ Status VJsonScanner::open_vjson_reader() { } else { _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, fuzzy_parse, &_scanner_eof, - _cur_file_reader)); + _cur_file_reader.get())); } RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root)); diff --git a/be/test/exec/broker_reader_test.cpp b/be/test/exec/broker_reader_test.cpp index c9f19279e4..0923b4edb4 100644 --- a/be/test/exec/broker_reader_test.cpp +++ b/be/test/exec/broker_reader_test.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/broker_reader.h" +#include "io/broker_reader.h" #include <gtest/gtest.h> diff --git a/be/test/exec/broker_scan_node_test.cpp b/be/test/exec/broker_scan_node_test.cpp index 18128200ea..bc7c91dff3 100644 --- a/be/test/exec/broker_scan_node_test.cpp +++ b/be/test/exec/broker_scan_node_test.cpp @@ -24,10 +24,10 @@ #include <vector> #include "common/object_pool.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" diff --git a/be/test/exec/broker_scanner_test.cpp b/be/test/exec/broker_scanner_test.cpp index 689e876754..0a65982058 100644 --- a/be/test/exec/broker_scanner_test.cpp +++ b/be/test/exec/broker_scanner_test.cpp @@ -24,10 +24,10 @@ #include <vector> #include "common/object_pool.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/mem_tracker.h" #include "runtime/runtime_state.h" diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp index 7fe6c47fee..940635a7f1 100644 --- a/be/test/exec/buffered_reader_test.cpp +++ b/be/test/exec/buffered_reader_test.cpp @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "exec/buffered_reader.h" +#include "io/buffered_reader.h" #include <gtest/gtest.h> -#include "exec/local_file_reader.h" +#include "io/local_file_reader.h" #include "util/stopwatch.hpp" namespace doris { diff --git a/be/test/exec/hdfs_file_reader_test.cpp b/be/test/exec/hdfs_file_reader_test.cpp index 6807272bac..a5e85f2bc8 100644 --- a/be/test/exec/hdfs_file_reader_test.cpp +++ b/be/test/exec/hdfs_file_reader_test.cpp @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "exec/hdfs_file_reader.h" +#include "io/hdfs_file_reader.h" #include <gtest/gtest.h> -#include "exec/hdfs_reader_writer.h" +#include "io/hdfs_reader_writer.h" namespace doris { diff --git a/be/test/exec/json_scanner_test.cpp b/be/test/exec/json_scanner_test.cpp index 582e26b7c5..744746f7b7 100644 --- a/be/test/exec/json_scanner_test.cpp +++ b/be/test/exec/json_scanner_test.cpp @@ -24,11 +24,11 @@ #include "common/object_pool.h" #include "exec/broker_scan_node.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "exprs/decimalv2_operators.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/row_batch.h" diff --git a/be/test/exec/json_scanner_with_jsonpath_test.cpp b/be/test/exec/json_scanner_with_jsonpath_test.cpp index 9db0bd37b4..0394de1035 100644 --- a/be/test/exec/json_scanner_with_jsonpath_test.cpp +++ b/be/test/exec/json_scanner_with_jsonpath_test.cpp @@ -24,10 +24,10 @@ #include "common/object_pool.h" #include "exec/broker_scan_node.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/row_batch.h" diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp index 3712f6b141..614d9f279f 100644 --- a/be/test/exec/multi_bytes_separator_test.cpp +++ b/be/test/exec/multi_bytes_separator_test.cpp @@ -23,10 +23,10 @@ #include "common/object_pool.h" #include "exec/broker_scanner.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/mem_tracker.h" #include "runtime/runtime_state.h" diff --git a/be/test/exec/orc_scanner_test.cpp b/be/test/exec/orc_scanner_test.cpp index f5f99924f2..cd1023e692 100644 --- a/be/test/exec/orc_scanner_test.cpp +++ b/be/test/exec/orc_scanner_test.cpp @@ -27,11 +27,11 @@ #include "common/object_pool.h" #include "exec/broker_scan_node.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "exprs/decimalv2_operators.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp index 8db72313ba..cbd673d4dc 100644 --- a/be/test/exec/parquet_scanner_test.cpp +++ b/be/test/exec/parquet_scanner_test.cpp @@ -24,10 +24,10 @@ #include "common/object_pool.h" #include "exec/broker_scan_node.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" diff --git a/be/test/exec/plain_text_line_reader_bzip_test.cpp b/be/test/exec/plain_text_line_reader_bzip_test.cpp index 747313c803..900f261673 100644 --- a/be/test/exec/plain_text_line_reader_bzip_test.cpp +++ b/be/test/exec/plain_text_line_reader_bzip_test.cpp @@ -18,8 +18,8 @@ #include <gtest/gtest.h> #include "exec/decompressor.h" -#include "exec/local_file_reader.h" #include "exec/plain_text_line_reader.h" +#include "io/local_file_reader.h" #include "util/runtime_profile.h" namespace doris { diff --git a/be/test/exec/plain_text_line_reader_gzip_test.cpp b/be/test/exec/plain_text_line_reader_gzip_test.cpp index 1e461e8b82..fea15d00c4 100644 --- a/be/test/exec/plain_text_line_reader_gzip_test.cpp +++ b/be/test/exec/plain_text_line_reader_gzip_test.cpp @@ -18,8 +18,8 @@ #include <gtest/gtest.h> #include "exec/decompressor.h" -#include "exec/local_file_reader.h" #include "exec/plain_text_line_reader.h" +#include "io/local_file_reader.h" #include "util/runtime_profile.h" namespace doris { diff --git a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp index 0ebd218b28..f6d0844455 100644 --- a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp +++ b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp @@ -18,8 +18,8 @@ #include <gtest/gtest.h> #include "exec/decompressor.h" -#include "exec/local_file_reader.h" #include "exec/plain_text_line_reader.h" +#include "io/local_file_reader.h" #include "util/runtime_profile.h" namespace doris { diff --git a/be/test/exec/plain_text_line_reader_lzop_test.cpp b/be/test/exec/plain_text_line_reader_lzop_test.cpp index f8f8b6090d..99aa6336e0 100644 --- a/be/test/exec/plain_text_line_reader_lzop_test.cpp +++ b/be/test/exec/plain_text_line_reader_lzop_test.cpp @@ -18,8 +18,8 @@ #include <gtest/gtest.h> #include "exec/decompressor.h" -#include "exec/local_file_reader.h" #include "exec/plain_text_line_reader.h" +#include "io/local_file_reader.h" #include "util/runtime_profile.h" namespace doris { diff --git a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp index b9361999b4..815d119ba1 100644 --- a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp +++ b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp @@ -18,8 +18,8 @@ #include <gtest/gtest.h> #include "exec/decompressor.h" -#include "exec/local_file_reader.h" #include "exec/plain_text_line_reader.h" +#include "io/local_file_reader.h" #include "util/runtime_profile.h" namespace doris { diff --git a/be/test/exec/s3_reader_test.cpp b/be/test/exec/s3_reader_test.cpp index 3c11a19180..d41a78975a 100644 --- a/be/test/exec/s3_reader_test.cpp +++ b/be/test/exec/s3_reader_test.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/s3_reader.h" +#include "io/s3_reader.h" #include <aws/core/Aws.h> #include <gtest/gtest.h> @@ -28,7 +28,7 @@ #include <string> #include <vector> -#include "exec/s3_writer.h" +#include "io/s3_writer.h" namespace doris { static const std::string AK = ""; diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp index 0683cf4b7a..719d5014ea 100644 --- a/be/test/vec/exec/vbroker_scan_node_test.cpp +++ b/be/test/vec/exec/vbroker_scan_node_test.cpp @@ -23,13 +23,13 @@ #include <vector> #include "common/object_pool.h" -#include "exec/local_file_reader.h" #include "exprs/binary_predicate.h" #include "exprs/cast_functions.h" #include "exprs/literal.h" #include "exprs/slot_ref.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/mem_tracker.h" #include "runtime/primitive_type.h" diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index 428d82343c..27c4643050 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -23,10 +23,10 @@ #include <vector> #include "common/object_pool.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/mem_tracker.h" #include "runtime/runtime_state.h" diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp index 393ff80f16..b059faf010 100644 --- a/be/test/vec/exec/vjson_scanner_test.cpp +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -26,11 +26,11 @@ #include "common/object_pool.h" #include "exec/broker_scan_node.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "exprs/decimalv2_operators.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/row_batch.h" diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp index 92a9013786..e6b3b2a96f 100644 --- a/be/test/vec/exec/vorc_scanner_test.cpp +++ b/be/test/vec/exec/vorc_scanner_test.cpp @@ -26,12 +26,12 @@ #include <vector> #include "common/object_pool.h" -#include "exec/local_file_reader.h" #include "exec/orc_scanner.h" #include "exprs/cast_functions.h" #include "exprs/decimalv2_operators.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp index ba8f70ce70..6d3810cc73 100644 --- a/be/test/vec/exec/vparquet_scanner_test.cpp +++ b/be/test/vec/exec/vparquet_scanner_test.cpp @@ -23,10 +23,10 @@ #include <vector> #include "common/object_pool.h" -#include "exec/local_file_reader.h" #include "exprs/cast_functions.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" +#include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org