This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3d0beec01d [fix](orc) fix heap-use-after-free and potential memory
leak of orc reader (#17431)
3d0beec01d is described below
commit 3d0beec01d7a12ea0a8a16bcfba07344be5cb19c
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Mar 6 08:42:35 2023 +0800
[fix](orc) fix heap-use-after-free and potential memory leak of orc reader
(#17431)
Fix heap-use-after-free
The OrcReader has an internal FileInputStream. If the file is empty, the
memory of the FileInputStream leaks.
Besides, the FileInputStream holds a Statistics instance. The FileInputStream
may be deleted if the OrcReader fails to initialize,
but its Statistics may still be used when the OrcReader is closed,
causing a heap-use-after-free error.
Fix potential memory leak
When the file scan node initializes its file scanners and a scanner's prepare() fails,
the memory of that file scanner leaks.
---
be/src/common/signal_handler.h | 2 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 89 ++++++++++++-----------------
be/src/vec/exec/format/orc/vorc_reader.h | 73 +++++++++++------------
be/src/vec/exec/scan/new_file_scan_node.cpp | 16 ++----
be/src/vec/exec/scan/new_file_scan_node.h | 2 -
be/src/vec/exec/scan/vmeta_scan_node.cpp | 2 +-
6 files changed, 83 insertions(+), 101 deletions(-)
diff --git a/be/src/common/signal_handler.h b/be/src/common/signal_handler.h
index e88829441d..52cc797140 100644
--- a/be/src/common/signal_handler.h
+++ b/be/src/common/signal_handler.h
@@ -300,7 +300,7 @@ void DumpSignalInfo(int signal_number, siginfo_t* siginfo) {
if (reason != nullptr) {
formatter.AppendString(reason);
} else {
- formatter.AppendString("unkown detail explain");
+ formatter.AppendString("unknown detail explain");
}
formatter.AppendString(" (@0x");
formatter.AppendUint64(reinterpret_cast<uintptr_t>(siginfo->si_addr), 16);
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 5ff453df06..6df70e0f16 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -48,9 +48,9 @@ namespace doris::vectorized {
M(TypeIndex::Float64, Float64, orc::DoubleVectorBatch)
void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
- _statistics.read_calls++;
- _statistics.read_bytes += length;
- SCOPED_RAW_TIMER(&_statistics.read_time);
+ _statistics->fs_read_calls++;
+ _statistics->fs_read_bytes += length;
+ SCOPED_RAW_TIMER(&_statistics->fs_read_time);
uint64_t has_read = 0;
char* out = reinterpret_cast<char*>(buf);
IOContext io_ctx;
@@ -113,23 +113,28 @@ OrcReader::~OrcReader() {
void OrcReader::close() {
if (!_closed) {
- if (_profile != nullptr) {
- if (_file_reader != nullptr) {
- auto& fst = _file_reader->statistics();
- COUNTER_UPDATE(_orc_profile.read_time, fst.read_time);
- COUNTER_UPDATE(_orc_profile.read_calls, fst.read_calls);
- COUNTER_UPDATE(_orc_profile.read_bytes, fst.read_bytes);
- }
- COUNTER_UPDATE(_orc_profile.column_read_time,
_statistics.column_read_time);
- COUNTER_UPDATE(_orc_profile.get_batch_time,
_statistics.get_batch_time);
- COUNTER_UPDATE(_orc_profile.parse_meta_time,
_statistics.parse_meta_time);
- COUNTER_UPDATE(_orc_profile.decode_value_time,
_statistics.decode_value_time);
- COUNTER_UPDATE(_orc_profile.decode_null_map_time,
_statistics.decode_null_map_time);
- }
+ _collect_profile_on_close();
_closed = true;
}
}
+void OrcReader::_collect_profile_on_close() {
+ if (_profile != nullptr) {
+ COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
+ COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
+ COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
+ COUNTER_UPDATE(_orc_profile.column_read_time,
_statistics.column_read_time);
+ COUNTER_UPDATE(_orc_profile.get_batch_time,
_statistics.get_batch_time);
+ COUNTER_UPDATE(_orc_profile.parse_meta_time,
_statistics.parse_meta_time);
+ COUNTER_UPDATE(_orc_profile.decode_value_time,
_statistics.decode_value_time);
+ COUNTER_UPDATE(_orc_profile.decode_null_map_time,
_statistics.decode_null_map_time);
+ }
+}
+
+int64_t OrcReader::size() const {
+ return _file_input_stream->getLength();
+}
+
void OrcReader::_init_profile() {
if (_profile != nullptr) {
static const char* orc_profile = "OrcReader";
@@ -146,33 +151,33 @@ void OrcReader::_init_profile() {
}
}
-Status OrcReader::init_reader(
- std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
- SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
- if (_file_reader == nullptr) {
+Status OrcReader::_create_file_reader() {
+ if (_file_input_stream == nullptr) {
io::FileReaderSPtr inner_reader;
-
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_system_properties,
_file_description,
&_file_system,
&inner_reader,
_io_ctx));
-
- _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
+ _file_input_stream.reset(
+ new ORCFileInputStream(_scan_range.path, inner_reader,
&_statistics));
}
- if (_file_reader->getLength() == 0) {
- return Status::EndOfFile("init reader failed, empty orc file: " +
_scan_range.path);
+ if (_file_input_stream->getLength() == 0) {
+ return Status::EndOfFile("empty orc file: " + _scan_range.path);
}
-
// create orc reader
try {
orc::ReaderOptions options;
- _reader =
orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options);
+ _reader = orc::createReader(
+
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
} catch (std::exception& e) {
return Status::InternalError("Init OrcReader failed. reason = {}",
e.what());
}
- if (_reader->getNumberOfRows() == 0) {
- return Status::EndOfFile("init reader failed, empty orc file with row
num 0: " +
- _scan_range.path);
- }
+ return Status::OK();
+}
+
+Status OrcReader::init_reader(
+ std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+ RETURN_IF_ERROR(_create_file_reader());
// _init_bloom_filter(colname_to_value_range);
// create orc row reader
@@ -206,27 +211,7 @@ Status OrcReader::init_reader(
Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
- if (_file_reader == nullptr) {
- io::FileReaderSPtr inner_reader;
-
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_system_properties,
- _file_description,
&_file_system,
- &inner_reader,
_io_ctx));
-
- _file_reader = new ORCFileInputStream(_scan_range.path, inner_reader);
- }
- if (_file_reader->getLength() == 0) {
- return Status::EndOfFile("get parsed schema fail, empty orc file: " +
_scan_range.path);
- }
-
- // create orc reader
- try {
- orc::ReaderOptions options;
- _reader =
orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options);
- } catch (std::exception& e) {
- return Status::InternalError("Init OrcReader failed. reason = {}",
e.what());
- }
-
+ RETURN_IF_ERROR(_create_file_reader());
auto& root_type = _reader->getType();
for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 4df04357fb..741addc345 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -31,38 +31,13 @@
namespace doris::vectorized {
-class ORCFileInputStream : public orc::InputStream {
-public:
- struct Statistics {
- int64_t read_time = 0;
- int64_t read_calls = 0;
- int64_t read_bytes = 0;
- };
-
- ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr
file_reader)
- : _file_name(file_name), _file_reader(file_reader) {}
-
- ~ORCFileInputStream() override = default;
-
- uint64_t getLength() const override { return _file_reader->size(); }
-
- uint64_t getNaturalReadSize() const override { return
config::orc_natural_read_size_mb << 20; }
-
- void read(void* buf, uint64_t length, uint64_t offset) override;
-
- const std::string& getName() const override { return _file_name; }
-
- Statistics& statistics() { return _statistics; }
-
-private:
- Statistics _statistics;
- const std::string& _file_name;
- io::FileReaderSPtr _file_reader;
-};
-
+class ORCFileInputStream;
class OrcReader : public GenericReader {
public:
struct Statistics {
+ int64_t fs_read_time = 0;
+ int64_t fs_read_calls = 0;
+ int64_t fs_read_bytes = 0;
int64_t column_read_time = 0;
int64_t get_batch_time = 0;
int64_t parse_meta_time = 0;
@@ -79,10 +54,6 @@ public:
IOContext* io_ctx);
~OrcReader() override;
- // for test
- void set_file_reader(const std::string& file_name, io::FileReaderSPtr
file_reader) {
- _file_reader = new ORCFileInputStream(file_name, file_reader);
- }
Status init_reader(
std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
@@ -91,7 +62,7 @@ public:
void close();
- int64_t size() const { return _file_reader->getLength(); }
+ int64_t size() const;
std::unordered_map<std::string, TypeDescriptor> get_name_to_type()
override;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
@@ -111,6 +82,12 @@ private:
RuntimeProfile::Counter* decode_value_time;
RuntimeProfile::Counter* decode_null_map_time;
};
+
+ // Create the inner orc file reader.
+ // Returns EOF if the file is empty,
+ // or an error status if any other error is encountered.
+ Status _create_file_reader();
+
void _init_profile();
Status _init_read_columns();
TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
@@ -261,6 +238,9 @@ private:
std::string _get_field_name_lower_case(const orc::Type* orc_type, int pos);
+ void _collect_profile_on_close();
+
+private:
RuntimeProfile* _profile;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
@@ -284,7 +264,7 @@ private:
// Flag for hive engine. True if the external table engine is Hive.
bool _is_hive = false;
std::vector<const orc::Type*> _col_orc_type;
- ORCFileInputStream* _file_reader = nullptr;
+ std::unique_ptr<ORCFileInputStream> _file_input_stream;
Statistics _statistics;
OrcProfile _orc_profile;
bool _closed = false;
@@ -303,4 +283,27 @@ private:
DecimalScaleParams _decimal_scale_params;
};
+class ORCFileInputStream : public orc::InputStream {
+public:
+ ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr
file_reader,
+ OrcReader::Statistics* statistics)
+ : _file_name(file_name), _file_reader(file_reader),
_statistics(statistics) {}
+
+ ~ORCFileInputStream() override = default;
+
+ uint64_t getLength() const override { return _file_reader->size(); }
+
+ uint64_t getNaturalReadSize() const override { return
config::orc_natural_read_size_mb << 20; }
+
+ void read(void* buf, uint64_t length, uint64_t offset) override;
+
+ const std::string& getName() const override { return _file_name; }
+
+private:
+ const std::string& _file_name;
+ io::FileReaderSPtr _file_reader;
+ // Owned by OrcReader
+ OrcReader::Statistics* _statistics;
+};
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index a8ce6422e1..50b54f1691 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -88,20 +88,16 @@ Status
NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
}
for (auto& scan_range : _scan_ranges) {
- VScanner* scanner =
-
(VScanner*)_create_scanner(scan_range.scan_range.ext_scan_range.file_scan_range);
+ VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner,
+
scan_range.scan_range.ext_scan_range.file_scan_range,
+ runtime_profile(), _kv_cache);
+ _scanner_pool.add(scanner);
+ RETURN_IF_ERROR(((VFileScanner*)scanner)
+ ->prepare(_vconjunct_ctx_ptr.get(),
&_colname_to_value_range));
scanners->push_back(scanner);
}
return Status::OK();
}
-VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
- VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner,
scan_range,
- runtime_profile(), _kv_cache);
- ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(),
&_colname_to_value_range);
- _scanner_pool.add(scanner);
- return scanner;
-}
-
}; // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h
b/be/src/vec/exec/scan/new_file_scan_node.h
index 8ece99a961..f45ba9ddf9 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -38,8 +38,6 @@ protected:
Status _init_scanners(std::list<VScanner*>* scanners) override;
private:
- VScanner* _create_scanner(const TFileScanRange& scan_range);
-
std::vector<TScanRangeParams> _scan_ranges;
KVCache<std::string> _kv_cache;
};
diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp
b/be/src/vec/exec/scan/vmeta_scan_node.cpp
index a404d9929b..6a9e86636c 100644
--- a/be/src/vec/exec/scan/vmeta_scan_node.cpp
+++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp
@@ -54,8 +54,8 @@ Status VMetaScanNode::_init_scanners(std::list<VScanner*>*
scanners) {
for (auto& scan_range : _scan_ranges) {
VMetaScanner* scanner = new VMetaScanner(_state, this, _tuple_id,
scan_range,
_limit_per_scanner,
runtime_profile());
- RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
_scanner_pool.add(scanner);
+ RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
scanners->push_back(static_cast<VScanner*>(scanner));
}
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]