This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ec055e1acb [feature](new file reader) Integrate new file reader (#15175)
ec055e1acb is described below
commit ec055e1acbcc040d97ed3d15b88af2a695a4078b
Author: Tiewei Fang <[email protected]>
AuthorDate: Mon Dec 26 08:55:52 2022 +0800
[feature](new file reader) Integrate new file reader (#15175)
---
be/src/http/action/stream_load.cpp | 13 +-
be/src/io/CMakeLists.txt | 3 +-
be/src/io/buffered_reader.cpp | 12 +-
be/src/io/buffered_reader.h | 5 +-
be/src/io/file_factory.cpp | 142 ++++++++++---
be/src/io/file_factory.h | 41 ++++
be/src/io/fs/hdfs_file_reader.cpp | 11 +-
be/src/io/fs/hdfs_file_system.cpp | 42 ++--
be/src/io/fs/hdfs_file_system.h | 2 +-
.../fs/kafka_consumer_pipe.h} | 11 +-
...m_load_pipe_reader.cpp => stream_load_pipe.cpp} | 53 +++--
...tream_load_pipe_reader.h => stream_load_pipe.h} | 16 +-
be/src/io/hdfs_reader_writer.cpp | 44 ----
be/src/runtime/exec_env.h | 9 +
be/src/runtime/fragment_mgr.cpp | 23 +--
be/src/runtime/fragment_mgr.h | 9 +-
be/src/runtime/routine_load/data_consumer.h | 2 -
.../runtime/routine_load/data_consumer_group.cpp | 12 +-
.../routine_load/routine_load_task_executor.cpp | 28 +--
be/src/runtime/stream_load/new_load_stream_mgr.cpp | 4 +-
be/src/runtime/stream_load/new_load_stream_mgr.h | 8 +-
be/src/runtime/stream_load/stream_load_context.h | 3 +-
be/src/util/hdfs_storage_backend.cpp | 5 +-
be/src/vec/CMakeLists.txt | 2 +-
be/src/vec/exec/format/csv/csv_reader.cpp | 65 ++++--
be/src/vec/exec/format/csv/csv_reader.h | 12 +-
.../exec/format/file_reader/new_file_factory.cpp | 201 ------------------
.../vec/exec/format/file_reader/new_file_factory.h | 115 -----------
.../file_reader/new_plain_binary_line_reader.cpp | 71 +++++++
.../file_reader/new_plain_binary_line_reader.h} | 27 ++-
.../file_reader/new_plain_text_line_reader.cpp | 18 +-
.../file_reader/new_plain_text_line_reader.h | 8 +-
be/src/vec/exec/format/json/new_json_reader.cpp | 79 ++++---
be/src/vec/exec/format/json/new_json_reader.h | 19 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 57 +++--
be/src/vec/exec/format/orc/vorc_reader.h | 17 +-
.../vec/exec/format/parquet/parquet_thrift_util.h | 18 +-
.../exec/format/parquet/vparquet_column_reader.cpp | 10 +-
.../exec/format/parquet/vparquet_column_reader.h | 8 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 2 +-
.../exec/format/parquet/vparquet_group_reader.h | 5 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 36 ++--
be/src/vec/exec/format/parquet/vparquet_reader.h | 8 +-
.../runtime/routine_load_task_executor_test.cpp | 22 +-
be/test/vec/exec/parquet/parquet_reader_test.cpp | 7 +-
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 78 ++++---
.../data/load_p0/stream_load/test_json_load.out | 165 ---------------
.../load_p0/stream_load/test_json_load.groovy | 230 ++-------------------
48 files changed, 708 insertions(+), 1070 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 5097de5c0d..6595bb6d90 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -40,16 +40,16 @@
#include "http/http_request.h"
#include "http/http_response.h"
#include "http/utils.h"
+#include "io/fs/stream_load_pipe.h"
#include "olap/storage_engine.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/plan_fragment_executor.h"
-#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_executor.h"
-#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/byte_buffer.h"
#include "util/debug_util.h"
@@ -396,10 +396,11 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req, StreamLoadContext*
request.__set_header_type(ctx->header_type);
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
- auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /*
max_buffered_bytes */,
- 64 * 1024 /*
min_chunk_size */,
- ctx->body_bytes /*
total_length */);
- RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
+ auto pipe = std::make_shared<io::StreamLoadPipe>(
+ kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /*
min_chunk_size */,
+ ctx->body_bytes /* total_length */);
+ RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe));
+
request.fileType = TFileType::FILE_STREAM;
ctx->body_sink = pipe;
} else {
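Note: the hunk above swaps the legacy StreamLoadPipe for io::StreamLoadPipe and
registers it with NewLoadStreamMgr instead of LoadStreamMgr. A minimal sketch of
the new pattern follows (the helper name is hypothetical; it assumes the Doris
BE headers included above):

    #include "io/fs/stream_load_pipe.h"
    #include "runtime/exec_env.h"
    #include "runtime/stream_load/new_load_stream_mgr.h"
    #include "runtime/stream_load/stream_load_context.h"

    // Hypothetical helper: create the pipe and register it by load id.
    Status open_stream_pipe(ExecEnv* exec_env, StreamLoadContext* ctx) {
        // Buffer at most kMaxPipeBufferedBytes, hand data out in >= 64 KB
        // chunks; body_bytes is known up front for HTTP stream load.
        auto pipe = std::make_shared<io::StreamLoadPipe>(
                io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
                64 * 1024 /* min_chunk_size */, ctx->body_bytes /* total_length */);
        // The scan side later fetches the pipe via create_pipe_reader(load_id).
        RETURN_IF_ERROR(exec_env->new_load_stream_mgr()->put(ctx->id, pipe));
        ctx->body_sink = pipe; // the HTTP receive side appends into the pipe
        return Status::OK();
    }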
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index c19188deb4..a8ea5ff232 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -28,7 +28,6 @@ set(IO_FILES
file_factory.cpp
hdfs_builder.cpp
hdfs_file_reader.cpp
- hdfs_reader_writer.cpp
hdfs_writer.cpp
local_file_reader.cpp
local_file_writer.cpp
@@ -45,7 +44,7 @@ set(IO_FILES
fs/hdfs_file_reader.cpp
fs/broker_file_system.cpp
fs/broker_file_reader.cpp
- fs/stream_load_pipe_reader.cpp
+ fs/stream_load_pipe.cpp
cache/dummy_file_cache.cpp
cache/file_cache.cpp
cache/file_cache_manager.cpp
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index 021f0b9d23..d3944532cf 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -21,6 +21,7 @@
#include <sstream>
#include "common/config.h"
+#include "olap/iterators.h"
#include "olap/olap_define.h"
#include "util/bit_util.h"
@@ -185,7 +186,7 @@ bool BufferedReader::closed() {
return _reader->closed();
}
-BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t
offset,
+BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file,
uint64_t offset,
uint64_t length, size_t
max_buf_size)
: _file(file),
_file_start_offset(offset),
@@ -223,11 +224,12 @@ Status BufferedFileStreamReader::read_bytes(const
uint8_t** buf, uint64_t offset
int64_t has_read = 0;
SCOPED_RAW_TIMER(&_statistics.read_time);
while (has_read < to_read) {
- int64_t loop_read = 0;
- RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read -
has_read, &loop_read,
- _buf.get() + buf_remaining + has_read));
+ size_t loop_read = 0;
+ Slice result(_buf.get() + buf_remaining + has_read, to_read -
has_read);
+ IOContext io_context;
+ RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result,
io_context, &loop_read));
_statistics.read_calls++;
- if (loop_read <= 0) {
+ if (loop_read == 0) {
break;
}
has_read += loop_read;
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index abcf24916e..503f5af5ae 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -23,6 +23,7 @@
#include "common/status.h"
#include "io/file_reader.h"
+#include "io/fs/file_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
@@ -113,7 +114,7 @@ protected:
class BufferedFileStreamReader : public BufferedStreamReader {
public:
- BufferedFileStreamReader(FileReader* file, uint64_t offset, uint64_t
length,
+ BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset,
uint64_t length,
size_t max_buf_size);
~BufferedFileStreamReader() override = default;
@@ -122,7 +123,7 @@ public:
private:
std::unique_ptr<uint8_t[]> _buf;
- FileReader* _file;
+ io::FileReaderSPtr _file;
uint64_t _file_start_offset;
uint64_t _file_end_offset;
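Note: BufferedFileStreamReader now holds an io::FileReaderSPtr and uses the
positional read_at(offset, Slice, IOContext, &bytes_read) contract, where a
zero-byte read signals EOF (replacing the old readat() with a signed
out-parameter). A sketch of a read-fully loop under the new interface,
assuming the headers above:

    #include "io/fs/file_reader.h"
    #include "olap/iterators.h" // IOContext

    // Hypothetical helper: read `len` bytes starting at `offset`,
    // stopping early only at EOF (read_at reporting 0 bytes).
    Status read_fully(io::FileReaderSPtr file, size_t offset, char* buf, size_t len) {
        size_t has_read = 0;
        while (has_read < len) {
            size_t loop_read = 0;
            Slice result(buf + has_read, len - has_read);
            IOContext io_context;
            RETURN_IF_ERROR(file->read_at(offset + has_read, result, io_context, &loop_read));
            if (loop_read == 0) {
                break; // EOF before `len` bytes were available
            }
            has_read += loop_read;
        }
        return Status::OK();
    }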
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 61dbfbfbf4..2c8b56ea50 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -17,22 +17,32 @@
#include "io/file_factory.h"
+#include "common/status.h"
#include "io/broker_reader.h"
#include "io/broker_writer.h"
#include "io/buffered_reader.h"
-#include "io/hdfs_reader_writer.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "io/hdfs_file_reader.h"
+#include "io/hdfs_writer.h"
#include "io/local_file_reader.h"
#include "io/local_file_writer.h"
#include "io/s3_reader.h"
#include "io/s3_writer.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
-doris::Status doris::FileFactory::create_file_writer(
- TFileType::type type, doris::ExecEnv* env,
- const std::vector<TNetworkAddress>& broker_addresses,
- const std::map<std::string, std::string>& properties, const
std::string& path,
- int64_t start_offset, std::unique_ptr<FileWriter>& file_writer) {
+namespace doris {
+
+Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
+ const std::vector<TNetworkAddress>&
broker_addresses,
+ const std::map<std::string,
std::string>& properties,
+ const std::string& path, int64_t
start_offset,
+ std::unique_ptr<FileWriter>&
file_writer) {
switch (type) {
case TFileType::FILE_LOCAL: {
file_writer.reset(new LocalFileWriter(path, start_offset));
@@ -47,7 +57,7 @@ doris::Status doris::FileFactory::create_file_writer(
break;
}
case TFileType::FILE_HDFS: {
- RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+ RETURN_IF_ERROR(create_hdfs_writer(
const_cast<std::map<std::string, std::string>&>(properties),
path, file_writer));
break;
}
@@ -60,11 +70,11 @@ doris::Status doris::FileFactory::create_file_writer(
// ============================
// broker scan node/unique ptr
-doris::Status doris::FileFactory::create_file_reader(
- doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile*
profile,
- const std::vector<TNetworkAddress>& broker_addresses,
- const std::map<std::string, std::string>& properties, const
doris::TBrokerRangeDesc& range,
- int64_t start_offset, std::unique_ptr<FileReader>& file_reader) {
+Status FileFactory::create_file_reader(TFileType::type type, ExecEnv* env,
RuntimeProfile* profile,
+ const std::vector<TNetworkAddress>&
broker_addresses,
+ const std::map<std::string,
std::string>& properties,
+ const TBrokerRangeDesc& range, int64_t
start_offset,
+ std::unique_ptr<FileReader>&
file_reader) {
FileReader* file_reader_ptr;
switch (type) {
case TFileType::FILE_LOCAL: {
@@ -85,8 +95,7 @@ doris::Status doris::FileFactory::create_file_reader(
}
case TFileType::FILE_HDFS: {
FileReader* hdfs_reader = nullptr;
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params,
range.path, start_offset,
- &hdfs_reader));
+ hdfs_reader = new HdfsFileReader(range.hdfs_params, range.path,
start_offset);
file_reader_ptr = new BufferedReader(profile, hdfs_reader);
break;
}
@@ -100,13 +109,12 @@ doris::Status doris::FileFactory::create_file_reader(
// ============================
// file scan node/unique ptr
-doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile,
- const
TFileScanRangeParams& params,
- const std::string& path,
int64_t start_offset,
- int64_t file_size,
int64_t buffer_size,
-
std::unique_ptr<FileReader>& file_reader) {
+Status FileFactory::create_file_reader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
+ const std::string& path, int64_t
start_offset,
+ int64_t file_size, int64_t buffer_size,
+ std::unique_ptr<FileReader>&
file_reader) {
FileReader* file_reader_ptr;
- doris::TFileType::type type = params.file_type;
+ TFileType::type type = params.file_type;
switch (type) {
case TFileType::FILE_LOCAL: {
file_reader_ptr = new LocalFileReader(path, start_offset);
@@ -117,8 +125,7 @@ doris::Status
doris::FileFactory::create_file_reader(RuntimeProfile* profile,
break;
}
case TFileType::FILE_HDFS: {
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(params.hdfs_params,
path, start_offset,
- &file_reader_ptr));
+ file_reader_ptr = new HdfsFileReader(params.hdfs_params, path,
start_offset);
break;
}
case TFileType::FILE_BROKER: {
@@ -138,12 +145,99 @@ doris::Status
doris::FileFactory::create_file_reader(RuntimeProfile* profile,
return Status::OK();
}
+Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/,
+ const FileSystemProperties&
system_properties,
+ const FileDescription& file_description,
+ std::unique_ptr<io::FileSystem>*
file_system,
+ io::FileReaderSPtr* file_reader) {
+ TFileType::type type = system_properties.system_type;
+ io::FileSystem* file_system_ptr = nullptr;
+ switch (type) {
+ case TFileType::FILE_LOCAL: {
+ RETURN_IF_ERROR(
+
io::global_local_filesystem()->open_file(file_description.path, file_reader));
+ break;
+ }
+ case TFileType::FILE_S3: {
+ RETURN_IF_ERROR(create_s3_reader(system_properties.properties,
file_description.path,
+ &file_system_ptr, file_reader));
+ break;
+ }
+ case TFileType::FILE_HDFS: {
+ RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params,
file_description.path,
+ &file_system_ptr, file_reader));
+ break;
+ }
+ case TFileType::FILE_BROKER: {
+
RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0],
+ system_properties.properties,
file_description.path,
+ &file_system_ptr, file_reader));
+ break;
+ }
+ default:
+ return Status::NotSupported("unsupported file reader type: {}",
std::to_string(type));
+ }
+ file_system->reset(file_system_ptr);
+ return Status::OK();
+}
+
// file scan node/stream load pipe
-doris::Status doris::FileFactory::create_pipe_reader(const TUniqueId& load_id,
-
std::shared_ptr<FileReader>& file_reader) {
+Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
io::FileReaderSPtr* file_reader) {
+ *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
+ if (!(*file_reader)) {
+ return Status::InternalError("unknown stream load id: {}",
UniqueId(load_id).to_string());
+ }
+ return Status::OK();
+}
+
+Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
+ std::shared_ptr<FileReader>&
file_reader) {
file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id);
if (!file_reader) {
return Status::InternalError("unknown stream load id: {}",
UniqueId(load_id).to_string());
}
return Status::OK();
}
+
+Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const
std::string& path,
+ io::FileSystem** hdfs_file_system,
+ io::FileReaderSPtr* reader) {
+ *hdfs_file_system = new io::HdfsFileSystem(hdfs_params, "");
+
RETURN_IF_ERROR((dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system))->connect());
+ RETURN_IF_ERROR((*hdfs_file_system)->open_file(path, reader));
+ return Status::OK();
+}
+
+Status FileFactory::create_hdfs_writer(const std::map<std::string,
std::string>& properties,
+ const std::string& path,
+ std::unique_ptr<FileWriter>& writer) {
+ writer.reset(new HDFSWriter(properties, path));
+ return Status::OK();
+}
+
+Status FileFactory::create_s3_reader(const std::map<std::string, std::string>&
prop,
+ const std::string& path, io::FileSystem**
s3_file_system,
+ io::FileReaderSPtr* reader) {
+ S3URI s3_uri(path);
+ if (!s3_uri.parse()) {
+ return Status::InvalidArgument("s3 uri is invalid: {}", path);
+ }
+ S3Conf s3_conf;
+ RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri,
&s3_conf));
+ *s3_file_system = new io::S3FileSystem(s3_conf, "");
+
RETURN_IF_ERROR((dynamic_cast<io::S3FileSystem*>(*s3_file_system))->connect());
+ RETURN_IF_ERROR((*s3_file_system)->open_file(s3_uri.get_key(), reader));
+ return Status::OK();
+}
+
+Status FileFactory::create_broker_reader(const TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& prop,
+ const std::string& path,
+ io::FileSystem** broker_file_system,
+ io::FileReaderSPtr* reader) {
+ *broker_file_system = new io::BrokerFileSystem(broker_addr, prop);
+
RETURN_IF_ERROR((dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect());
+ RETURN_IF_ERROR((*broker_file_system)->open_file(path, reader));
+ return Status::OK();
+}
+} // namespace doris
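Note: the new create_file_reader overload takes a FileSystemProperties /
FileDescription pair and returns both the owning io::FileSystem and the reader,
so the caller keeps the file system alive for the reader's lifetime. A
condensed usage sketch (helper name hypothetical; the field wiring follows the
csv_reader.cpp call sites later in this commit, includes trimmed):

    #include "io/file_factory.h"
    // plus the thrift header defining TFileScanRangeParams (trimmed)

    Status open_reader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                       const std::string& path, int64_t start_offset, size_t file_size,
                       std::unique_ptr<io::FileSystem>* file_system,
                       io::FileReaderSPtr* file_reader) {
        FileSystemProperties system_properties;
        system_properties.system_type = params.file_type; // LOCAL/S3/HDFS/BROKER
        system_properties.properties = params.properties;
        system_properties.hdfs_params = params.hdfs_params;

        FileDescription file_description;
        file_description.path = path;
        file_description.start_offset = start_offset;
        file_description.file_size = file_size;

        // The factory dispatches on system_type and opens the file on the
        // matching file system; *file_system must outlive *file_reader.
        return FileFactory::create_file_reader(profile, system_properties,
                                               file_description, file_system,
                                               file_reader);
    }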
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 48e22737eb..69d1158816 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -20,12 +20,29 @@
#include "gen_cpp/Types_types.h"
#include "io/file_reader.h"
#include "io/file_writer.h"
+#include "io/fs/file_reader.h"
namespace doris {
+namespace io {
+class FileSystem;
+}
class ExecEnv;
class TNetworkAddress;
class RuntimeProfile;
+struct FileSystemProperties {
+ TFileType::type system_type;
+ std::map<std::string, std::string> properties;
+ THdfsParams hdfs_params;
+ std::vector<TNetworkAddress> broker_addresses;
+};
+
+struct FileDescription {
+ std::string path;
+ int64_t start_offset;
+ size_t file_size;
+};
+
class FileFactory {
public:
// Create FileWriter
@@ -53,10 +70,34 @@ public:
int64_t file_size, int64_t buffer_size,
std::unique_ptr<FileReader>& file_reader);
+ static Status create_file_reader(RuntimeProfile* profile,
+ const FileSystemProperties&
system_properties,
+ const FileDescription& file_description,
+ std::unique_ptr<io::FileSystem>*
file_system,
+ io::FileReaderSPtr* file_reader);
+
// Create FileReader for stream load pipe
+ static Status create_pipe_reader(const TUniqueId& load_id,
io::FileReaderSPtr* file_reader);
+
+ // [deprecated] Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id,
std::shared_ptr<FileReader>& file_reader);
+ static Status create_hdfs_reader(const THdfsParams& hdfs_params, const
std::string& path,
+ io::FileSystem** hdfs_file_system,
io::FileReaderSPtr* reader);
+
+ static Status create_hdfs_writer(const std::map<std::string, std::string>&
properties,
+ const std::string& path,
std::unique_ptr<FileWriter>& writer);
+
+ static Status create_s3_reader(const std::map<std::string, std::string>&
prop,
+ const std::string& path, io::FileSystem**
s3_file_system,
+ io::FileReaderSPtr* reader);
+
+ static Status create_broker_reader(const TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& prop,
+ const std::string& path,
io::FileSystem** broker_file_system,
+ io::FileReaderSPtr* reader);
+
static TFileType::type convert_storage_type(TStorageBackendType::type
type) {
switch (type) {
case TStorageBackendType::LOCAL:
diff --git a/be/src/io/fs/hdfs_file_reader.cpp
b/be/src/io/fs/hdfs_file_reader.cpp
index ef03541387..39c9795e95 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -61,6 +61,14 @@ Status HdfsFileReader::read_at(size_t offset, Slice result,
const IOContext& /*i
return Status::IOError("offset exceeds file size(offset: {}, file
size: {}, path: {})",
offset, _file_size, _path.native());
}
+
+ auto handle = _fs->get_handle();
+ int res = hdfsSeek(handle->hdfs_fs, _hdfs_file, offset);
+ if (res != 0) {
+ return Status::InternalError("Seek to offset failed. (BE: {})
offset={}, err: {}",
+ BackendOptions::get_localhost(), offset,
hdfsGetLastError());
+ }
+
size_t bytes_req = result.size;
char* to = result.data;
bytes_req = std::min(bytes_req, _file_size - offset);
@@ -69,8 +77,7 @@ Status HdfsFileReader::read_at(size_t offset, Slice result,
const IOContext& /*i
return Status::OK();
}
- auto handle = _fs->get_handle();
- int64_t has_read = 0;
+ size_t has_read = 0;
while (has_read < bytes_req) {
int64_t loop_read =
hdfsRead(handle->hdfs_fs, _hdfs_file, to + has_read, bytes_req
- has_read);
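Note the fix above: read_at() is positional, but hdfsRead() consumes from an
internal file cursor, so the handle must first be repositioned with hdfsSeek().
A trimmed sketch of the pattern against the libhdfs C API (the helper name and
exact header path are assumptions; error handling reduced to return codes):

    #include <hdfs/hdfs.h> // libhdfs / libhdfs3; header location varies

    // Positional read built from hdfsSeek + a short-read loop, as in read_at().
    tSize pread_hdfs(hdfsFS fs, hdfsFile file, tOffset offset, char* buf, tSize len) {
        if (hdfsSeek(fs, file, offset) != 0) {
            return -1; // caller can surface hdfsGetLastError()
        }
        tSize total = 0;
        while (total < len) {
            tSize n = hdfsRead(fs, file, buf + total, len - total);
            if (n < 0) {
                return -1; // read error
            }
            if (n == 0) {
                break; // EOF
            }
            total += n;
        }
        return total;
    }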
diff --git a/be/src/io/fs/hdfs_file_system.cpp
b/be/src/io/fs/hdfs_file_system.cpp
index cafa7ca34c..b053aeebb4 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -63,14 +63,8 @@ private:
HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const
std::string& path)
: RemoteFileSystem(path, "", FileSystemType::HDFS),
_hdfs_params(hdfs_params),
- _path(path),
_fs_handle(nullptr) {
_namenode = _hdfs_params.fs_name;
- // if the format of _path is hdfs://ip:port/path, replace it to /path.
- // path like hdfs://ip:port/path can't be used by libhdfs3.
- if (_path.find(_namenode) != std::string::npos) {
- _path = _path.substr(_namenode.size());
- }
}
HdfsFileSystem::~HdfsFileSystem() {
@@ -101,9 +95,12 @@ Status HdfsFileSystem::create_file(const Path& /*path*/,
FileWriterPtr* /*writer
Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
CHECK_HDFS_HANDLE(_fs_handle);
- size_t file_len = -1;
+ size_t file_len = 0;
RETURN_IF_ERROR(file_size(path, &file_len));
- auto hdfs_file = hdfsOpenFile(_fs_handle->hdfs_fs, path.string().c_str(),
O_RDONLY, 0, 0, 0);
+
+ Path real_path = _convert_path(path);
+ auto hdfs_file =
+ hdfsOpenFile(_fs_handle->hdfs_fs, real_path.string().c_str(),
O_RDONLY, 0, 0, 0);
if (hdfs_file == nullptr) {
if (_fs_handle->from_cache) {
// hdfsFS may be disconnected if not used for a long time
@@ -130,9 +127,10 @@ Status HdfsFileSystem::open_file(const Path& path,
FileReaderSPtr* reader) {
Status HdfsFileSystem::delete_file(const Path& path) {
CHECK_HDFS_HANDLE(_fs_handle);
+ Path real_path = _convert_path(path);
// The recursive argument `is_recursive` is irrelevant if path is a file.
int is_recursive = 0;
- int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(),
is_recursive);
+ int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(),
is_recursive);
if (res == -1) {
return Status::InternalError("Failed to delete file {}",
path.string());
}
@@ -141,7 +139,8 @@ Status HdfsFileSystem::delete_file(const Path& path) {
Status HdfsFileSystem::create_directory(const Path& path) {
CHECK_HDFS_HANDLE(_fs_handle);
- int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, path.string().c_str());
+ Path real_path = _convert_path(path);
+ int res = hdfsCreateDirectory(_fs_handle->hdfs_fs,
real_path.string().c_str());
if (res == -1) {
return Status::InternalError("Failed to create directory {}",
path.string());
}
@@ -150,9 +149,10 @@ Status HdfsFileSystem::create_directory(const Path& path) {
Status HdfsFileSystem::delete_directory(const Path& path) {
CHECK_HDFS_HANDLE(_fs_handle);
+ Path real_path = _convert_path(path);
// delete in recursive mode
int is_recursive = 1;
- int res = hdfsDelete(_fs_handle->hdfs_fs, path.string().c_str(),
is_recursive);
+ int res = hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(),
is_recursive);
if (res == -1) {
return Status::InternalError("Failed to delete directory {}",
path.string());
}
@@ -161,7 +161,8 @@ Status HdfsFileSystem::delete_directory(const Path& path) {
Status HdfsFileSystem::exists(const Path& path, bool* res) const {
CHECK_HDFS_HANDLE(_fs_handle);
- int is_exists = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str());
+ Path real_path = _convert_path(path);
+ int is_exists = hdfsExists(_fs_handle->hdfs_fs,
real_path.string().c_str());
if (is_exists == 0) {
*res = true;
} else {
@@ -172,7 +173,8 @@ Status HdfsFileSystem::exists(const Path& path, bool* res)
const {
Status HdfsFileSystem::file_size(const Path& path, size_t* file_size) const {
CHECK_HDFS_HANDLE(_fs_handle);
- hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs,
path.string().c_str());
+ Path real_path = _convert_path(path);
+ hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs,
real_path.string().c_str());
if (file_info == nullptr) {
return Status::InternalError("Failed to get file size of {}",
path.string());
}
@@ -183,9 +185,10 @@ Status HdfsFileSystem::file_size(const Path& path, size_t*
file_size) const {
Status HdfsFileSystem::list(const Path& path, std::vector<Path>* files) {
CHECK_HDFS_HANDLE(_fs_handle);
+ Path real_path = _convert_path(path);
int numEntries = 0;
hdfsFileInfo* file_info =
- hdfsListDirectory(_fs_handle->hdfs_fs, path.string().c_str(),
&numEntries);
+ hdfsListDirectory(_fs_handle->hdfs_fs, real_path.string().c_str(),
&numEntries);
if (file_info == nullptr) {
return Status::InternalError("Failed to list files/directors of {}",
path.string());
}
@@ -200,6 +203,17 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
return _fs_handle;
}
+Path HdfsFileSystem::_convert_path(const Path& path) const {
+ // if the path has the format hdfs://ip:port/path, rewrite it to /path,
+ // because a path like hdfs://ip:port/path can't be used by libhdfs3.
+ Path real_path(path);
+ if (path.string().find(_namenode) != std::string::npos) {
+ std::string real_path_str = path.string().substr(_namenode.size());
+ real_path = real_path_str;
+ }
+ return real_path;
+}
+
// ************* HdfsFileSystemCache ******************
int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
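Note: the per-path _convert_path() helper replaces the constructor-time rewrite
of the single _path member, so every API (open/delete/list/...) normalizes its
own argument. A standalone illustration of the rewrite rule (plain std::string,
not Doris code):

    #include <cassert>
    #include <string>

    // Strip the namenode prefix: libhdfs3 cannot open hdfs://ip:port/path URIs.
    std::string convert_path(const std::string& path, const std::string& namenode) {
        if (path.find(namenode) != std::string::npos) {
            return path.substr(namenode.size());
        }
        return path; // already a bare path
    }

    int main() {
        assert(convert_path("hdfs://127.0.0.1:8020/user/doris/x.csv",
                            "hdfs://127.0.0.1:8020") == "/user/doris/x.csv");
        assert(convert_path("/user/doris/x.csv", "hdfs://127.0.0.1:8020") ==
               "/user/doris/x.csv");
        return 0;
    }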
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index f86e73d1d3..01e8da58ca 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -119,9 +119,9 @@ public:
HdfsFileSystemHandle* get_handle();
private:
+ Path _convert_path(const Path& path) const;
const THdfsParams& _hdfs_params;
std::string _namenode;
- std::string _path;
// do not use std::shared_ptr or std::unique_ptr
// _fs_handle is managed by HdfsFileSystemCache
HdfsFileSystemHandle* _fs_handle;
diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe_reader.h
b/be/src/io/fs/kafka_consumer_pipe.h
similarity index 77%
rename from be/src/runtime/routine_load/kafka_consumer_pipe_reader.h
rename to be/src/io/fs/kafka_consumer_pipe.h
index 6555057c5f..6aab83c3b2 100644
--- a/be/src/runtime/routine_load/kafka_consumer_pipe_reader.h
+++ b/be/src/io/fs/kafka_consumer_pipe.h
@@ -17,17 +17,16 @@
#pragma once
-#include "io/fs/stream_load_pipe_reader.h"
+#include "io/fs/stream_load_pipe.h"
namespace doris {
namespace io {
-class KafkaConsumerPipeReader : public StreamLoadPipeReader {
+class KafkaConsumerPipe : public StreamLoadPipe {
public:
- KafkaConsumerPipeReader(size_t max_buffered_bytes = 1024 * 1024,
- size_t min_chunk_size = 64 * 1024)
- : StreamLoadPipeReader(max_buffered_bytes, min_chunk_size) {}
+ KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, size_t
min_chunk_size = 64 * 1024)
+ : StreamLoadPipe(max_buffered_bytes, min_chunk_size) {}
- ~KafkaConsumerPipeReader() override = default;
+ ~KafkaConsumerPipe() override = default;
Status append_with_line_delimiter(const char* data, size_t size) {
Status st = append(data, size);
diff --git a/be/src/io/fs/stream_load_pipe_reader.cpp
b/be/src/io/fs/stream_load_pipe.cpp
similarity index 76%
rename from be/src/io/fs/stream_load_pipe_reader.cpp
rename to be/src/io/fs/stream_load_pipe.cpp
index 6cc7b0a985..cc5132478c 100644
--- a/be/src/io/fs/stream_load_pipe_reader.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -15,32 +15,34 @@
// specific language governing permissions and limitations
// under the License.
-#include "stream_load_pipe_reader.h"
+#include "stream_load_pipe.h"
#include <gen_cpp/internal_service.pb.h>
+#include "olap/iterators.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
namespace doris {
namespace io {
-StreamLoadPipeReader::StreamLoadPipeReader(size_t max_buffered_bytes, size_t
min_chunk_size,
- int64_t total_length, bool
use_proto)
+StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t
min_chunk_size,
+ int64_t total_length, bool use_proto)
: _buffered_bytes(0),
_proto_buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size),
+ _total_length(total_length),
_use_proto(use_proto) {}
-StreamLoadPipeReader::~StreamLoadPipeReader() {
+StreamLoadPipe::~StreamLoadPipe() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
while (!_buf_queue.empty()) {
_buf_queue.pop_front();
}
}
-Status StreamLoadPipeReader::read_at(size_t /*offset*/, Slice result, const
IOContext& /*io_ctx*/,
- size_t* bytes_read) {
+Status StreamLoadPipe::read_at(size_t /*offset*/, Slice result, const
IOContext& /*io_ctx*/,
+ size_t* bytes_read) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
*bytes_read = 0;
size_t bytes_req = result.size;
@@ -79,15 +81,38 @@ Status StreamLoadPipeReader::read_at(size_t /*offset*/,
Slice result, const IOCo
return Status::OK();
}
-Status StreamLoadPipeReader::append_and_flush(const char* data, size_t size,
- size_t proto_byte_size) {
+// If _total_length == -1, this is a Kafka routine load task: get the next
buffer directly from the buffer queue, because one buffer contains a complete
piece of data.
+// Otherwise, this is a stream load task that needs to read the specified
amount of data.
+Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data,
size_t* length) {
+ if (_total_length < -1) {
+ return Status::InternalError("invalid, _total_length is: {}",
_total_length);
+ } else if (_total_length == 0) {
+ // no data
+ *length = 0;
+ return Status::OK();
+ }
+
+ if (_total_length == -1) {
+ return _read_next_buffer(data, length);
+ }
+
+ // _total_length > 0, read the entire data
+ data->reset(new uint8_t[_total_length]);
+ Slice result(data->get(), _total_length);
+ IOContext io_ctx;
+ Status st = read_at(0, result, io_ctx, length);
+ return st;
+}
+
+Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t
proto_byte_size) {
ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size
+ 1));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
}
-Status StreamLoadPipeReader::append(const char* data, size_t size) {
+Status StreamLoadPipe::append(const char* data, size_t size) {
size_t pos = 0;
if (_write_buf != nullptr) {
if (size < _write_buf->remaining()) {
@@ -110,7 +135,7 @@ Status StreamLoadPipeReader::append(const char* data,
size_t size) {
return Status::OK();
}
-Status StreamLoadPipeReader::append(const ByteBufferPtr& buf) {
+Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
if (_write_buf != nullptr) {
_write_buf->flip();
RETURN_IF_ERROR(_append(_write_buf));
@@ -120,7 +145,7 @@ Status StreamLoadPipeReader::append(const ByteBufferPtr&
buf) {
}
// read the next buffer from _buf_queue
-Status StreamLoadPipeReader::_read_next_buffer(std::unique_ptr<uint8_t[]>*
data, int64_t* length) {
+Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data,
size_t* length) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
@@ -150,7 +175,7 @@ Status
StreamLoadPipeReader::_read_next_buffer(std::unique_ptr<uint8_t[]>* data,
return Status::OK();
}
-Status StreamLoadPipeReader::_append(const ByteBufferPtr& buf, size_t
proto_byte_size) {
+Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t
proto_byte_size) {
{
std::unique_lock<std::mutex> l(_lock);
// if _buf_queue is empty, we append this buf without size check
@@ -180,7 +205,7 @@ Status StreamLoadPipeReader::_append(const ByteBufferPtr&
buf, size_t proto_byte
}
// called when producer finished
-Status StreamLoadPipeReader::finish() {
+Status StreamLoadPipe::finish() {
if (_write_buf != nullptr) {
_write_buf->flip();
_append(_write_buf);
@@ -195,7 +220,7 @@ Status StreamLoadPipeReader::finish() {
}
// called when producer/consumer failed
-void StreamLoadPipeReader::cancel(const std::string& reason) {
+void StreamLoadPipe::cancel(const std::string& reason) {
{
std::lock_guard<std::mutex> l(_lock);
_cancelled = true;
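Note: read_one_message() keys off _total_length, now a real member rather than
a commented-out one (see the header diff below): -1 means a Kafka pipe where
each queued buffer is a complete message, 0 means no data, and a positive value
means a stream load body of known size. A consumer-side sketch, assuming
io::StreamLoadPipe from this commit (helper name hypothetical):

    #include <memory>

    #include "io/fs/stream_load_pipe.h"

    // Hypothetical consumer: fetch one complete message from the pipe.
    Status consume_one_message(io::StreamLoadPipe* pipe) {
        std::unique_ptr<uint8_t[]> data;
        size_t length = 0;
        // total_length == -1: next whole buffer from the queue (Kafka).
        // total_length  >  0: allocate and read the entire body (stream load).
        RETURN_IF_ERROR(pipe->read_one_message(&data, &length));
        if (length == 0) {
            return Status::OK(); // no data (total_length == 0 or producer finished)
        }
        // ... parse `length` bytes starting at data.get() ...
        return Status::OK();
    }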
diff --git a/be/src/io/fs/stream_load_pipe_reader.h
b/be/src/io/fs/stream_load_pipe.h
similarity index 85%
rename from be/src/io/fs/stream_load_pipe_reader.h
rename to be/src/io/fs/stream_load_pipe.h
index a5cc95e3e8..59d391d5ad 100644
--- a/be/src/io/fs/stream_load_pipe_reader.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -28,13 +28,13 @@ namespace io {
const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
-class StreamLoadPipeReader : public MessageBodySink, public FileReader {
+class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
- StreamLoadPipeReader(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
- size_t min_chunk_size = 64 * 1024, int64_t
total_length = -1,
- bool use_proto = false);
+ StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
+ size_t min_chunk_size = 64 * 1024, int64_t total_length =
-1,
+ bool use_proto = false);
- ~StreamLoadPipeReader() override;
+ ~StreamLoadPipe() override;
Status append_and_flush(const char* data, size_t size, size_t
proto_byte_size = 0);
@@ -63,9 +63,11 @@ public:
// called when producer/consumer failed
void cancel(const std::string& reason) override;
+ Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length);
+
private:
// read the next buffer from _buf_queue
- Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, int64_t*
length);
+ Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length);
Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0);
@@ -82,7 +84,7 @@ private:
// The default is -1, which means that the data arrives in a stream
// and the length is unknown.
// size_t is unsigned, so use int64_t
- // int64_t _total_length = -1;
+ int64_t _total_length = -1;
bool _use_proto = false;
std::deque<ByteBufferPtr> _buf_queue;
std::condition_variable _put_cond;
diff --git a/be/src/io/hdfs_reader_writer.cpp b/be/src/io/hdfs_reader_writer.cpp
deleted file mode 100644
index 3cbe2f4436..0000000000
--- a/be/src/io/hdfs_reader_writer.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/hdfs_reader_writer.h"
-
-#include "io/hdfs_file_reader.h"
-#include "io/hdfs_writer.h"
-
-namespace doris {
-
-Status HdfsReaderWriter::create_reader(const THdfsParams& hdfs_params, const
std::string& path,
- int64_t start_offset, FileReader**
reader) {
- *reader = new HdfsFileReader(hdfs_params, path, start_offset);
- return Status::OK();
-}
-
-Status HdfsReaderWriter::create_reader(const std::map<std::string,
std::string>& hdfs_params,
- const std::string& path, int64_t
start_offset,
- FileReader** reader) {
- *reader = new HdfsFileReader(hdfs_params, path, start_offset);
- return Status::OK();
-}
-
-Status HdfsReaderWriter::create_writer(const std::map<std::string,
std::string>& properties,
- const std::string& path,
- std::unique_ptr<FileWriter>& writer) {
- writer.reset(new HDFSWriter(properties, path));
- return Status::OK();
-}
-} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0b36a2b8e8..e3ab690168 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -182,6 +182,15 @@ public:
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
doris::vectorized::ScannerScheduler* scanner_scheduler() { return
_scanner_scheduler; }
+ // only for unit test
+ void set_master_info(TMasterInfo* master_info) { this->_master_info =
master_info; }
+ void set_new_load_stream_mgr(NewLoadStreamMgr* new_load_stream_mgr) {
+ this->_new_load_stream_mgr = new_load_stream_mgr;
+ }
+ void set_stream_load_executor(StreamLoadExecutor* stream_load_executor) {
+ this->_stream_load_executor = stream_load_executor;
+ }
+
private:
Status _init(const std::vector<StorePath>& store_paths);
void _destroy();
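Note: the setters above are explicitly test-only; they let unit tests inject a
TMasterInfo, NewLoadStreamMgr, and StreamLoadExecutor without running the full
ExecEnv::_init() path. A hypothetical fixture sketch:

    #include "runtime/exec_env.h"
    #include "runtime/stream_load/new_load_stream_mgr.h"
    #include "runtime/stream_load/stream_load_executor.h"

    // Hypothetical test setup: wire test doubles into the process-wide ExecEnv.
    void init_exec_env_for_test(TMasterInfo* master_info, NewLoadStreamMgr* stream_mgr,
                                StreamLoadExecutor* executor) {
        ExecEnv* env = ExecEnv::GetInstance();
        env->set_master_info(master_info);
        env->set_new_load_stream_mgr(stream_mgr);
        env->set_stream_load_executor(executor);
    }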
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 134df9f221..c89cfc69ef 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -33,6 +33,7 @@
#include "gen_cpp/QueryPlanExtra_types.h"
#include "gen_cpp/Types_types.h"
#include "gutil/strings/substitute.h"
+#include "io/fs/stream_load_pipe.h"
#include "opentelemetry/trace/scope.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/client_cache.h"
@@ -41,9 +42,8 @@
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/runtime_filter_mgr.h"
-#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
-#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
@@ -137,8 +137,8 @@ public:
std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return
_fragments_ctx; }
- void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
- std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }
+ void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
+ std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
void set_need_wait_execution_trigger() { _need_wait_execution_trigger =
true; }
@@ -172,7 +172,7 @@ private:
std::shared_ptr<RuntimeFilterMergeControllerEntity>
_merge_controller_handler;
// The pipe for data transferring, such as insert.
- std::shared_ptr<StreamLoadPipe> _pipe;
+ std::shared_ptr<io::StreamLoadPipe> _pipe;
// If set to true, this plan fragment will be executed only after FE sends the
execution start rpc.
bool _need_wait_execution_trigger = false;
@@ -528,14 +528,13 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params) {
stream_load_ctx->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
stream_load_ctx->need_commit_self = true;
stream_load_ctx->need_rollback = true;
- // total_length == -1 means read one message from pipe in once time,
don't care the length.
- auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /*
max_buffered_bytes */,
- 64 * 1024 /*
min_chunk_size */,
- -1 /* total_length */,
true /* use_proto */);
+ auto pipe = std::make_shared<io::StreamLoadPipe>(
+ kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /*
min_chunk_size */,
+ -1 /* total_length */, true /* use_proto */);
stream_load_ctx->body_sink = pipe;
stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
- RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_ctx->id,
pipe));
+
RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id,
pipe));
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
set_pipe(params.params.fragment_instance_id, pipe);
@@ -562,7 +561,7 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
}
void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
- std::shared_ptr<StreamLoadPipe> pipe) {
+ std::shared_ptr<io::StreamLoadPipe> pipe) {
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
@@ -572,7 +571,7 @@ void FragmentMgr::set_pipe(const TUniqueId&
fragment_instance_id,
}
}
-std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId&
fragment_instance_id) {
+std::shared_ptr<io::StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId&
fragment_instance_id) {
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index afd5408bbf..050db423dd 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -45,6 +45,10 @@ namespace pipeline {
class PipelineFragmentContext;
}
+namespace io {
+class StreamLoadPipe;
+}
+
class QueryFragmentsCtx;
class ExecEnv;
class FragmentExecState;
@@ -54,7 +58,6 @@ class TExecPlanFragmentParams;
class TExecPlanFragmentParamsList;
class TUniqueId;
class RuntimeFilterMergeController;
-class StreamLoadPipe;
std::string to_load_error_http_path(const std::string& file_name);
@@ -104,9 +107,9 @@ public:
Status merge_filter(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
- void set_pipe(const TUniqueId& fragment_instance_id,
std::shared_ptr<StreamLoadPipe> pipe);
+ void set_pipe(const TUniqueId& fragment_instance_id,
std::shared_ptr<io::StreamLoadPipe> pipe);
- std::shared_ptr<StreamLoadPipe> get_pipe(const TUniqueId&
fragment_instance_id);
+ std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId&
fragment_instance_id);
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state,
FinishCallback cb);
diff --git a/be/src/runtime/routine_load/data_consumer.h
b/be/src/runtime/routine_load/data_consumer.h
index 173ffbd82a..afd4d9f6f7 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -30,7 +30,6 @@ namespace doris {
class KafkaConsumerPipe;
class Status;
-class StreamLoadPipe;
class DataConsumer {
public:
@@ -156,7 +155,6 @@ private:
KafkaEventCb _k_event_cb;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
- std::shared_ptr<KafkaConsumerPipe> _k_consumer_pipe;
};
} // end namespace doris
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp
b/be/src/runtime/routine_load/data_consumer_group.cpp
index 0640591ece..869d427568 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -16,10 +16,10 @@
// under the License.
#include "runtime/routine_load/data_consumer_group.h"
+#include "io/fs/kafka_consumer_pipe.h"
#include "librdkafka/rdkafka.h"
#include "librdkafka/rdkafkacpp.h"
#include "runtime/routine_load/data_consumer.h"
-#include "runtime/routine_load/kafka_consumer_pipe.h"
#include "runtime/stream_load/stream_load_context.h"
namespace doris {
@@ -96,8 +96,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext*
ctx) {
int64_t left_rows = ctx->max_batch_rows;
int64_t left_bytes = ctx->max_batch_size;
- std::shared_ptr<KafkaConsumerPipe> kafka_pipe =
- std::static_pointer_cast<KafkaConsumerPipe>(ctx->body_sink);
+ std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
+ std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " <<
left_time
<< ", batch rows: " << left_rows << ", batch size: " <<
left_bytes << ". "
@@ -107,11 +107,11 @@ Status
KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;
//improve performance
- Status (KafkaConsumerPipe::*append_data)(const char* data, size_t size);
+ Status (io::KafkaConsumerPipe::*append_data)(const char* data, size_t
size);
if (ctx->format == TFileFormatType::FORMAT_JSON) {
- append_data = &KafkaConsumerPipe::append_json;
+ append_data = &io::KafkaConsumerPipe::append_json;
} else {
- append_data = &KafkaConsumerPipe::append_with_line_delimiter;
+ append_data = &io::KafkaConsumerPipe::append_with_line_delimiter;
}
MonotonicStopWatch watch;
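Note: the append_data pointer-to-member above selects append_json vs
append_with_line_delimiter once, before the consume loop, instead of branching
per message. A standalone illustration of the pointer-to-member dispatch (toy
types, not Doris code):

    #include <cstddef>
    #include <cstdio>

    struct Pipe {
        void append_json(const char* data, size_t size) { std::printf("json: %zu bytes\n", size); }
        void append_line(const char* data, size_t size) { std::printf("line: %zu bytes\n", size); }
    };

    int main() {
        bool is_json = true;
        // Bind the format-specific append function once, outside the hot loop.
        void (Pipe::*append_data)(const char*, size_t) =
                is_json ? &Pipe::append_json : &Pipe::append_line;
        Pipe pipe;
        for (int i = 0; i < 3; ++i) {
            (pipe.*append_data)("msg", 3); // dispatch through the member pointer
        }
        return 0;
    }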
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index a5535199e2..da8842ac59 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -23,9 +23,10 @@
#include "gen_cpp/BackendService_types.h"
#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/Types_types.h"
+#include "io/fs/kafka_consumer_pipe.h"
+#include "olap/iterators.h"
#include "runtime/exec_env.h"
#include "runtime/routine_load/data_consumer_group.h"
-#include "runtime/routine_load/kafka_consumer_pipe.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/defer_op.h"
#include "util/uid_util.h"
@@ -193,9 +194,9 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
TStatus tstatus;
tstatus.status_code = TStatusCode::OK;
put_result.status = tstatus;
- put_result.params = std::move(task.params);
+ put_result.params = task.params;
put_result.__isset.params = true;
- ctx->put_result = std::move(put_result);
+ ctx->put_result = put_result;
if (task.__isset.format) {
ctx->format = task.format;
}
@@ -267,10 +268,10 @@ void
RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
HANDLE_ERROR(consumer_pool->get_consumer_grp(ctx, &consumer_grp), "failed
to get consumers");
// create and set pipe
- std::shared_ptr<StreamLoadPipe> pipe;
+ std::shared_ptr<io::StreamLoadPipe> pipe;
switch (ctx->load_src_type) {
case TLoadSourceType::KAFKA: {
- pipe = std::make_shared<KafkaConsumerPipe>();
+ pipe = std::make_shared<io::KafkaConsumerPipe>();
Status st =
std::static_pointer_cast<KafkaDataConsumerGroup>(consumer_grp)
->assign_topic_partitions(ctx);
if (!st.ok()) {
@@ -291,7 +292,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext*
ctx, DataConsumerPool
ctx->body_sink = pipe;
// must put pipe before executing plan fragment
- HANDLE_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe), "failed to
add pipe");
+ HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe), "failed
to add pipe");
#ifndef BE_TEST
// execute plan fragment, async
@@ -365,32 +366,31 @@ void
RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status&
_exec_env->stream_load_executor()->rollback_txn(ctx);
ctx->need_rollback = false;
}
- if (ctx->body_sink.get() != nullptr) {
+ if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(err_msg);
}
-
- return;
}
// for test only
Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx)
{
auto mock_consumer = [this, ctx]() {
ctx->ref();
- std::shared_ptr<StreamLoadPipe> pipe =
_exec_env->load_stream_mgr()->get(ctx->id);
- bool eof = false;
+ std::shared_ptr<io::StreamLoadPipe> pipe =
_exec_env->new_load_stream_mgr()->get(ctx->id);
std::stringstream ss;
while (true) {
char one;
int64_t len = 1;
- int64_t read_bytes = 0;
- Status st = pipe->read((uint8_t*)&one, len, &read_bytes, &eof);
+ size_t read_bytes = 0;
+ Slice result((uint8_t*)&one, len);
+ IOContext io_ctx;
+ Status st = pipe->read_at(0, result, io_ctx, &read_bytes);
if (!st.ok()) {
LOG(WARNING) << "read failed";
ctx->promise.set_value(st);
break;
}
- if (eof) {
+ if (read_bytes == 0) {
ctx->promise.set_value(Status::OK());
break;
}
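One subtlety in the submit_task hunk above: the removed std::move(task.params)
never actually moved, because task is a const reference, and moving from a
const lvalue silently degrades to a copy. The rewrite just makes that copy
explicit. A standalone demonstration:

    #include <string>
    #include <utility>

    int main() {
        const std::string params = "task params";
        // std::move yields const std::string&&, which binds to the copy
        // constructor's const&: this line copies, it does not move.
        std::string taken = std::move(params);
        return (params == "task params" && taken == params) ? 0 : 1; // source intact
    }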
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.cpp
b/be/src/runtime/stream_load/new_load_stream_mgr.cpp
index 20cbb295cb..c7a57362fe 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.cpp
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.cpp
@@ -22,8 +22,8 @@ namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(new_stream_load_pipe_count,
MetricUnit::NOUNIT);
NewLoadStreamMgr::NewLoadStreamMgr() {
- // Each StreamLoadPipeReader has a limited buffer size (default 1M), it's
not needed to count the
- // actual size of all StreamLoadPipeReader.
+ // Each StreamLoadPipe has a limited buffer size (default 1M), so there is
no need to count the
+ // actual size of all StreamLoadPipes.
REGISTER_HOOK_METRIC(new_stream_load_pipe_count, [this]() { return
_stream_map.size(); });
}
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h
b/be/src/runtime/stream_load/new_load_stream_mgr.h
index c009fbc71e..60cd3aa446 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.h
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.h
@@ -21,7 +21,7 @@
#include <mutex>
#include <unordered_map>
-#include "io/fs/stream_load_pipe_reader.h"
+#include "io/fs/stream_load_pipe.h"
#include "util/doris_metrics.h"
#include "util/uid_util.h"
@@ -34,7 +34,7 @@ public:
NewLoadStreamMgr();
~NewLoadStreamMgr();
- Status put(const UniqueId& id, std::shared_ptr<io::StreamLoadPipeReader>
stream) {
+ Status put(const UniqueId& id, std::shared_ptr<io::StreamLoadPipe> stream)
{
std::lock_guard<std::mutex> l(_lock);
auto it = _stream_map.find(id);
if (it != std::end(_stream_map)) {
@@ -45,7 +45,7 @@ public:
return Status::OK();
}
- std::shared_ptr<io::StreamLoadPipeReader> get(const UniqueId& id) {
+ std::shared_ptr<io::StreamLoadPipe> get(const UniqueId& id) {
std::lock_guard<std::mutex> l(_lock);
auto it = _stream_map.find(id);
if (it == std::end(_stream_map)) {
@@ -67,6 +67,6 @@ public:
private:
std::mutex _lock;
- std::unordered_map<UniqueId, std::shared_ptr<io::StreamLoadPipeReader>>
_stream_map;
+ std::unordered_map<UniqueId, std::shared_ptr<io::StreamLoadPipe>>
_stream_map;
};
} // namespace doris
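Note: NewLoadStreamMgr is a mutex-guarded map from load id to
io::StreamLoadPipe; put() registers the producer's pipe and get() hands it to
the consumer (fragment) side. A minimal round-trip sketch, assuming the
interface above (helper name hypothetical):

    #include "io/fs/stream_load_pipe.h"
    #include "runtime/stream_load/new_load_stream_mgr.h"

    // Hypothetical round trip: producer registers, consumer looks up by id.
    Status register_and_fetch(NewLoadStreamMgr* mgr, const UniqueId& id) {
        auto pipe = std::make_shared<io::StreamLoadPipe>();
        RETURN_IF_ERROR(mgr->put(id, pipe));
        std::shared_ptr<io::StreamLoadPipe> fetched = mgr->get(id);
        if (fetched == nullptr) {
            return Status::InternalError("unknown stream load id: {}", id.to_string());
        }
        return Status::OK();
    }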
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index 795b0f304b..0fc27a27f9 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -29,6 +29,7 @@
#include "gen_cpp/FrontendService_types.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "service/backend_options.h"
#include "util/string_util.h"
@@ -92,7 +93,7 @@ public:
need_rollback = false;
}
- _exec_env->load_stream_mgr()->remove(id);
+ _exec_env->new_load_stream_mgr()->remove(id);
}
std::string to_json() const;
diff --git a/be/src/util/hdfs_storage_backend.cpp
b/be/src/util/hdfs_storage_backend.cpp
index 71df59c086..83c25716f9 100644
--- a/be/src/util/hdfs_storage_backend.cpp
+++ b/be/src/util/hdfs_storage_backend.cpp
@@ -18,7 +18,6 @@
#include "util/hdfs_storage_backend.h"
#include "io/hdfs_file_reader.h"
-#include "io/hdfs_reader_writer.h"
#include "io/hdfs_writer.h"
#include "olap/file_helper.h"
#include "util/hdfs_util.h"
@@ -55,7 +54,7 @@ Status HDFSStorageBackend::close() {
// if the format of path is hdfs://ip:port/path, replace it to /path.
// path like hdfs://ip:port/path can't be used by libhdfs3.
std::string HDFSStorageBackend::parse_path(const std::string& path) {
- if (path.find(hdfs_file_prefix) != path.npos) {
+ if (path.find(hdfs_file_prefix) != std::string::npos) {
std::string temp = path.substr(hdfs_file_prefix.size());
std::size_t pos = temp.find_first_of('/');
return temp.substr(pos);
@@ -143,7 +142,7 @@ Status HDFSStorageBackend::list(const std::string&
remote_path, bool contain_md5
// get filename
std::filesystem::path file_path(file_name_with_path);
std::string file_name = file_path.filename();
- size_t pos = file_name.find_last_of(".");
+ size_t pos = file_name.find_last_of('.');
if (pos == std::string::npos || pos == file_name.size() - 1) {
// Not found checksum separator, ignore this file
continue;
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 428f6b4f91..4426ba63a7 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -281,8 +281,8 @@ set(VEC_FILES
exec/format/json/new_json_reader.cpp
exec/format/table/table_format_reader.cpp
exec/format/table/iceberg_reader.cpp
- exec/format/file_reader/new_file_factory.cpp
exec/format/file_reader/new_plain_text_line_reader.cpp
+ exec/format/file_reader/new_plain_binary_line_reader.cpp
)
if (WITH_MYSQL)
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 48f8d84dc7..63bdbd69f2 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -23,14 +23,15 @@
#include "common/consts.h"
#include "common/status.h"
#include "exec/decompressor.h"
-#include "exec/plain_binary_line_reader.h"
-#include "exec/plain_text_line_reader.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
+#include "io/file_factory.h"
#include "util/string_util.h"
#include "util/utf8_check.h"
#include "vec/core/block.h"
-#include "vec/exec/scan/vfile_scanner.h"
+#include "vec/exec/format/file_reader/new_plain_binary_line_reader.h"
+#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
+#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
@@ -45,6 +46,8 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile*
profile, ScannerCounte
_params(params),
_range(range),
_file_slot_descs(file_slot_descs),
+ _file_system(nullptr),
+ _file_reader(nullptr),
_line_reader(nullptr),
_line_reader_eof(false),
_text_converter(nullptr),
@@ -76,7 +79,7 @@ CsvReader::CsvReader(RuntimeProfile* profile, const
TFileScanRangeParams& params
_size = _range.size;
}
-CsvReader::~CsvReader() {}
+CsvReader::~CsvReader() = default;
Status CsvReader::init_reader(bool is_load) {
// set the skip lines and start offset
@@ -102,18 +105,23 @@ Status CsvReader::init_reader(bool is_load) {
_skip_lines = 1;
}
- // create and open file reader
- FileReader* real_reader = nullptr;
+ FileSystemProperties system_properties;
+ system_properties.system_type = _params.file_type;
+ system_properties.properties = _params.properties;
+ system_properties.hdfs_params = _params.hdfs_params;
+
+ FileDescription file_description;
+ file_description.path = _range.path;
+ file_description.start_offset = start_offset;
+ file_description.file_size = _range.__isset.file_size ? _range.file_size :
0;
+
if (_params.file_type == TFileType::FILE_STREAM) {
- RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
_file_reader_s));
- real_reader = _file_reader_s.get();
+ RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
&_file_reader));
} else {
RETURN_IF_ERROR(FileFactory::create_file_reader(
- _profile, _params, _range.path, start_offset,
_range.file_size, 0, _file_reader));
- real_reader = _file_reader.get();
+ _profile, system_properties, file_description, &_file_system,
&_file_reader));
}
- RETURN_IF_ERROR(real_reader->open());
- if (real_reader->size() == 0 && _params.file_type !=
TFileType::FILE_STREAM &&
+ if (_file_reader->size() == 0 && _params.file_type !=
TFileType::FILE_STREAM &&
_params.file_type != TFileType::FILE_BROKER) {
return Status::EndOfFile("Empty File");
}
@@ -135,11 +143,13 @@ Status CsvReader::init_reader(bool is_load) {
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
- _line_reader.reset(new PlainTextLineReader(_profile, real_reader,
_decompressor.get(),
- _size, _line_delimiter,
_line_delimiter_length));
+ _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader,
_decompressor.get(),
+ _size, _line_delimiter,
+ _line_delimiter_length,
start_offset));
+
break;
case TFileFormatType::FORMAT_PROTO:
- _line_reader.reset(new PlainBinaryLineReader(real_reader));
+ _line_reader.reset(new NewPlainBinaryLineReader(_file_reader,
_params.file_type));
break;
default:
return Status::InternalError(
@@ -495,11 +505,20 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool*
is_parse_name) {
}
}
- // create and open file reader
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params,
_range.path, start_offset,
- _range.file_size, 0,
_file_reader));
- RETURN_IF_ERROR(_file_reader->open());
- if (_file_reader->size() == 0) {
+ FileSystemProperties system_properties;
+ system_properties.system_type = _params.file_type;
+ system_properties.properties = _params.properties;
+ system_properties.hdfs_params = _params.hdfs_params;
+
+ FileDescription file_description;
+ file_description.path = _range.path;
+ file_description.start_offset = start_offset;
+ file_description.file_size = _range.__isset.file_size ? _range.file_size :
0;
+
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
system_properties, file_description,
+ &_file_system,
&_file_reader));
+ if (_file_reader->size() == 0 && _params.file_type !=
TFileType::FILE_STREAM &&
+ _params.file_type != TFileType::FILE_BROKER) {
return Status::EndOfFile("Empty File");
}
@@ -513,8 +532,10 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool*
is_parse_name) {
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(_create_decompressor());
- _line_reader.reset(new PlainTextLineReader(_profile, _file_reader.get(),
_decompressor.get(),
- _size, _line_delimiter,
_line_delimiter_length));
+ _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader,
_decompressor.get(),
+ _size, _line_delimiter,
_line_delimiter_length,
+ start_offset));
+
return Status::OK();
}
diff --git a/be/src/vec/exec/format/csv/csv_reader.h
b/be/src/vec/exec/format/csv/csv_reader.h
index 5bb14523e3..3a22e0aa14 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -17,10 +17,11 @@
#pragma once
+#include "io/fs/file_reader.h"
#include "vec/exec/format/generic_reader.h"
+
namespace doris {
-class FileReader;
class LineReader;
class TextConverter;
class Decompressor;
@@ -63,13 +64,13 @@ private:
bool _is_array(const Slice& slice);
// used for parsing the table schema of a csv file.
+ // Currently, this feature is used by table valued functions.
Status _prepare_parse(size_t* read_line, bool* is_parse_name);
Status _parse_col_nums(size_t* col_nums);
Status _parse_col_names(std::vector<std::string>* col_names);
// TODO(ftw): parse type
Status _parse_col_types(size_t col_nums, std::vector<TypeDescriptor>*
col_types);
-private:
RuntimeState* _state;
RuntimeProfile* _profile;
ScannerCounter* _counter;
@@ -84,11 +85,8 @@ private:
// True if this is a load task
bool _is_load = false;
- // _file_reader_s is for stream load pipe reader,
- // and _file_reader is for other file reader.
- // TODO: refactor this to use only shared_ptr or unique_ptr
- std::unique_ptr<FileReader> _file_reader;
- std::shared_ptr<FileReader> _file_reader_s;
+ std::unique_ptr<io::FileSystem> _file_system;
+ io::FileReaderSPtr _file_reader;
std::unique_ptr<LineReader> _line_reader;
bool _line_reader_eof;
std::unique_ptr<TextConverter> _text_converter;
diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.cpp
b/be/src/vec/exec/format/file_reader/new_file_factory.cpp
deleted file mode 100644
index a02b5ebdba..0000000000
--- a/be/src/vec/exec/format/file_reader/new_file_factory.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/format/file_reader/new_file_factory.h"
-
-#include "io/broker_reader.h"
-#include "io/broker_writer.h"
-#include "io/buffered_reader.h"
-#include "io/fs/broker_file_system.h"
-#include "io/fs/file_system.h"
-#include "io/fs/hdfs_file_system.h"
-#include "io/fs/s3_file_system.h"
-#include "io/hdfs_file_reader.h"
-#include "io/hdfs_writer.h"
-#include "io/local_file_reader.h"
-#include "io/local_file_writer.h"
-#include "io/s3_reader.h"
-#include "io/s3_writer.h"
-#include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
-#include "runtime/stream_load/new_load_stream_mgr.h"
-#include "util/s3_util.h"
-
-namespace doris {
-
-Status NewFileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
- const std::vector<TNetworkAddress>&
broker_addresses,
- const std::map<std::string,
std::string>& properties,
- const std::string& path, int64_t
start_offset,
- std::unique_ptr<FileWriter>&
file_writer) {
- switch (type) {
- case TFileType::FILE_LOCAL: {
- file_writer.reset(new LocalFileWriter(path, start_offset));
- break;
- }
- case TFileType::FILE_BROKER: {
- file_writer.reset(new BrokerWriter(env, broker_addresses, properties,
path, start_offset));
- break;
- }
- case TFileType::FILE_S3: {
- file_writer.reset(new S3Writer(properties, path, start_offset));
- break;
- }
- case TFileType::FILE_HDFS: {
- RETURN_IF_ERROR(create_hdfs_writer(
- const_cast<std::map<std::string, std::string>&>(properties),
path, file_writer));
- break;
- }
- default:
- return Status::InternalError("unsupported file writer type: {}",
std::to_string(type));
- }
-
- return Status::OK();
-}
-
-// ============================
-// broker scan node/unique ptr
-Status NewFileFactory::create_file_reader(TFileType::type type, ExecEnv* env,
- RuntimeProfile* profile,
- const std::vector<TNetworkAddress>&
broker_addresses,
- const std::map<std::string,
std::string>& properties,
- const TBrokerRangeDesc& range,
int64_t start_offset,
- std::unique_ptr<FileReader>&
file_reader) {
- FileReader* file_reader_ptr;
- switch (type) {
- case TFileType::FILE_LOCAL: {
- file_reader_ptr = new LocalFileReader(range.path, start_offset);
- break;
- }
- case TFileType::FILE_BROKER: {
- file_reader_ptr = new BufferedReader(
- profile,
- new BrokerReader(env, broker_addresses, properties,
range.path, start_offset,
- range.__isset.file_size ? range.file_size :
0));
- break;
- }
- case TFileType::FILE_S3: {
- file_reader_ptr =
- new BufferedReader(profile, new S3Reader(properties,
range.path, start_offset));
- break;
- }
- case TFileType::FILE_HDFS: {
- FileReader* hdfs_reader = nullptr;
- RETURN_IF_ERROR(
- create_hdfs_reader(range.hdfs_params, range.path,
start_offset, &hdfs_reader));
- file_reader_ptr = new BufferedReader(profile, hdfs_reader);
- break;
- }
- default:
- return Status::InternalError("unsupported file reader type: " +
std::to_string(type));
- }
- file_reader.reset(file_reader_ptr);
-
- return Status::OK();
-}
-
-// ============================
-// file scan node/unique ptr
-Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/,
- const FileSystemProperties&
system_properties,
- const FileDescription&
file_description,
- std::unique_ptr<io::FileSystem>*
file_system,
- io::FileReaderSPtr* file_reader) {
- TFileType::type type = system_properties.system_type;
- io::FileSystem* file_system_ptr = nullptr;
- switch (type) {
- case TFileType::FILE_S3: {
- RETURN_IF_ERROR(create_s3_reader(system_properties.properties,
file_description.path,
- &file_system_ptr, file_reader));
- break;
- }
- case TFileType::FILE_HDFS: {
- RETURN_IF_ERROR(create_hdfs_reader(system_properties.hdfs_params,
file_description.path,
- &file_system_ptr, file_reader));
- break;
- }
- case TFileType::FILE_BROKER: {
-        RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0],
- system_properties.properties,
file_description.path,
- &file_system_ptr, file_reader));
- break;
- }
- default:
- return Status::NotSupported("unsupported file reader type: {}",
std::to_string(type));
- }
- file_system->reset(file_system_ptr);
- return Status::OK();
-}
-
-// file scan node/stream load pipe
-Status NewFileFactory::create_pipe_reader(const TUniqueId& load_id,
- io::FileReaderSPtr* file_reader) {
- *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
- if (!(*file_reader)) {
- return Status::InternalError("unknown stream load id: {}",
UniqueId(load_id).to_string());
- }
- return Status::OK();
-}
-
-Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params,
const std::string& path,
- int64_t start_offset, FileReader**
reader) {
- *reader = new HdfsFileReader(hdfs_params, path, start_offset);
- return Status::OK();
-}
-
-Status NewFileFactory::create_hdfs_reader(const THdfsParams& hdfs_params,
const std::string& path,
- io::FileSystem** hdfs_file_system,
- io::FileReaderSPtr* reader) {
- *hdfs_file_system = new io::HdfsFileSystem(hdfs_params, path);
- (dynamic_cast<io::HdfsFileSystem*>(*hdfs_file_system))->connect();
- (*hdfs_file_system)->open_file(path, reader);
- return Status::OK();
-}
-
-Status NewFileFactory::create_hdfs_writer(const std::map<std::string,
std::string>& properties,
- const std::string& path,
- std::unique_ptr<FileWriter>& writer)
{
- writer.reset(new HDFSWriter(properties, path));
- return Status::OK();
-}
-
-Status NewFileFactory::create_s3_reader(const std::map<std::string,
std::string>& prop,
- const std::string& path,
io::FileSystem** s3_file_system,
- io::FileReaderSPtr* reader) {
- S3URI s3_uri(path);
- if (!s3_uri.parse()) {
- return Status::InvalidArgument("s3 uri is invalid: {}", path);
- }
- S3Conf s3_conf;
- RETURN_IF_ERROR(ClientFactory::convert_properties_to_s3_conf(prop, s3_uri,
&s3_conf));
- *s3_file_system = new io::S3FileSystem(s3_conf, "");
- (dynamic_cast<io::S3FileSystem*>(*s3_file_system))->connect();
- (*s3_file_system)->open_file(s3_uri.get_key(), reader);
- return Status::OK();
-}
-
-Status NewFileFactory::create_broker_reader(const TNetworkAddress& broker_addr,
- const std::map<std::string,
std::string>& prop,
- const std::string& path,
- io::FileSystem**
broker_file_system,
- io::FileReaderSPtr* reader) {
- *broker_file_system = new io::BrokerFileSystem(broker_addr, prop);
- (dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect();
- (*broker_file_system)->open_file(path, reader);
- return Status::OK();
-}
-} // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.h
b/be/src/vec/exec/format/file_reader/new_file_factory.h
deleted file mode 100644
index aa510718ba..0000000000
--- a/be/src/vec/exec/format/file_reader/new_file_factory.h
+++ /dev/null
@@ -1,115 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include "gen_cpp/PlanNodes_types.h"
-#include "gen_cpp/Types_types.h"
-#include "io/file_reader.h"
-#include "io/file_writer.h"
-#include "io/fs/file_reader.h"
-
-namespace doris {
-namespace io {
-class FileSystem;
-}
-
-class ExecEnv;
-class TNetworkAddress;
-class RuntimeProfile;
-
-struct FileSystemProperties {
- TFileType::type system_type;
- std::map<std::string, std::string> properties;
- THdfsParams hdfs_params;
- std::vector<TNetworkAddress> broker_addresses;
-};
-
-struct FileDescription {
- std::string path;
- int64_t start_offset;
- size_t file_size;
- size_t buffer_size;
-};
-
-class NewFileFactory {
-public:
- // Create FileWriter
- static Status create_file_writer(TFileType::type type, ExecEnv* env,
- const std::vector<TNetworkAddress>&
broker_addresses,
- const std::map<std::string, std::string>&
properties,
- const std::string& path, int64_t
start_offset,
- std::unique_ptr<FileWriter>& file_writer);
-
- /**
- * Create FileReader for broker scan node related scanners and readers
- */
- static Status create_file_reader(TFileType::type type, ExecEnv* env,
RuntimeProfile* profile,
- const std::vector<TNetworkAddress>&
broker_addresses,
- const std::map<std::string, std::string>&
properties,
- const TBrokerRangeDesc& range, int64_t
start_offset,
- std::unique_ptr<FileReader>& file_reader);
- /**
- * Create FileReader for file scan node rlated scanners and readers
- * If buffer_size > 0, use BufferedReader to wrap the underlying
FileReader;
- * Otherwise, return the underlying FileReader directly.
- */
- static Status create_file_reader(RuntimeProfile* profile,
- const FileSystemProperties&
system_properties,
- const FileDescription& file_description,
- std::unique_ptr<io::FileSystem>*
file_system,
- io::FileReaderSPtr* file_reader);
-
- // Create FileReader for stream load pipe
- static Status create_pipe_reader(const TUniqueId& load_id,
io::FileReaderSPtr* file_reader);
-
- // TODO(ftw): should be delete after new_hdfs_file_reader ready
- static Status create_hdfs_reader(const THdfsParams& hdfs_params, const
std::string& path,
- int64_t start_offset, FileReader**
reader);
-
- static Status create_hdfs_reader(const THdfsParams& hdfs_params, const
std::string& path,
- io::FileSystem** hdfs_file_system,
io::FileReaderSPtr* reader);
-
- // TODO(ftw): should be delete after new_hdfs_file_writer ready
- static Status create_hdfs_writer(const std::map<std::string, std::string>&
properties,
- const std::string& path,
std::unique_ptr<FileWriter>& writer);
-
- static Status create_s3_reader(const std::map<std::string, std::string>&
prop,
- const std::string& path, io::FileSystem**
s3_file_system,
- io::FileReaderSPtr* reader);
-
- static Status create_broker_reader(const TNetworkAddress& broker_addr,
- const std::map<std::string,
std::string>& prop,
- const std::string& path,
io::FileSystem** hdfs_file_system,
- io::FileReaderSPtr* reader);
-
- static TFileType::type convert_storage_type(TStorageBackendType::type
type) {
- switch (type) {
- case TStorageBackendType::LOCAL:
- return TFileType::FILE_LOCAL;
- case TStorageBackendType::S3:
- return TFileType::FILE_S3;
- case TStorageBackendType::BROKER:
- return TFileType::FILE_BROKER;
- case TStorageBackendType::HDFS:
- return TFileType::FILE_HDFS;
- default:
- LOG(FATAL) << "not match type to convert, from type:" << type;
- }
- __builtin_unreachable();
- }
-};
-} // namespace doris
diff --git
a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp
b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp
new file mode 100644
index 0000000000..016d29ca1d
--- /dev/null
+++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.cpp
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "new_plain_binary_line_reader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include "io/fs/file_reader.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/iterators.h"
+
+namespace doris {
+
+NewPlainBinaryLineReader::NewPlainBinaryLineReader(io::FileReaderSPtr
file_reader,
+ TFileType::type file_type)
+ : _file_reader(file_reader), _file_type(file_type) {}
+
+NewPlainBinaryLineReader::~NewPlainBinaryLineReader() {
+ close();
+}
+
+void NewPlainBinaryLineReader::close() {}
+
+Status NewPlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size,
bool* eof) {
+ std::unique_ptr<uint8_t[]> file_buf;
+ size_t read_size = 0;
+ IOContext io_ctx;
+ io_ctx.reader_type = READER_QUERY;
+ switch (_file_type) {
+ case TFileType::FILE_LOCAL:
+ case TFileType::FILE_HDFS:
+ case TFileType::FILE_S3: {
+ size_t file_size = _file_reader->size();
+ file_buf.reset(new uint8_t[file_size]);
+ Slice result(file_buf.get(), file_size);
+ RETURN_IF_ERROR(_file_reader->read_at(0, result, io_ctx, &read_size));
+ break;
+ }
+ case TFileType::FILE_STREAM: {
+ RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
+ ->read_one_message(&file_buf, &read_size));
+
+ break;
+ }
+ default: {
+ return Status::NotSupported("no supported file reader type: {}",
_file_type);
+ }
+ }
+ *ptr = file_buf.release();
+ *size = read_size;
+ if (read_size == 0) {
+ *eof = true;
+ }
+ return Status::OK();
+}
+
+} // namespace doris
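A minimal usage sketch for the new binary line reader, assuming `file_reader` was already opened through FileFactory. Note that read_line transfers ownership of the returned buffer to the caller (the internal unique_ptr is release()d).

    NewPlainBinaryLineReader line_reader(file_reader, TFileType::FILE_LOCAL);
    const uint8_t* ptr = nullptr;
    size_t size = 0;
    bool eof = false;
    while (!eof) {
        RETURN_IF_ERROR(line_reader.read_line(&ptr, &size, &eof));
        // consume `size` bytes at `ptr` ...
        delete[] ptr;  // caller owns the buffer allocated by read_line
    }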
diff --git a/be/src/io/hdfs_reader_writer.h
b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
similarity index 54%
rename from be/src/io/hdfs_reader_writer.h
rename to be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
index 3b27b22656..b8b3f398db 100644
--- a/be/src/io/hdfs_reader_writer.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
@@ -17,22 +17,27 @@
#pragma once
-#include "gen_cpp/PlanNodes_types.h"
-#include "io/file_reader.h"
-#include "io/file_writer.h"
+#include <gen_cpp/Types_types.h>
+
+#include "exec/line_reader.h"
+#include "io/fs/file_reader.h"
namespace doris {
-// TODO(ftw): This file should be deleted when new_file_factory.h replace
file_factory.h
-class HdfsReaderWriter {
+class NewPlainBinaryLineReader : public LineReader {
public:
- static Status create_reader(const THdfsParams& hdfs_params, const
std::string& path,
- int64_t start_offset, FileReader** reader);
+ NewPlainBinaryLineReader(io::FileReaderSPtr file_reader, TFileType::type
file_type);
+
+ ~NewPlainBinaryLineReader() override;
+
+ Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
- static Status create_reader(const std::map<std::string, std::string>&
properties,
- const std::string& path, int64_t start_offset,
FileReader** reader);
+ void close() override;
- static Status create_writer(const std::map<std::string, std::string>&
properties,
- const std::string& path,
std::unique_ptr<FileWriter>& writer);
+private:
+ io::FileReaderSPtr _file_reader;
+
+ TFileType::type _file_type;
};
+
} // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index 56bc26c648..52af605154 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -33,7 +33,8 @@
namespace doris {
-NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
io::FileReader* file_reader,
+NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
+ io::FileReaderSPtr file_reader,
Decompressor* decompressor,
size_t length,
const std::string&
line_delimiter,
size_t line_delimiter_length,
size_t current_offset)
@@ -134,11 +135,6 @@ void NewPlainTextLineReader::extend_input_buf() {
_input_buf_limit -= _input_buf_pos;
_input_buf_pos = 0;
} while (false);
-
- // LOG(INFO) << "extend input buf."
- // << " input_buf_size: " << _input_buf_size
- // << " input_buf_pos: " << _input_buf_pos
- // << " input_buf_limit: " << _input_buf_limit;
}
void NewPlainTextLineReader::extend_output_buf() {
@@ -176,11 +172,6 @@ void NewPlainTextLineReader::extend_output_buf() {
_output_buf_limit -= _output_buf_pos;
_output_buf_pos = 0;
} while (false);
-
- // LOG(INFO) << "extend output buf."
- // << " output_buf_size: " << _output_buf_size
- // << " output_buf_pos: " << _output_buf_pos
- // << " output_buf_limit: " << _output_buf_limit;
}
Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size,
bool* eof) {
@@ -241,9 +232,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t**
ptr, size_t* size, bool
IOContext io_ctx;
RETURN_IF_ERROR(
_file_reader->read_at(_current_offset, file_slice,
io_ctx, &read_len));
+ _current_offset += read_len;
+ if (read_len == 0) {
+ _file_eof = true;
+ }
COUNTER_UPDATE(_bytes_read_counter, read_len);
}
- // LOG(INFO) << "after read file: _file_eof: " << _file_eof <<
" read_len: " << read_len;
if (_file_eof || read_len == 0) {
if (!_stream_end) {
return Status::InternalError(
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
index a39e578577..99debf6d18 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
@@ -18,19 +18,17 @@
#pragma once
#include "exec/line_reader.h"
+#include "io/fs/file_reader.h"
#include "util/runtime_profile.h"
namespace doris {
-namespace io {
-class FileReader;
-}
class Decompressor;
class Status;
class NewPlainTextLineReader : public LineReader {
public:
- NewPlainTextLineReader(RuntimeProfile* profile, io::FileReader*
file_reader,
+ NewPlainTextLineReader(RuntimeProfile* profile, io::FileReaderSPtr
file_reader,
Decompressor* decompressor, size_t length,
const std::string& line_delimiter, size_t
line_delimiter_length,
size_t current_offset);
@@ -60,7 +58,7 @@ private:
void extend_output_buf();
RuntimeProfile* _profile;
- io::FileReader* _file_reader;
+ io::FileReaderSPtr _file_reader;
Decompressor* _decompressor;
// the min length that should be read.
// -1 means endless(for stream load)
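The behavioral shift behind these signature changes: io::FileReader::read_at is positional, so the line reader now tracks its own cursor instead of relying on reader-internal state. Condensed from the read path in the .cpp hunk above (`file_buf` and `buf_size` are illustrative):

    size_t read_len = 0;
    Slice file_slice(file_buf, buf_size);  // destination buffer
    IOContext io_ctx;
    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, file_slice, io_ctx, &read_len));
    _current_offset += read_len;           // advance our own cursor
    if (read_len == 0) {
        _file_eof = true;                  // a zero-byte read marks end of file
    }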
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index e179f91eea..13e86b1ba9 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -18,12 +18,14 @@
#include "vec/exec/format/json/new_json_reader.h"
#include "common/compiler_util.h"
-#include "exec/plain_text_line_reader.h"
#include "exprs/json_functions.h"
#include "io/file_factory.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/iterators.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
+#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
using namespace ErrorCode;
@@ -38,9 +40,8 @@ NewJsonReader::NewJsonReader(RuntimeState* state,
RuntimeProfile* profile, Scann
_params(params),
_range(range),
_file_slot_descs(file_slot_descs),
+ _file_system(nullptr),
_file_reader(nullptr),
- _file_reader_s(nullptr),
- _real_file_reader(nullptr),
_line_reader(nullptr),
_reader_eof(false),
_skip_first_line(false),
@@ -49,7 +50,8 @@ NewJsonReader::NewJsonReader(RuntimeState* state,
RuntimeProfile* profile, Scann
_value_allocator(_value_buffer, sizeof(_value_buffer)),
_parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
_origin_json_doc(&_value_allocator, sizeof(_parse_buffer),
&_parse_allocator),
- _scanner_eof(scanner_eof) {
+ _scanner_eof(scanner_eof),
+ _current_offset(0) {
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "ReadTime");
_file_read_timer = ADD_TIMER(_profile, "FileReadTime");
@@ -64,9 +66,6 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const
TFileScanRangeParams
_params(params),
_range(range),
_file_slot_descs(file_slot_descs),
- _file_reader(nullptr),
- _file_reader_s(nullptr),
- _real_file_reader(nullptr),
_line_reader(nullptr),
_reader_eof(false),
_skip_first_line(false),
@@ -159,11 +158,11 @@ Status
NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
if (_line_reader != nullptr) {
RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof));
} else {
- int64_t length = 0;
- RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr,
&length));
+ size_t read_size = 0;
+ RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &read_size));
json_str = json_str_ptr.get();
- size = length;
- if (length == 0) {
+ size = read_size;
+ if (read_size == 0) {
eof = true;
}
}
@@ -286,15 +285,25 @@ Status NewJsonReader::_open_file_reader() {
start_offset -= 1;
}
+ _current_offset = start_offset;
+
+ FileSystemProperties system_properties;
+ system_properties.system_type = _params.file_type;
+ system_properties.properties = _params.properties;
+ system_properties.hdfs_params = _params.hdfs_params;
+
+ FileDescription file_description;
+ file_description.path = _range.path;
+ file_description.start_offset = start_offset;
+ file_description.file_size = _range.__isset.file_size ? _range.file_size :
0;
+
if (_params.file_type == TFileType::FILE_STREAM) {
- RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
_file_reader_s));
- _real_file_reader = _file_reader_s.get();
+ RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
&_file_reader));
} else {
RETURN_IF_ERROR(FileFactory::create_file_reader(
- _profile, _params, _range.path, start_offset,
_range.file_size, 0, _file_reader));
- _real_file_reader = _file_reader.get();
+ _profile, system_properties, file_description, &_file_system,
&_file_reader));
}
- return _real_file_reader->open();
+ return Status::OK();
}
Status NewJsonReader::_open_line_reader() {
@@ -306,8 +315,9 @@ Status NewJsonReader::_open_line_reader() {
} else {
_skip_first_line = false;
}
- _line_reader.reset(new PlainTextLineReader(_profile, _real_file_reader,
nullptr, size,
- _line_delimiter,
_line_delimiter_length));
+ _line_reader.reset(new NewPlainTextLineReader(_profile, _file_reader,
nullptr, size,
+ _line_delimiter,
_line_delimiter_length,
+ _current_offset));
return Status::OK();
}
@@ -509,13 +519,12 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool*
eof) {
if (_line_reader != nullptr) {
RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
} else {
- int64_t length = 0;
- RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr,
&length));
- json_str = json_str_ptr.get();
- *size = length;
- if (length == 0) {
+ RETURN_IF_ERROR(_read_one_message(&json_str_ptr, size));
+ json_str = json_str_ptr.release();
+ if (*size == 0) {
*eof = true;
}
+ _current_offset += *size;
}
_bytes_read_counter += *size;
@@ -877,4 +886,28 @@ std::string NewJsonReader::_print_json_value(const
rapidjson::Value& value) {
return std::string(buffer.GetString());
}
+Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf,
size_t* read_size) {
+ IOContext io_ctx;
+ io_ctx.reader_type = READER_QUERY;
+ switch (_params.file_type) {
+ case TFileType::FILE_LOCAL:
+ case TFileType::FILE_HDFS:
+ case TFileType::FILE_S3: {
+ size_t file_size = _file_reader->size();
+ file_buf->reset(new uint8_t[file_size]);
+ Slice result(file_buf->get(), file_size);
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, io_ctx,
read_size));
+ break;
+ }
+ case TFileType::FILE_STREAM: {
+ RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
+ ->read_one_message(file_buf, read_size));
+ break;
+ }
+ default: {
+ return Status::NotSupported("no supported file reader type: {}",
_params.file_type);
+ }
+ }
+ return Status::OK();
+}
} // namespace doris::vectorized
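NewJsonReader::_read_one_message above and NewPlainBinaryLineReader::read_line earlier share the same dispatch. A condensed, free-standing sketch of that contract (FILE_BROKER and error details elided):

    Status read_one_message(io::FileReaderSPtr reader, TFileType::type type, size_t offset,
                            std::unique_ptr<uint8_t[]>* buf, size_t* read_size) {
        IOContext io_ctx;
        io_ctx.reader_type = READER_QUERY;
        if (type == TFileType::FILE_STREAM) {
            // stream load is message-oriented: ask the pipe for one whole message
            return dynamic_cast<io::StreamLoadPipe*>(reader.get())
                    ->read_one_message(buf, read_size);
        }
        // local/HDFS/S3 files are byte-oriented: slurp from the given offset
        size_t file_size = reader->size();
        buf->reset(new uint8_t[file_size]);
        Slice result(buf->get(), file_size);
        return reader->read_at(offset, result, io_ctx, read_size);
    }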
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index 5a057d32fe..98aae55ea4 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -22,6 +22,7 @@
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
+#include "io/fs/file_reader.h"
#include "vec/exec/format/generic_reader.h"
namespace doris {
@@ -92,7 +93,8 @@ private:
std::string _print_json_value(const rapidjson::Value& value);
-private:
+ Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t*
read_size);
+
Status (NewJsonReader::*_vhandle_json_callback)(
std::vector<vectorized::MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs, bool*
is_empty_row, bool* eof);
@@ -103,12 +105,8 @@ private:
const TFileRangeDesc& _range;
const std::vector<SlotDescriptor*>& _file_slot_descs;
- // _file_reader_s is for stream load pipe reader,
- // and _file_reader is for other file reader.
- // TODO: refactor this to use only shared_ptr or unique_ptr
- std::unique_ptr<FileReader> _file_reader;
- std::shared_ptr<FileReader> _file_reader_s;
- FileReader* _real_file_reader;
+ std::unique_ptr<io::FileSystem> _file_system;
+ io::FileReaderSPtr _file_reader;
std::unique_ptr<LineReader> _line_reader;
bool _reader_eof;
@@ -134,9 +132,8 @@ private:
char _value_buffer[4 * 1024 * 1024]; // 4MB
char _parse_buffer[512 * 1024]; // 512KB
- typedef rapidjson::GenericDocument<rapidjson::UTF8<>,
rapidjson::MemoryPoolAllocator<>,
- rapidjson::MemoryPoolAllocator<>>
- Document;
+ using Document = rapidjson::GenericDocument<rapidjson::UTF8<>,
rapidjson::MemoryPoolAllocator<>,
+                                                rapidjson::MemoryPoolAllocator<>>;
rapidjson::MemoryPoolAllocator<> _value_allocator;
rapidjson::MemoryPoolAllocator<> _parse_allocator;
Document _origin_json_doc; // origin json document object from parsed
json string
@@ -145,6 +142,8 @@ private:
bool* _scanner_eof;
+ size_t _current_offset;
+
RuntimeProfile::Counter* _bytes_read_counter;
RuntimeProfile::Counter* _read_timer;
RuntimeProfile::Counter* _file_read_timer;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 2748160e97..c6867b0d83 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -23,6 +23,9 @@
#include "cctz/time_zone.h"
#include "gutil/strings/substitute.h"
#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "olap/iterators.h"
+#include "util/slice.h"
#include "vec/columns/column_array.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
@@ -47,10 +50,11 @@ void ORCFileInputStream::read(void* buf, uint64_t length,
uint64_t offset) {
SCOPED_RAW_TIMER(&_statistics.read_time);
uint64_t has_read = 0;
char* out = reinterpret_cast<char*>(buf);
+ IOContext io_ctx;
while (has_read < length) {
- int64_t loop_read;
- Status st = _file_reader->readat(offset + has_read, length - has_read,
&loop_read,
- out + has_read);
+ size_t loop_read;
+ Slice result(out + has_read, length - has_read);
+ Status st = _file_reader->read_at(offset + has_read, result, io_ctx,
&loop_read);
if (!st.ok()) {
throw orc::ParseError(
strings::Substitute("Failed to read $0: $1", _file_name,
st.to_string()));
@@ -87,7 +91,8 @@ OrcReader::OrcReader(const TFileScanRangeParams& params,
const TFileRangeDesc& r
_scan_params(params),
_scan_range(range),
_ctz(ctz),
- _column_names(column_names) {}
+ _column_names(column_names),
+ _file_system(nullptr) {}
OrcReader::~OrcReader() {
close();
@@ -132,12 +137,22 @@ Status OrcReader::init_reader(
std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
if (_file_reader == nullptr) {
- std::unique_ptr<FileReader> inner_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_scan_params, _scan_range.path,
-
_scan_range.start_offset,
- _scan_range.file_size,
0, inner_reader));
- RETURN_IF_ERROR(inner_reader->open());
- _file_reader = new ORCFileInputStream(_scan_range.path,
inner_reader.release());
+ io::FileReaderSPtr inner_reader;
+
+ FileSystemProperties system_properties;
+ system_properties.system_type = _scan_params.file_type;
+ system_properties.properties = _scan_params.properties;
+ system_properties.hdfs_params = _scan_params.hdfs_params;
+
+ FileDescription file_description;
+ file_description.path = _scan_range.path;
+ file_description.start_offset = _scan_range.start_offset;
+ file_description.file_size = _scan_range.__isset.file_size ?
_scan_range.file_size : 0;
+
+ RETURN_IF_ERROR(FileFactory::create_file_reader(
+ _profile, system_properties, file_description, &_file_system,
&inner_reader));
+
+ _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
}
if (_file_reader->getLength() == 0) {
return Status::EndOfFile("Empty orc file");
@@ -179,12 +194,22 @@ Status OrcReader::init_reader(
Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
if (_file_reader == nullptr) {
- std::unique_ptr<FileReader> inner_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_scan_params, _scan_range.path,
-
_scan_range.start_offset,
- _scan_range.file_size,
0, inner_reader));
- RETURN_IF_ERROR(inner_reader->open());
- _file_reader = new ORCFileInputStream(_scan_range.path,
inner_reader.release());
+ io::FileReaderSPtr inner_reader;
+
+ FileSystemProperties system_properties;
+ system_properties.system_type = _scan_params.file_type;
+ system_properties.properties = _scan_params.properties;
+ system_properties.hdfs_params = _scan_params.hdfs_params;
+
+ FileDescription file_description;
+ file_description.path = _scan_range.path;
+ file_description.start_offset = _scan_range.start_offset;
+ file_description.file_size = _scan_range.__isset.file_size ?
_scan_range.file_size : 0;
+
+ RETURN_IF_ERROR(FileFactory::create_file_reader(
+ _profile, system_properties, file_description, &_file_system,
&inner_reader));
+
+ _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
}
if (_file_reader->getLength() == 0) {
return Status::EndOfFile("Empty orc file");
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 98c2fc7c02..4f5f4dba10 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -21,7 +21,7 @@
#include "common/config.h"
#include "exec/olap_common.h"
-#include "io/file_reader.h"
+#include "io/fs/file_reader.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/exec/format/format_common.h"
@@ -37,15 +37,10 @@ public:
int64_t read_bytes = 0;
};
- ORCFileInputStream(const std::string& file_name, FileReader* file_reader)
+ ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr
file_reader)
: _file_name(file_name), _file_reader(file_reader) {};
- ~ORCFileInputStream() override {
- if (_file_reader != nullptr) {
- _file_reader->close();
- delete _file_reader;
- }
- }
+ ~ORCFileInputStream() override = default;
uint64_t getLength() const override { return _file_reader->size(); }
@@ -60,7 +55,7 @@ public:
private:
Statistics _statistics;
const std::string& _file_name;
- FileReader* _file_reader;
+ io::FileReaderSPtr _file_reader;
};
class OrcReader : public GenericReader {
@@ -82,7 +77,7 @@ public:
~OrcReader() override;
// for test
- void set_file_reader(const std::string& file_name, FileReader*
file_reader) {
+ void set_file_reader(const std::string& file_name, io::FileReaderSPtr
file_reader) {
_file_reader = new ORCFileInputStream(file_name, file_reader);
}
@@ -283,6 +278,8 @@ private:
orc::ReaderOptions _reader_options;
orc::RowReaderOptions _row_reader_options;
+ std::unique_ptr<io::FileSystem> _file_system;
+
// only for decimal
DecimalScaleParams _decimal_scale_params;
};
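Because read_at may return fewer bytes than requested, ORCFileInputStream::read (see the vorc_reader.cpp hunk above) loops until the requested length is filled. Condensed; the zero-read guard here is an added safeguard, while the real code throws orc::ParseError on a failed status:

    uint64_t has_read = 0;
    char* out = reinterpret_cast<char*>(buf);
    IOContext io_ctx;
    while (has_read < length) {
        size_t loop_read = 0;
        Slice result(out + has_read, length - has_read);
        Status st = _file_reader->read_at(offset + has_read, result, io_ctx, &loop_read);
        if (!st.ok() || loop_read == 0) {
            break;  // real implementation: throw orc::ParseError on !st.ok()
        }
        has_read += loop_read;
    }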
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index 5811f034bd..269f6a4368 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -23,7 +23,8 @@
#include "common/logging.h"
#include "gen_cpp/parquet_types.h"
-#include "io/file_reader.h"
+#include "io/fs/file_reader.h"
+#include "olap/iterators.h"
#include "util/coding.h"
#include "util/thrift_util.h"
#include "vparquet_file_metadata.h"
@@ -33,12 +34,14 @@ namespace doris::vectorized {
constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
-static Status parse_thrift_footer(FileReader* file,
std::shared_ptr<FileMetaData>& file_metadata) {
+static Status parse_thrift_footer(io::FileReaderSPtr file,
+ std::shared_ptr<FileMetaData>&
file_metadata) {
uint8_t footer[PARQUET_FOOTER_SIZE];
int64_t file_size = file->size();
- int64_t bytes_read = 0;
- RETURN_IF_ERROR(file->readat(file_size - PARQUET_FOOTER_SIZE,
PARQUET_FOOTER_SIZE, &bytes_read,
- footer));
+ size_t bytes_read = 0;
+ Slice result(footer, PARQUET_FOOTER_SIZE);
+ IOContext io_ctx;
+ RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE, result,
io_ctx, &bytes_read));
DCHECK_EQ(bytes_read, PARQUET_FOOTER_SIZE);
// validate magic
@@ -57,8 +60,9 @@ static Status parse_thrift_footer(FileReader* file,
std::shared_ptr<FileMetaData
tparquet::FileMetaData t_metadata;
// deserialize footer
uint8_t meta_buff[metadata_size];
- RETURN_IF_ERROR(file->readat(file_size - PARQUET_FOOTER_SIZE -
metadata_size, metadata_size,
- &bytes_read, meta_buff));
+ Slice res(meta_buff, metadata_size);
+ RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE -
metadata_size, res, io_ctx,
+ &bytes_read));
DCHECK_EQ(bytes_read, metadata_size);
RETURN_IF_ERROR(deserialize_thrift_msg(meta_buff, &metadata_size, true,
&t_metadata));
file_metadata.reset(new FileMetaData(t_metadata));
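For context on the offset arithmetic above: a Parquet file ends with a 4-byte little-endian Thrift-metadata length followed by the 4-byte magic "PAR1" (hence PARQUET_FOOTER_SIZE = 8), so the metadata blob sits immediately before those last 8 bytes. A sketch, assuming the little-endian decode helper from util/coding.h (included above):

    // file layout: [ data ... ][ thrift FileMetaData ][ len:4 ][ "PAR1":4 ]
    int64_t file_size = file->size();
    // `footer` holds the last 8 bytes, filled by the read_at call above
    uint32_t metadata_size = decode_fixed32_le(footer);
    int64_t metadata_start = file_size - PARQUET_FOOTER_SIZE - metadata_size;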
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 75f43b4730..a7d95225b6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -26,7 +26,7 @@
namespace doris::vectorized {
-Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
+Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
const ParquetReadColumn& column,
const tparquet::RowGroup& row_group,
const std::vector<RowRange>& row_ranges,
cctz::time_zone* ctz,
@@ -80,8 +80,8 @@ void ParquetColumnReader::_generate_read_ranges(int64_t
start_index, int64_t end
}
}
-Status ScalarColumnReader::init(FileReader* file, FieldSchema* field,
tparquet::ColumnChunk* chunk,
- size_t max_buf_size) {
+Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+ tparquet::ColumnChunk* chunk, size_t
max_buf_size) {
_stream_reader =
new BufferedFileStreamReader(file, _metadata->start_offset(),
_metadata->size(),
std::min((size_t)_metadata->size(),
max_buf_size));
@@ -273,8 +273,8 @@ void ArrayColumnReader::_reserve_def_levels_buf(size_t
size) {
}
}
-Status ArrayColumnReader::init(FileReader* file, FieldSchema* field,
tparquet::ColumnChunk* chunk,
- size_t max_buf_size) {
+Status ArrayColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+ tparquet::ColumnChunk* chunk, size_t
max_buf_size) {
_stream_reader =
new BufferedFileStreamReader(file, _metadata->start_offset(),
_metadata->size(),
std::min((size_t)_metadata->size(),
max_buf_size));
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index de0ec185b9..0f61c5742c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -107,8 +107,8 @@ public:
virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t
batch_size,
size_t* read_rows, bool* eof) = 0;
- static Status create(FileReader* file, FieldSchema* field, const
ParquetReadColumn& column,
- const tparquet::RowGroup& row_group,
+ static Status create(io::FileReaderSPtr file, FieldSchema* field,
+ const ParquetReadColumn& column, const
tparquet::RowGroup& row_group,
const std::vector<RowRange>& row_ranges,
cctz::time_zone* ctz,
std::unique_ptr<ParquetColumnReader>& reader, size_t
max_buf_size);
void init_column_metadata(const tparquet::ColumnChunk& chunk);
@@ -139,7 +139,7 @@ public:
ScalarColumnReader(const std::vector<RowRange>& row_ranges,
cctz::time_zone* ctz)
: ParquetColumnReader(row_ranges, ctz) {};
~ScalarColumnReader() override { close(); };
- Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk*
chunk,
+ Status init(io::FileReaderSPtr file, FieldSchema* field,
tparquet::ColumnChunk* chunk,
size_t max_buf_size);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t
batch_size, size_t* read_rows,
@@ -155,7 +155,7 @@ public:
ArrayColumnReader(const std::vector<RowRange>& row_ranges,
cctz::time_zone* ctz)
: ParquetColumnReader(row_ranges, ctz) {};
~ArrayColumnReader() override { close(); };
- Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk*
chunk,
+ Status init(io::FileReaderSPtr file, FieldSchema* field,
tparquet::ColumnChunk* chunk,
size_t max_buf_size);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t
batch_size, size_t* read_rows,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 24441896a9..b406919b4c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -25,7 +25,7 @@ namespace doris::vectorized {
const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
-RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
+RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
const std::vector<ParquetReadColumn>&
read_columns,
const int32_t row_group_id, const
tparquet::RowGroup& row_group,
cctz::time_zone* ctz,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 53e8a052c0..d53e32de2a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -19,6 +19,7 @@
#include "exec/text_converter.h"
#include "io/file_reader.h"
+#include "io/fs/file_reader.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"
#include "vparquet_column_reader.h"
@@ -99,7 +100,7 @@ public:
PositionDeleteContext(const PositionDeleteContext& filter) = default;
};
- RowGroupReader(doris::FileReader* file_reader,
+ RowGroupReader(io::FileReaderSPtr file_reader,
const std::vector<ParquetReadColumn>& read_columns, const
int32_t row_group_id,
const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
const PositionDeleteContext& position_delete_ctx,
@@ -131,7 +132,7 @@ private:
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContext*>&
missing_columns);
- doris::FileReader* _file_reader;
+ io::FileReaderSPtr _file_reader;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
_column_readers;
const std::vector<ParquetReadColumn>& _read_columns;
const int32_t _row_group_id;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 3d00f52355..ecf4d23259 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -21,6 +21,7 @@
#include "common/status.h"
#include "io/file_factory.h"
+#include "olap/iterators.h"
#include "parquet_pred_cmp.h"
#include "parquet_thrift_util.h"
#include "vec/exprs/vbloom_predicate.h"
@@ -135,18 +136,26 @@ void ParquetReader::close() {
}
Status ParquetReader::_open_file() {
+ FileSystemProperties system_properties;
+ system_properties.system_type = _scan_params.file_type;
+ system_properties.properties = _scan_params.properties;
+ system_properties.hdfs_params = _scan_params.hdfs_params;
+
+ FileDescription file_description;
+ file_description.path = _scan_range.path;
+ file_description.start_offset = _scan_range.start_offset;
+ file_description.file_size = _scan_range.__isset.file_size ?
_scan_range.file_size : 0;
+
if (_file_reader == nullptr) {
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_scan_params, _scan_range.path,
-
_scan_range.start_offset,
- _scan_range.file_size,
0, _file_reader));
+ RETURN_IF_ERROR(FileFactory::create_file_reader(
+ _profile, system_properties, file_description, &_file_system,
&_file_reader));
}
if (_file_metadata == nullptr) {
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
- RETURN_IF_ERROR(_file_reader->open());
if (_file_reader->size() == 0) {
return Status::EndOfFile("Empty Parquet File");
}
- RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(),
_file_metadata));
+ RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
}
return Status::OK();
}
@@ -403,7 +412,7 @@ Status ParquetReader::_next_row_group_reader() {
RowGroupReader::PositionDeleteContext position_delete_ctx =
_get_position_delete_ctx(row_group, row_group_index);
- _current_group_reader.reset(new RowGroupReader(_file_reader.get(),
_read_columns,
+ _current_group_reader.reset(new RowGroupReader(_file_reader, _read_columns,
row_group_index.row_group_id, row_group, _ctz,
position_delete_ctx,
_lazy_read_ctx));
return _current_group_reader->init(_file_metadata->schema(),
candidate_row_ranges,
@@ -493,16 +502,17 @@ Status ParquetReader::_process_page_index(const
tparquet::RowGroup& row_group,
return Status::OK();
}
uint8_t col_index_buff[page_index._column_index_size];
- int64_t bytes_read = 0;
- RETURN_IF_ERROR(_file_reader->readat(page_index._column_index_start,
- page_index._column_index_size,
&bytes_read,
- col_index_buff));
+ size_t bytes_read = 0;
+ Slice result(col_index_buff, page_index._column_index_size);
+ IOContext io_ctx;
+ RETURN_IF_ERROR(
+ _file_reader->read_at(page_index._column_index_start, result,
io_ctx, &bytes_read));
auto& schema_desc = _file_metadata->schema();
std::vector<RowRange> skipped_row_ranges;
uint8_t off_index_buff[page_index._offset_index_size];
- RETURN_IF_ERROR(_file_reader->readat(page_index._offset_index_start,
- page_index._offset_index_size,
&bytes_read,
- off_index_buff));
+ Slice res(off_index_buff, page_index._offset_index_size);
+ RETURN_IF_ERROR(
+ _file_reader->read_at(page_index._offset_index_start, res, io_ctx,
&bytes_read));
for (auto& read_col : _read_columns) {
auto conjunct_iter =
_colname_to_value_range->find(read_col._file_slot_name);
if (_colname_to_value_range->end() == conjunct_iter) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 2bfc74f823..2f2d37aec5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -26,7 +26,8 @@
#include "common/status.h"
#include "exec/olap_common.h"
#include "gen_cpp/parquet_types.h"
-#include "io/file_reader.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
#include "vec/core/block.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exprs/vexpr_context.h"
@@ -61,7 +62,7 @@ public:
~ParquetReader() override;
// for test
- void set_file_reader(FileReader* file_reader) {
_file_reader.reset(file_reader); }
+ void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader =
file_reader; }
Status init_reader(const std::vector<std::string>& column_names, bool
filter_groups = true) {
// without predicate
@@ -151,7 +152,8 @@ private:
RuntimeProfile* _profile;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
- std::unique_ptr<FileReader> _file_reader = nullptr;
+ std::unique_ptr<io::FileSystem> _file_system = nullptr;
+ io::FileReaderSPtr _file_reader = nullptr;
std::shared_ptr<FileMetaData> _file_metadata;
const tparquet::FileMetaData* _t_metadata;
std::unique_ptr<RowGroupReader> _current_group_reader;
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp
b/be/test/runtime/routine_load_task_executor_test.cpp
index 2e5a1e93c6..4cc3122714 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -23,7 +23,7 @@
#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "util/cpu_info.h"
@@ -38,8 +38,8 @@ extern TStreamLoadPutResult k_stream_load_put_result;
class RoutineLoadTaskExecutorTest : public testing::Test {
public:
- RoutineLoadTaskExecutorTest() {}
- virtual ~RoutineLoadTaskExecutorTest() {}
+ RoutineLoadTaskExecutorTest() = default;
+ ~RoutineLoadTaskExecutorTest() override = default;
void SetUp() override {
k_stream_load_begin_result = TLoadTxnBeginResult();
@@ -47,24 +47,20 @@ public:
k_stream_load_rollback_result = TLoadTxnRollbackResult();
k_stream_load_put_result = TStreamLoadPutResult();
- _env._master_info = new TMasterInfo();
- _env._load_stream_mgr = new LoadStreamMgr();
- _env._stream_load_executor = new StreamLoadExecutor(&_env);
+ _env.set_master_info(new TMasterInfo());
+ _env.set_new_load_stream_mgr(new NewLoadStreamMgr());
+ _env.set_stream_load_executor(new StreamLoadExecutor(&_env));
config::routine_load_thread_pool_size = 5;
config::max_consumer_num_per_group = 3;
}
void TearDown() override {
- delete _env._master_info;
- _env._master_info = nullptr;
- delete _env._load_stream_mgr;
- _env._load_stream_mgr = nullptr;
- delete _env._stream_load_executor;
- _env._stream_load_executor = nullptr;
+ delete _env.master_info();
+ delete _env.new_load_stream_mgr();
+ delete _env.stream_load_executor();
}
-private:
ExecEnv _env;
};
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp
b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 1424cf3203..319feb0b8c 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -18,7 +18,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "io/local_file_reader.h"
+#include "io/fs/local_file_system.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/data_types/data_type_factory.hpp"
@@ -89,8 +89,9 @@ TEST_F(ParquetReaderTest, normal) {
DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
- LocalFileReader* reader =
- new
LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
0);
+ io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+ io::FileReaderSPtr reader;
+    local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", &reader);
cctz::time_zone ctz;
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index d1f59e9234..f0712cf7c5 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -24,8 +24,8 @@
#include "exec/schema_scanner.h"
#include "io/buffered_reader.h"
-#include "io/file_reader.h"
-#include "io/local_file_reader.h"
+#include "io/fs/local_file_system.h"
+#include "olap/iterators.h"
#include "runtime/string_value.h"
#include "util/runtime_profile.h"
#include "util/timezone_utils.h"
@@ -47,13 +47,14 @@ public:
};
TEST_F(ParquetThriftReaderTest, normal) {
- LocalFileReader
reader("./be/test/exec/test_data/parquet_scanner/localfile.parquet", 0);
-
- auto st = reader.open();
+ io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+ io::FileReaderSPtr reader;
+ auto st =
local_fs->open_file("./be/test/exec/test_data/parquet_scanner/localfile.parquet",
+ &reader);
EXPECT_TRUE(st.ok());
std::shared_ptr<FileMetaData> meta_data;
- parse_thrift_footer(&reader, meta_data);
+ parse_thrift_footer(reader, meta_data);
tparquet::FileMetaData t_metadata = meta_data->to_thrift();
LOG(WARNING) << "=====================================";
@@ -77,13 +78,15 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
// `hobby` array<map<string,string>>,
// `friend` map<string,string>,
// `mark` struct<math:int,english:int>)
- LocalFileReader
reader("./be/test/exec/test_data/parquet_scanner/hive-complex.parquet", 0);
- auto st = reader.open();
+ io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+ io::FileReaderSPtr reader;
+ auto st =
local_fs->open_file("./be/test/exec/test_data/parquet_scanner/hive-complex.parquet",
+ &reader);
EXPECT_TRUE(st.ok());
std::shared_ptr<FileMetaData> metadata;
- parse_thrift_footer(&reader, metadata);
+ parse_thrift_footer(reader, metadata);
tparquet::FileMetaData t_metadata = metadata->to_thrift();
FieldDescriptor schemaDescriptor;
schemaDescriptor.parse_from_thrift(t_metadata.schema);
@@ -146,7 +149,7 @@ static int fill_nullable_column(ColumnPtr& doris_column,
level_t* definitions, s
return null_cnt;
}
-static Status get_column_values(FileReader* file_reader,
tparquet::ColumnChunk* column_chunk,
+static Status get_column_values(io::FileReaderSPtr file_reader,
tparquet::ColumnChunk* column_chunk,
FieldSchema* field_schema, ColumnPtr&
doris_column,
DataTypePtr& data_type, level_t* definitions) {
tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
@@ -280,14 +283,15 @@ static void read_parquet_data_and_check(const
std::string& parquet_file,
* `list_string` array<string>) // 14
*/
- LocalFileReader reader(parquet_file, 0);
- auto st = reader.open();
+ io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+ io::FileReaderSPtr reader;
+ auto st = local_fs->open_file(parquet_file, &reader);
EXPECT_TRUE(st.ok());
std::unique_ptr<vectorized::Block> block;
create_block(block);
std::shared_ptr<FileMetaData> metadata;
- parse_thrift_footer(&reader, metadata);
+ parse_thrift_footer(reader, metadata);
tparquet::FileMetaData t_metadata = metadata->to_thrift();
FieldDescriptor schema_descriptor;
schema_descriptor.parse_from_thrift(t_metadata.schema);
@@ -297,7 +301,7 @@ static void read_parquet_data_and_check(const std::string&
parquet_file,
auto& column_name_with_type = block->get_by_position(c);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[c],
+ get_column_values(reader, &t_metadata.row_groups[0].columns[c],
const_cast<FieldSchema*>(schema_descriptor.get_column(c)), data_column,
data_type, defs);
}
@@ -306,7 +310,7 @@ static void read_parquet_data_and_check(const std::string&
parquet_file,
auto& column_name_with_type = block->get_by_position(14);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[13],
+ get_column_values(reader, &t_metadata.row_groups[0].columns[13],
const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column,
data_type, defs);
}
@@ -315,19 +319,20 @@ static void read_parquet_data_and_check(const
std::string& parquet_file,
auto& column_name_with_type = block->get_by_position(15);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[9],
+ get_column_values(reader, &t_metadata.row_groups[0].columns[9],
const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column,
data_type, defs);
}
- LocalFileReader result(result_file, 0);
- auto rst = result.open();
+ io::FileReaderSPtr result;
+ auto rst = local_fs->open_file(result_file, &result);
EXPECT_TRUE(rst.ok());
- uint8_t result_buf[result.size() + 1];
- result_buf[result.size()] = '\0';
- int64_t bytes_read;
- bool eof;
- result.read(result_buf, result.size(), &bytes_read, &eof);
+ uint8_t result_buf[result->size() + 1];
+ result_buf[result->size()] = '\0';
+ size_t bytes_read;
+ Slice res(result_buf, result->size());
+ IOContext io_ctx;
+ result->read_at(0, res, io_ctx, &bytes_read);
ASSERT_STREQ(block->dump_data(0, rows).c_str(),
reinterpret_cast<char*>(result_buf));
}
@@ -400,13 +405,15 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
lazy_read_ctx.all_read_columns.emplace_back(slot->col_name());
read_columns.emplace_back(ParquetReadColumn(7, slot->col_name()));
}
- LocalFileReader
file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
- auto st = file_reader.open();
+ io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+ io::FileReaderSPtr file_reader;
+ auto st =
local_fs->open_file("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
+ &file_reader);
EXPECT_TRUE(st.ok());
// prepare metadata
std::shared_ptr<FileMetaData> meta_data;
- parse_thrift_footer(&file_reader, meta_data);
+ parse_thrift_footer(file_reader, meta_data);
tparquet::FileMetaData t_metadata = meta_data->to_thrift();
cctz::time_zone ctz;
@@ -414,10 +421,11 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
auto row_group = t_metadata.row_groups[0];
std::shared_ptr<RowGroupReader> row_group_reader;
RowGroupReader::PositionDeleteContext
position_delete_ctx(row_group.num_rows, 0);
- row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0,
row_group, &ctz,
+ row_group_reader.reset(new RowGroupReader(file_reader, read_columns, 0,
row_group, &ctz,
position_delete_ctx,
lazy_read_ctx));
std::vector<RowRange> row_ranges;
row_ranges.emplace_back(0, row_group.num_rows);
+
auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>();
auto stg = row_group_reader->init(meta_data->schema(), row_ranges,
col_offsets);
EXPECT_TRUE(stg.ok());
@@ -435,14 +443,16 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
auto stb = row_group_reader->next_batch(&block, 1024, &read_rows,
&batch_eof);
EXPECT_TRUE(stb.ok());
- LocalFileReader
result("./be/test/exec/test_data/parquet_scanner/group-reader.txt", 0);
- auto rst = result.open();
+ io::FileReaderSPtr result;
+ auto rst =
local_fs->open_file("./be/test/exec/test_data/parquet_scanner/group-reader.txt",
+ &result);
EXPECT_TRUE(rst.ok());
- uint8_t result_buf[result.size() + 1];
- result_buf[result.size()] = '\0';
- int64_t bytes_read;
- bool eof;
- result.read(result_buf, result.size(), &bytes_read, &eof);
+ uint8_t result_buf[result->size() + 1];
+ result_buf[result->size()] = '\0';
+ size_t bytes_read;
+ Slice res(result_buf, result->size());
+ IOContext io_ctx;
+ result->read_at(0, res, io_ctx, &bytes_read);
ASSERT_STREQ(block.dump_data(0, 10).c_str(),
reinterpret_cast<char*>(result_buf));
}
} // namespace vectorized
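Every test update in this file applies the same replacement: LocalFileReader plus open() becomes LocalFileSystem::open_file() plus a positional read_at. As a standalone sketch (the path is illustrative):

    io::FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
    io::FileReaderSPtr reader;
    RETURN_IF_ERROR(local_fs->open_file("/path/to/data.parquet", &reader));

    std::vector<uint8_t> buf(reader->size());
    Slice slice(buf.data(), buf.size());
    IOContext io_ctx;
    size_t bytes_read = 0;
    RETURN_IF_ERROR(reader->read_at(0, slice, io_ctx, &bytes_read));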
diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out
b/regression-test/data/load_p0/stream_load/test_json_load.out
index ebf706f594..d0de96b7d7 100644
--- a/regression-test/data/load_p0/stream_load/test_json_load.out
+++ b/regression-test/data/load_p0/stream_load/test_json_load.out
@@ -12,32 +12,6 @@
10 hefei 23456710
200 changsha 3456789
--- !select1 --
-1 beijing 2345671
-2 shanghai 2345672
-3 guangzhou 2345673
-4 shenzhen 2345674
-5 hangzhou 2345675
-6 nanjing 2345676
-7 wuhan 2345677
-8 chengdu 2345678
-9 xian 2345679
-10 hefei 23456710
-200 changsha 3456789
-
--- !select2 --
-10 beijing 2345671
-20 shanghai 2345672
-30 guangzhou 2345673
-40 shenzhen 2345674
-50 hangzhou 2345675
-60 nanjing 2345676
-70 wuhan 2345677
-80 chengdu 2345678
-90 xian 2345679
-100 hefei 23456710
-200 changsha 3456789
-
-- !select2 --
10 beijing 2345671
20 shanghai 2345672
@@ -64,32 +38,6 @@
10 23456710
200 755
--- !select3 --
-1 2345671
-2 2345672
-3 2345673
-4 2345674
-5 2345675
-6 2345676
-7 2345677
-8 2345678
-9 2345679
-10 23456710
-200 755
-
--- !select4 --
-1 210
-2 220
-3 230
-4 240
-5 250
-6 260
-7 270
-8 280
-9 290
-10 300
-200 755
-
-- !select4 --
1 210
2 220
@@ -116,32 +64,6 @@
10 2345676
200 755
--- !select5 --
-1 1454547
-2 1244264
-3 528369
-4 594201
-5 594201
-6 2345672
-7 2345673
-8 2345674
-9 2345675
-10 2345676
-200 755
-
--- !select6 --
-10 1454547
-20 1244264
-30 528369
-40 594201
-50 594201
-60 2345672
-70 2345673
-80 2345674
-90 2345675
-100 2345676
-200 755
-
-- !select6 --
10 1454547
20 1244264
@@ -163,22 +85,6 @@
100 2345676
200 755
--- !select7 --
-60 2345672
-70 2345673
-80 2345674
-90 2345675
-100 2345676
-200 755
-
--- !select8 --
-60 2345672
-70 2345673
-80 2345674
-90 2345675
-100 2345676
-200 755
-
-- !select8 --
60 2345672
70 2345673
@@ -195,33 +101,9 @@
50 guangzhou 2345675
200 changsha 3456789
--- !select9 --
-10 beijing 2345671
-20 shanghai 2345672
-30 hangzhou 2345673
-40 shenzhen 2345674
-50 guangzhou 2345675
-200 changsha 3456789
-
-- !select10 --
200 changsha 3456789
--- !select10 --
-200 changsha 3456789
-
--- !select11 --
-1 beijing 2345671
-2 shanghai 2345672
-3 guangzhou 2345673
-4 shenzhen 2345674
-5 hangzhou 2345675
-6 nanjing 2345676
-7 wuhan 2345677
-8 chengdu 2345678
-9 xian 2345679
-10 hefei 23456710
-200 changsha 3456789
-
-- !select11 --
1 beijing 2345671
2 shanghai 2345672
@@ -248,28 +130,6 @@
10 \N 23456710
200 changsha 3456789
--- !select12 --
-1 \N 2345671
-2 shanghai 2345672
-3 beijing 2345673
-4 shenzhen 2345674
-5 hangzhou 2345675
-6 nanjing 2345676
-7 \N 2345677
-8 chengdu 2345678
-9 \N 2345679
-10 \N 23456710
-200 changsha 3456789
-
--- !select13 --
-2 shanghai 2345672
-3 beijing 2345673
-4 shenzhen 2345674
-5 hangzhou 2345675
-6 nanjing 2345676
-8 chengdu 2345678
-200 hangzhou 12345
-
-- !select13 --
2 shanghai 2345672
3 beijing 2345673
@@ -287,22 +147,6 @@
50 2345675 \N
200 changsha 3456789
--- !select14 --
-10 2345671 \N
-20 2345672 \N
-30 2345673 \N
-40 2345674 \N
-50 2345675 \N
-200 changsha 3456789
-
--- !select15 --
-10 beijing 2345671
-20 shanghai 2345672
-30 hangzhou 2345673
-40 shenzhen 2345674
-50 guangzhou 2345675
-200 changsha 3456789
-
-- !select15 --
10 beijing 2345671
20 shanghai 2345672
@@ -320,12 +164,3 @@
6 fuyang 2345676
200 changsha 3456789
--- !select16 --
-1 xihu 2345671
-2 xiaoshan 2345672
-3 binjiang 2345673
-4 shangcheng 2345674
-5 tonglu 2345675
-6 fuyang 2345676
-200 changsha 3456789
-
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index 513d3e14dd..d2c5d15fe0 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -114,7 +114,7 @@ suite("test_json_load", "p0") {
assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
}
- def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag, format_flag, exprs, json_paths,
+ def load_json_data = {label, strip_flag, read_flag, format_flag, exprs, json_paths,
                        json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false ->
// load the json data
@@ -207,17 +207,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case1', 'true', '', 'json', '', '', '', '', '', 'simple_json.json')
-
- sql "sync"
- qt_select1 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case1_2', 'true', '', 'json', '', '', '', '', '', 'simple_json.json')
+ load_json_data.call('test_json_load_case1_2', 'true', '', 'json', '', '', '', '', '', 'simple_json.json')
sql "sync"
qt_select1 "select * from ${testTable} order by id"
@@ -232,17 +222,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
-
- sql "sync"
- qt_select2 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case2_2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
+ load_json_data.call('test_json_load_case2_2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
sql "sync"
qt_select2 "select * from ${testTable} order by id"
@@ -257,18 +237,7 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('false', 'test_json_load_case3', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]',
- '', '', '', 'simple_json.json')
-
- sql "sync"
- qt_select3 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table2.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case3_2', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('test_json_load_case3_2', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]',
'', '', '', 'simple_json.json')
sql "sync"
@@ -284,18 +253,7 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('false', 'test_json_load_case4', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]',
- '', '', '', 'simple_json.json')
-
- sql "sync"
- qt_select4 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table2.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case4_2', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]',
+ load_json_data.call('test_json_load_case4_2', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]',
'', '', '', 'simple_json.json')
sql "sync"
@@ -311,18 +269,7 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('false', 'test_json_load_case5', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]',
- '', '', '', 'multi_line_json.json')
-
- sql "sync"
- qt_select5 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table2.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case5_2', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('test_json_load_case5_2', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]',
'', '', '', 'multi_line_json.json')
sql "sync"
@@ -338,19 +285,7 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('false', 'test_json_load_case6', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
- '', '', '', 'multi_line_json.json')
-
- sql "sync"
- qt_select6 "select * from ${testTable} order by id"
-
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table2.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case6_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('test_json_load_case6_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', '', '', 'multi_line_json.json')
sql "sync"
@@ -366,18 +301,7 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('false', 'test_json_load_case7', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
- '', 'id > 50', '', 'multi_line_json.json')
-
- sql "sync"
- qt_select7 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table2.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case7_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('test_json_load_case7_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', 'id > 50', '', 'multi_line_json.json')
sql "sync"
@@ -393,19 +317,7 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
- load_json_data.call('false', 'test_json_load_case8', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
- '', 'id > 50', 'true', 'multi_line_json.json')
-
- sql "sync"
- qt_select8 "select * from ${testTable} order by id"
-
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table2.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case8_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('test_json_load_case8_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', 'id > 50', 'true', 'multi_line_json.json')
sql "sync"
@@ -421,18 +333,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case9', '', 'true', 'json', 'id= id * 10', '',
- '$.item', '', 'true', 'nest_json.json')
-
- sql "sync"
- qt_select9 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case9_2', '', 'true', 'json', 'id= id * 10', '',
+ load_json_data.call('test_json_load_case9_2', '', 'true', 'json', 'id= id * 10', '',
'$.item', '', 'true', 'nest_json.json')
sql "sync"
@@ -448,19 +349,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case10', '', 'true', 'json', 'id= id * 10', '',
- '$.item', '', 'true', 'invalid_json.json', true)
-
- sql "sync"
- qt_select10 "select * from ${testTable} order by id"
-
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case10_2', '', 'true', 'json', 'id= id * 10', '',
+ load_json_data.call('test_json_load_case10_2', '', 'true', 'json', 'id= id * 10', '',
'$.item', '', 'true', 'invalid_json.json', true)
sql "sync"
@@ -476,17 +365,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case11', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json')
-
- sql "sync"
- qt_select11 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case11_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json')
+ load_json_data.call('test_json_load_case11_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json')
sql "sync"
qt_select11 "select * from ${testTable} order by id"
@@ -501,17 +380,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case12', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
-
- sql "sync"
- qt_select12 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case12_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
+ load_json_data.call('test_json_load_case12_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
sql "sync"
qt_select12 "select * from ${testTable} order by id"
@@ -553,38 +422,6 @@ suite("test_json_load", "p0") {
sql "sync"
qt_select13 "select * from ${testTable} order by id"
-
- sql "DROP TABLE IF EXISTS ${testTable}"
- create_test_table3.call(testTable)
- // load the json data
- streamLoad {
- table "${testTable}"
-
- // set http request header params
- set 'strip_outer_array', "true"
- set 'format', "json"
- set 'max_filter_ratio', '1'
- file "simple_json2_lack_one_column.json" // import json file
- time 10000 // limit inflight 10s
-
- // if declared a check callback, the default check condition will ignore.
- // So you must check all condition
- check { result, exception, startTime, endTime ->
- if (exception != null) {
- throw exception
- }
- log.info("Stream load result: ${result}".toString())
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows)
- assertEquals(json.NumberFilteredRows, 4)
- assertEquals(json.NumberLoadedRows, 6)
- assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
- }
- }
- sql "sync"
- qt_select13 "select * from ${testTable} order by id"
-
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
@@ -595,18 +432,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case14', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
- '$.item', '', 'true', 'nest_json.json')
-
- sql "sync"
- qt_select14 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case14_2', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
+ load_json_data.call('test_json_load_case14_2', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'$.item', '', 'true', 'nest_json.json')
sql "sync"
@@ -622,18 +448,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case15', '', 'true', 'json', 'id, code, city, id= id * 10',
-                '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', '', 'true', 'nest_json.json')
-
- sql "sync"
- qt_select15 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case15_2', '', 'true', 'json', 'id, code, city,id= id * 10',
+ load_json_data.call('test_json_load_case15_2', '', 'true', 'json', 'id, code, city,id= id * 10',
                 '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', '', 'true', 'nest_json.json')
sql "sync"
@@ -649,18 +464,7 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
- load_json_data.call('false', 'test_json_load_case16', 'true', '', 'json', 'id, code, city',
-                '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', '', 'true', 'nest_json_array.json')
-
- sql "sync"
- qt_select16 "select * from ${testTable} order by id"
-
- // test new json reader
- sql "DROP TABLE IF EXISTS ${testTable}"
-
- create_test_table1.call(testTable)
-
- load_json_data.call('true', 'test_json_load_case16_2', 'true', '', 'json', 'id, code, city',
+ load_json_data.call('test_json_load_case16_2', 'true', '', 'json', 'id, code, city',
                 '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', '', 'true', 'nest_json_array.json')
sql "sync"
@@ -714,7 +518,7 @@ suite("test_json_load", "p0") {
sql "DROP TABLE IF EXISTS ${testTable}"
test_invalid_json_array_table.call(testTable)
- load_json_data.call('false', 'test_json_load_case19', 'true', '', 'json', '', '',
+ load_json_data.call('test_json_load_case19', 'true', '', 'json', '', '',
'', '', '', 'invalid_json_array.json', true)
sql "sync"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org