Copilot commented on code in PR #63862:
URL: https://github.com/apache/doris/pull/63862#discussion_r3338311607
##########
be/test/format/reader/table_reader_test.cpp:
##########
@@ -612,6 +665,119 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
std::filesystem::remove_all(test_dir);
}
+TEST(TableReaderTest, OrcReaderReadsThroughNewTableReader) {
+ const auto test_dir = std::filesystem::temp_directory_path() /
"doris_table_reader_orc_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
Review Comment:
These tests use fixed names under `temp_directory_path()` and
unconditionally `remove_all()` the directory. This can cause flaky failures
when tests are executed concurrently in multiple processes (e.g.,
sharded or parallel CI runs), as one process can delete another's directory
mid-test. Prefer generating a per-test unique directory name (e.g., one that
includes the PID and the test name, or a random suffix) to avoid collisions.
##########
be/test/format/reader/table_reader_test.cpp:
##########
@@ -268,6 +274,53 @@ void write_parquet_file(const std::string& file_path,
int32_t id, const std::str
builder.build()));
}
+void set_orc_string_value(::orc::StringVectorBatch& batch, int64_t row,
std::string_view value) {
+ batch.data[row] = const_cast<char*>(value.data());
+ batch.length[row] = static_cast<int64_t>(value.size());
+}
Review Comment:
This uses `const_cast` to write a `char*` pointer into ORC's batch buffer.
Even in tests, it's safer to back these pointers with owned/mutable storage
whose lifetime clearly extends through `writer->add(...)` (e.g., store strings
in a `std::vector<std::string>` and reference that buffer), or use an API that
accepts `const char*` if available. This reduces the chance of undefined
behavior if the underlying ORC writer ever mutates the buffer.
##########
be/src/format/new_orc/orc_reader.cpp:
##########
@@ -0,0 +1,1065 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/new_orc/orc_reader.h"
+
+#include <cctz/time_zone.h>
+#include <gen_cpp/Types_types.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <list>
+#include <map>
+#include <memory>
+#include <orc/OrcFile.hh>
+#include <orc/Vector.hh>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/consts.h"
+#include "common/exception.h"
+#include "core/block/block.h"
+#include "core/column/column_array.h"
+#include "core/column/column_decimal.h"
+#include "core/column/column_map.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_date_time.h"
+#include "core/data_type/data_type_decimal.h"
+#include "core/data_type/data_type_map.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "io/fs/file_reader.h"
+#include "runtime/exec_env.h"
+#include "util/slice.h"
+
+namespace doris::new_orc {
+namespace {
+
+constexpr uint64_t DEFAULT_ORC_READ_BATCH_SIZE = 4096;
+constexpr uint64_t DEFAULT_ORC_NATURAL_READ_SIZE = 128 * 1024;
+constexpr int DECIMAL_PRECISION_FOR_HIVE11 =
BeConsts::MAX_DECIMAL128_PRECISION;
+constexpr int DECIMAL_SCALE_FOR_HIVE11 = 10;
+constexpr const char* ORC_LIST_ELEMENT_NAME = "element";
+constexpr const char* ORC_MAP_ENTRY_NAME = "key_value";
+constexpr const char* ORC_MAP_KEY_NAME = "key";
+constexpr const char* ORC_MAP_VALUE_NAME = "value";
+
+class DorisOrcInputStream final : public ::orc::InputStream {
+public:
+ DorisOrcInputStream(std::string file_name, io::FileReaderSPtr file_reader,
+ io::IOContext* io_ctx)
+ : _file_name(std::move(file_name)),
+ _file_reader(std::move(file_reader)),
+ _io_ctx(io_ctx) {}
+
+ uint64_t getLength() const override { return _file_reader->size(); }
+
+ uint64_t getNaturalReadSize() const override { return
DEFAULT_ORC_NATURAL_READ_SIZE; }
+
+ void read(void* buf, uint64_t length, uint64_t offset) override {
+ uint64_t bytes_read = 0;
+ auto* out = static_cast<uint8_t*>(buf);
+ while (bytes_read < length) {
+ size_t loop_read = 0;
+ Status st = _file_reader->read_at(
+ static_cast<size_t>(offset + bytes_read),
+ Slice(out + bytes_read, static_cast<size_t>(length -
bytes_read)), &loop_read,
+ _io_ctx);
+ if (!st.ok()) {
+ throw ::orc::ParseError("Failed to read " + _file_name + ": " +
+ st.to_string_no_stack());
+ }
+ if (loop_read == 0) {
+ break;
+ }
+ bytes_read += loop_read;
+ }
+ if (bytes_read != length) {
+ throw ::orc::ParseError("Short read from " + _file_name);
+ }
+ }
+
+ const std::string& getName() const override { return _file_name; }
+
+private:
+ std::string _file_name;
+ io::FileReaderSPtr _file_reader;
+ io::IOContext* _io_ctx = nullptr;
+};
+
+bool is_null_at(const ::orc::ColumnVectorBatch& batch, size_t row) {
+ return batch.hasNulls && !batch.notNull[row];
+}
+
+template <typename ColumnType, typename ValueType, typename OrcBatchType>
+Status append_numeric_values(MutableColumnPtr& column, const
::orc::ColumnVectorBatch& batch,
+ size_t rows) {
+ const auto* orc_batch = dynamic_cast<const OrcBatchType*>(&batch);
+ if (orc_batch == nullptr) {
+ return Status::InternalError("Unexpected ORC numeric batch type {}",
batch.toString());
+ }
+ auto& data = assert_cast<ColumnType&>(*column).get_data();
+ const size_t old_size = data.size();
+ data.resize(old_size + rows);
+ for (size_t row = 0; row < rows; ++row) {
+ if (is_null_at(batch, row)) {
+ data[old_size + row] = ValueType {};
+ continue;
+ }
+ data[old_size + row] = static_cast<ValueType>(orc_batch->data[row]);
+ }
+ return Status::OK();
+}
+
+template <typename ColumnType, typename ValueType>
+Status append_floating_values(MutableColumnPtr& column, const
::orc::ColumnVectorBatch& batch,
+ size_t rows) {
+ const auto* orc_batch = dynamic_cast<const
::orc::DoubleVectorBatch*>(&batch);
+ if (orc_batch == nullptr) {
+ return Status::InternalError("Unexpected ORC floating batch type {}",
batch.toString());
+ }
+ auto& data = assert_cast<ColumnType&>(*column).get_data();
+ const size_t old_size = data.size();
+ data.resize(old_size + rows);
+ for (size_t row = 0; row < rows; ++row) {
+ if (is_null_at(batch, row)) {
+ data[old_size + row] = ValueType {};
+ continue;
+ }
+ data[old_size + row] = static_cast<ValueType>(orc_batch->data[row]);
+ }
+ return Status::OK();
+}
+
+size_t trim_right_spaces(const char* value, size_t length) {
+ while (length > 0 && value[length - 1] == ' ') {
+ --length;
+ }
+ return length;
+}
+
+Int128 to_int128(::orc::Int128 value) {
+ const auto high_bits =
static_cast<__uint128_t>(static_cast<uint64_t>(value.getHighBits()));
+ const auto low_bits = static_cast<__uint128_t>(value.getLowBits());
+ return static_cast<Int128>((high_bits << 64) | low_bits);
+}
+
+std::vector<int32_t> append_path(const std::vector<int32_t>& path, int32_t
child_idx) {
+ auto child_path = path;
+ child_path.push_back(child_idx);
+ return child_path;
+}
+
+std::vector<std::string> append_name_path(const std::vector<std::string>& path,
+ const std::string& child_name) {
+ auto child_path = path;
+ child_path.push_back(child_name);
+ return child_path;
+}
+
+bool is_direct_projection_child(const reader::FieldProjection& parent,
+ const reader::FieldProjection& child) {
+ return child.file_path.size() == parent.file_path.size() + 1 &&
+ std::equal(parent.file_path.begin(), parent.file_path.end(),
child.file_path.begin());
+}
+
+Status get_projection_child_index(const reader::FieldProjection& parent,
+ const reader::FieldProjection& child,
int32_t child_count,
+ const std::string& column_name, int32_t*
child_idx) {
+ if (child.file_path.empty() || !is_direct_projection_child(parent, child))
{
+ return Status::InvalidArgument("Invalid ORC projection path for column
{}", column_name);
+ }
+ *child_idx = child.file_path.back();
+ if (*child_idx < 0 || *child_idx >= child_count) {
+ return Status::InvalidArgument("Invalid ORC projection child index {}
for column {}",
+ *child_idx, column_name);
+ }
+ return Status::OK();
+}
+
+void collect_type_and_descendant_ids(const ::orc::Type& type,
std::set<uint64_t>* const type_ids) {
+ DORIS_CHECK(type_ids != nullptr);
+ type_ids->insert(type.getColumnId());
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ const auto* child_type = type.getSubtype(child_idx);
+ DORIS_CHECK(child_type != nullptr);
+ collect_type_and_descendant_ids(*child_type, type_ids);
+ }
+}
+
+Status collect_projected_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection& projection,
+ std::set<uint64_t>* const type_ids);
+
+Status collect_projected_map_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection&
projection,
+ std::set<uint64_t>* const type_ids);
+
+Status collect_projected_map_entry_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection&
projection,
+ const reader::FieldProjection&
entry_projection,
+ std::set<uint64_t>* const type_ids,
+ bool* const selected_key, bool*
const selected_value) {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+ DORIS_CHECK(type.getSubtypeCount() == 2);
+ DORIS_CHECK(selected_key != nullptr);
+ DORIS_CHECK(selected_value != nullptr);
+
+ int32_t entry_idx = 0;
+ RETURN_IF_ERROR(get_projection_child_index(projection, entry_projection,
1, ORC_MAP_ENTRY_NAME,
+ &entry_idx));
+ DORIS_CHECK(entry_idx == 0);
+ if (entry_projection.project_all_children) {
+ collect_type_and_descendant_ids(*type.getSubtype(0), type_ids);
+ collect_type_and_descendant_ids(*type.getSubtype(1), type_ids);
+ *selected_key = true;
+ *selected_value = true;
+ return Status::OK();
+ }
+ if (entry_projection.children.empty()) {
+ return Status::NotSupported("ORC MAP entry projection contains no
children");
+ }
+ for (const auto& key_value_projection : entry_projection.children) {
+ int32_t key_value_idx = 0;
+ RETURN_IF_ERROR(get_projection_child_index(entry_projection,
key_value_projection, 2,
+ ORC_MAP_ENTRY_NAME,
&key_value_idx));
+ const auto* child_type =
type.getSubtype(static_cast<uint64_t>(key_value_idx));
+ DORIS_CHECK(child_type != nullptr);
+ RETURN_IF_ERROR(collect_projected_type_ids(*child_type,
key_value_projection, type_ids));
+ *selected_key = *selected_key || key_value_idx == 0;
+ *selected_value = *selected_value || key_value_idx == 1;
+ }
+ return Status::OK();
+}
+
+Status collect_projected_map_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection&
projection,
+ std::set<uint64_t>* const type_ids) {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+ DORIS_CHECK(type.getSubtypeCount() == 2);
+ type_ids->insert(type.getColumnId());
+ if (projection.project_all_children) {
+ collect_type_and_descendant_ids(type, type_ids);
+ return Status::OK();
+ }
+ if (projection.children.empty()) {
+ int32_t column_id = -1;
+ if (!projection.file_path.empty()) {
+ column_id = projection.file_path[0];
+ }
+ return Status::NotSupported("ORC MAP projection for column {} contains
no children",
+ column_id);
+ }
+
+ bool selected_key = false;
+ bool selected_value = false;
+ for (const auto& entry_projection : projection.children) {
+ RETURN_IF_ERROR(collect_projected_map_entry_type_ids(
+ type, projection, entry_projection, type_ids, &selected_key,
&selected_value));
+ }
+ if (!selected_key || !selected_value) {
+ return Status::NotSupported("ORC MAP projection must include both key
and value");
+ }
+ return Status::OK();
+}
+
+Status collect_projected_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection& projection,
+ std::set<uint64_t>* const type_ids) {
+ DORIS_CHECK(type_ids != nullptr);
+ type_ids->insert(type.getColumnId());
+ if (projection.project_all_children) {
+ collect_type_and_descendant_ids(type, type_ids);
+ return Status::OK();
+ }
+ if (projection.children.empty()) {
+ return Status::NotSupported("ORC projection contains no children");
+ }
+ if (type.getKind() == ::orc::TypeKind::MAP) {
+ return collect_projected_map_type_ids(type, projection, type_ids);
+ }
+ if (type.getKind() != ::orc::TypeKind::STRUCT && type.getKind() !=
::orc::TypeKind::LIST) {
+ return Status::InvalidArgument("Cannot project children from
non-complex ORC type {}",
+ static_cast<int>(type.getKind()));
+ }
+
+ const auto child_count = static_cast<int32_t>(type.getSubtypeCount());
+ for (const auto& child_projection : projection.children) {
+ int32_t child_idx = 0;
+ RETURN_IF_ERROR(get_projection_child_index(projection,
child_projection, child_count,
+ "orc_complex", &child_idx));
+ const auto* child_type =
type.getSubtype(static_cast<uint64_t>(child_idx));
+ DORIS_CHECK(child_type != nullptr);
+ RETURN_IF_ERROR(collect_projected_type_ids(*child_type,
child_projection, type_ids));
+ }
+ return Status::OK();
+}
+
+Status append_orc_offsets(ColumnArray::Offsets64& doris_offsets,
+ const ::orc::DataBuffer<int64_t>& orc_offsets,
size_t rows,
+ size_t* element_size) {
+ const auto prev_offset = doris_offsets.empty() ? 0 : doris_offsets.back();
+ const auto base_offset = orc_offsets[0];
+ for (size_t idx = 1; idx <= rows; ++idx) {
+ const auto delta = orc_offsets[idx] - base_offset;
+ if (delta < 0) {
+ return Status::Corruption("Invalid ORC offsets");
+ }
+ doris_offsets.push_back(prev_offset +
static_cast<ColumnArray::Offset64>(delta));
+ }
+ const auto total_delta = orc_offsets[rows] - base_offset;
+ if (total_delta < 0) {
+ return Status::Corruption("Invalid ORC offsets");
+ }
+ *element_size = static_cast<size_t>(total_delta);
+ return Status::OK();
+}
+
+int64_t find_struct_child_index(const ::orc::Type& type, const std::string&
field_name) {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ if (type.getFieldName(child_idx) == field_name) {
+ return static_cast<int64_t>(child_idx);
+ }
+ }
+ return -1;
+}
+
+} // namespace
+
+struct OrcReaderScanState {
+ std::unique_ptr<::orc::Reader> reader;
+ const ::orc::Type* root_type = nullptr;
+ ::orc::ReaderMetrics reader_metrics;
+ ::orc::RowReaderOptions row_reader_options;
+ std::unique_ptr<::orc::RowReader> row_reader;
+ const ::orc::Type* selected_type = nullptr;
+ std::unique_ptr<::orc::ColumnVectorBatch> batch;
+ std::vector<reader::ColumnId> read_columns;
+ std::map<reader::ColumnId, size_t> column_to_selected_batch_index;
+ bool row_reader_created = false;
+};
+
+OrcReader::OrcReader(std::shared_ptr<io::FileSystemProperties>&
system_properties,
+ std::unique_ptr<io::FileDescription>& file_description,
+ std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile*
profile)
+ : FileReader(system_properties, file_description, io_ctx, profile) {}
+
+OrcReader::~OrcReader() = default;
+
+Status OrcReader::init(RuntimeState* state) {
+ RETURN_IF_ERROR(reader::FileReader::init(state));
+ _state = std::make_unique<OrcReaderScanState>();
+
+ ::orc::ReaderOptions options;
+ options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
+ options.setReaderMetrics(&_state->reader_metrics);
+
+ auto input_stream =
std::make_unique<DorisOrcInputStream>(_file_description->path,
+
_tracing_file_reader, _io_ctx.get());
+ try {
+ _state->reader = ::orc::createReader(std::move(input_stream), options);
+ _state->root_type = &_state->reader->getType();
+ } catch (const std::exception& e) {
+ return Status::InternalError("Failed to open ORC file {}: {}",
_file_description->path,
+ e.what());
+ }
+ return Status::OK();
+}
+
+DataTypePtr OrcReader::_convert_to_doris_type(const ::orc::Type& type) const {
+ DataTypePtr data_type;
+ switch (type.getKind()) {
+ case ::orc::TypeKind::BOOLEAN:
+ data_type = std::make_shared<DataTypeUInt8>();
+ break;
+ case ::orc::TypeKind::BYTE:
+ data_type = std::make_shared<DataTypeInt8>();
+ break;
+ case ::orc::TypeKind::SHORT:
+ data_type = std::make_shared<DataTypeInt16>();
+ break;
+ case ::orc::TypeKind::INT:
+ data_type = std::make_shared<DataTypeInt32>();
+ break;
+ case ::orc::TypeKind::LONG:
+ data_type = std::make_shared<DataTypeInt64>();
+ break;
+ case ::orc::TypeKind::FLOAT:
+ data_type = std::make_shared<DataTypeFloat32>();
+ break;
+ case ::orc::TypeKind::DOUBLE:
+ data_type = std::make_shared<DataTypeFloat64>();
+ break;
+ case ::orc::TypeKind::STRING:
+ case ::orc::TypeKind::BINARY:
+ data_type = std::make_shared<DataTypeString>();
+ break;
+ case ::orc::TypeKind::VARCHAR:
+ data_type =
std::make_shared<DataTypeString>(cast_set<int>(type.getMaximumLength()),
+
PrimitiveType::TYPE_VARCHAR);
+ break;
+ case ::orc::TypeKind::CHAR:
+ data_type =
std::make_shared<DataTypeString>(cast_set<int>(type.getMaximumLength()),
+ PrimitiveType::TYPE_CHAR);
+ break;
+ case ::orc::TypeKind::DATE:
+ data_type = std::make_shared<DataTypeDateV2>();
+ break;
+ case ::orc::TypeKind::TIMESTAMP:
+ case ::orc::TypeKind::TIMESTAMP_INSTANT:
+ data_type = std::make_shared<DataTypeDateTimeV2>(6);
+ break;
+ case ::orc::TypeKind::DECIMAL:
+ data_type = std::make_shared<DataTypeDecimal<TYPE_DECIMAL128I>>(
+ type.getPrecision() == 0 ? DECIMAL_PRECISION_FOR_HIVE11
+ : cast_set<int>(type.getPrecision()),
+ type.getPrecision() == 0 ? DECIMAL_SCALE_FOR_HIVE11
+ : cast_set<int>(type.getScale()));
+ break;
+ case ::orc::TypeKind::LIST:
+ data_type = _convert_list_to_doris_type(type);
+ break;
+ case ::orc::TypeKind::MAP:
+ data_type = _convert_map_to_doris_type(type);
+ break;
+ case ::orc::TypeKind::STRUCT:
+ data_type = _convert_struct_to_doris_type(type);
+ break;
+ default:
+ throw doris::Exception(
+ Status::NotSupported("ORC type {} is not supported by new ORC
reader",
+ static_cast<int>(type.getKind())));
+ }
+ return make_nullable(data_type);
+}
+
+DataTypePtr OrcReader::_convert_list_to_doris_type(const ::orc::Type& type)
const {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::LIST);
+ DORIS_CHECK(type.getSubtypeCount() == 1);
+ const auto* element_type = type.getSubtype(0);
+ DORIS_CHECK(element_type != nullptr);
+ return
std::make_shared<DataTypeArray>(_convert_to_doris_type(*element_type));
+}
+
+DataTypePtr OrcReader::_convert_map_to_doris_type(const ::orc::Type& type)
const {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+ DORIS_CHECK(type.getSubtypeCount() == 2);
+ const auto* key_type = type.getSubtype(0);
+ const auto* value_type = type.getSubtype(1);
+ DORIS_CHECK(key_type != nullptr);
+ DORIS_CHECK(value_type != nullptr);
+ return std::make_shared<DataTypeMap>(_convert_to_doris_type(*key_type),
+ _convert_to_doris_type(*value_type));
+}
+
+DataTypePtr OrcReader::_convert_struct_to_doris_type(const ::orc::Type& type)
const {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+ DataTypes child_types;
+ Strings child_names;
+ child_types.reserve(type.getSubtypeCount());
+ child_names.reserve(type.getSubtypeCount());
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ const auto* child_type = type.getSubtype(child_idx);
+ DORIS_CHECK(child_type != nullptr);
+ child_types.push_back(_convert_to_doris_type(*child_type));
+ child_names.push_back(type.getFieldName(child_idx));
+ }
+ return std::make_shared<DataTypeStruct>(child_types, child_names);
+}
+
+Status OrcReader::_fill_schema_field(const ::orc::Type& type, int32_t field_id,
+ const std::string& field_name,
+ const std::vector<int32_t>& file_path,
+ const std::vector<std::string>& name_path,
+ reader::SchemaField* const field) const {
+ if (field == nullptr) {
+ return Status::InvalidArgument("schema field is null");
+ }
+ field->id = field_id;
+ field->name = field_name;
+ field->file_path = file_path;
+ field->field_id_path.clear();
+ field->name_path = name_path;
+ field->column_type = reader::ColumnType::DATA_COLUMN;
+ try {
+ field->type = _convert_to_doris_type(type);
+ } catch (const doris::Exception& e) {
+ return e.to_status();
+ }
+ field->children.clear();
+ switch (type.getKind()) {
+ case ::orc::TypeKind::STRUCT:
+ return _fill_struct_schema_children(type, file_path, name_path, field);
+ case ::orc::TypeKind::LIST:
+ return _fill_list_schema_children(type, file_path, name_path, field);
+ case ::orc::TypeKind::MAP:
+ return _fill_map_schema_children(type, file_path, name_path, field);
+ default:
+ break;
+ }
+ return Status::OK();
+}
+
+Status OrcReader::_fill_struct_schema_children(const ::orc::Type& type,
+ const std::vector<int32_t>&
file_path,
+ const std::vector<std::string>&
name_path,
+ reader::SchemaField* const
field) const {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+ field->children.reserve(type.getSubtypeCount());
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ const auto* child_type = type.getSubtype(child_idx);
+ DORIS_CHECK(child_type != nullptr);
+ const auto child_name = type.getFieldName(child_idx);
+ reader::SchemaField child_field;
+ RETURN_IF_ERROR(_fill_schema_field(*child_type,
static_cast<int32_t>(child_idx), child_name,
+ append_path(file_path,
static_cast<int32_t>(child_idx)),
+ append_name_path(name_path,
child_name), &child_field));
+ field->children.push_back(std::move(child_field));
+ }
+ return Status::OK();
+}
+
+Status OrcReader::_fill_list_schema_children(const ::orc::Type& type,
+ const std::vector<int32_t>&
file_path,
+ const std::vector<std::string>&
name_path,
+ reader::SchemaField* const field)
const {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::LIST);
+ DORIS_CHECK(type.getSubtypeCount() == 1);
+ const auto* element_type = type.getSubtype(0);
+ DORIS_CHECK(element_type != nullptr);
+
+ reader::SchemaField element_field;
+ RETURN_IF_ERROR(
+ _fill_schema_field(*element_type, 0, ORC_LIST_ELEMENT_NAME,
append_path(file_path, 0),
+ append_name_path(name_path,
ORC_LIST_ELEMENT_NAME), &element_field));
+ field->children.push_back(std::move(element_field));
+ return Status::OK();
+}
+
+Status OrcReader::_fill_map_schema_children(const ::orc::Type& type,
+ const std::vector<int32_t>&
file_path,
+ const std::vector<std::string>&
name_path,
+ reader::SchemaField* const field)
const {
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+ DORIS_CHECK(type.getSubtypeCount() == 2);
+ const auto* key_type = type.getSubtype(0);
+ const auto* value_type = type.getSubtype(1);
+ DORIS_CHECK(key_type != nullptr);
+ DORIS_CHECK(value_type != nullptr);
+
+ const auto entry_file_path = append_path(file_path, 0);
+ const auto entry_name_path = append_name_path(name_path,
ORC_MAP_ENTRY_NAME);
+ reader::SchemaField key_field;
+ RETURN_IF_ERROR(
+ _fill_schema_field(*key_type, 0, ORC_MAP_KEY_NAME,
append_path(entry_file_path, 0),
+ append_name_path(entry_name_path,
ORC_MAP_KEY_NAME), &key_field));
+ reader::SchemaField value_field;
+ RETURN_IF_ERROR(_fill_schema_field(
+ *value_type, 1, ORC_MAP_VALUE_NAME, append_path(entry_file_path,
1),
+ append_name_path(entry_name_path, ORC_MAP_VALUE_NAME),
&value_field));
+
+ reader::SchemaField entry_field;
+ entry_field.id = 0;
+ entry_field.name = ORC_MAP_ENTRY_NAME;
+ entry_field.file_path = entry_file_path;
+ entry_field.field_id_path.clear();
+ entry_field.name_path = entry_name_path;
+ entry_field.column_type = reader::ColumnType::DATA_COLUMN;
+ entry_field.children.push_back(std::move(key_field));
+ entry_field.children.push_back(std::move(value_field));
+
+ DataTypes entry_child_types;
+ Strings entry_child_names;
+ entry_child_types.reserve(entry_field.children.size());
+ entry_child_names.reserve(entry_field.children.size());
+ for (const auto& child : entry_field.children) {
+ entry_child_types.push_back(child.type);
+ entry_child_names.push_back(child.name);
+ }
+ entry_field.type = std::make_shared<DataTypeStruct>(entry_child_types,
entry_child_names);
+ field->children.push_back(std::move(entry_field));
Review Comment:
In `_fill_map_schema_children`, `entry_field.type` is created as a
non-nullable `DataTypeStruct`, while other schema fields are produced via
`_fill_schema_field()`, which takes its type from `_convert_to_doris_type()`
and therefore always receives a `make_nullable(...)`-wrapped type. This makes
the map entry's schema inconsistent with the rest of the ORC schema (and with
expectations in code/tests that frequently treat fields as nullable). Consider
wrapping the entry struct type with `make_nullable(...)` (or otherwise aligning
nullability rules for this synthetic entry node).
##########
be/src/format/new_orc/orc_reader.cpp:
##########
@@ -0,0 +1,1065 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/new_orc/orc_reader.h"
+
+#include <cctz/time_zone.h>
+#include <gen_cpp/Types_types.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <list>
+#include <map>
+#include <memory>
+#include <orc/OrcFile.hh>
+#include <orc/Vector.hh>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/consts.h"
+#include "common/exception.h"
+#include "core/block/block.h"
+#include "core/column/column_array.h"
+#include "core/column/column_decimal.h"
+#include "core/column/column_map.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_date_time.h"
+#include "core/data_type/data_type_decimal.h"
+#include "core/data_type/data_type_map.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "io/fs/file_reader.h"
+#include "runtime/exec_env.h"
+#include "util/slice.h"
+
+namespace doris::new_orc {
+namespace {
+
+constexpr uint64_t DEFAULT_ORC_READ_BATCH_SIZE = 4096;
+constexpr uint64_t DEFAULT_ORC_NATURAL_READ_SIZE = 128 * 1024;
+constexpr int DECIMAL_PRECISION_FOR_HIVE11 =
BeConsts::MAX_DECIMAL128_PRECISION;
+constexpr int DECIMAL_SCALE_FOR_HIVE11 = 10;
+constexpr const char* ORC_LIST_ELEMENT_NAME = "element";
+constexpr const char* ORC_MAP_ENTRY_NAME = "key_value";
+constexpr const char* ORC_MAP_KEY_NAME = "key";
+constexpr const char* ORC_MAP_VALUE_NAME = "value";
+
+class DorisOrcInputStream final : public ::orc::InputStream {
+public:
+ DorisOrcInputStream(std::string file_name, io::FileReaderSPtr file_reader,
+ io::IOContext* io_ctx)
+ : _file_name(std::move(file_name)),
+ _file_reader(std::move(file_reader)),
+ _io_ctx(io_ctx) {}
+
+ uint64_t getLength() const override { return _file_reader->size(); }
+
+ uint64_t getNaturalReadSize() const override { return
DEFAULT_ORC_NATURAL_READ_SIZE; }
+
+ void read(void* buf, uint64_t length, uint64_t offset) override {
+ uint64_t bytes_read = 0;
+ auto* out = static_cast<uint8_t*>(buf);
+ while (bytes_read < length) {
+ size_t loop_read = 0;
+ Status st = _file_reader->read_at(
+ static_cast<size_t>(offset + bytes_read),
+ Slice(out + bytes_read, static_cast<size_t>(length -
bytes_read)), &loop_read,
+ _io_ctx);
+ if (!st.ok()) {
+ throw ::orc::ParseError("Failed to read " + _file_name + ": " +
+ st.to_string_no_stack());
+ }
+ if (loop_read == 0) {
+ break;
+ }
+ bytes_read += loop_read;
+ }
+ if (bytes_read != length) {
+ throw ::orc::ParseError("Short read from " + _file_name);
+ }
+ }
+
+ const std::string& getName() const override { return _file_name; }
+
+private:
+ std::string _file_name;
+ io::FileReaderSPtr _file_reader;
+ io::IOContext* _io_ctx = nullptr;
+};
+
+bool is_null_at(const ::orc::ColumnVectorBatch& batch, size_t row) {
+ return batch.hasNulls && !batch.notNull[row];
+}
+
+template <typename ColumnType, typename ValueType, typename OrcBatchType>
+Status append_numeric_values(MutableColumnPtr& column, const
::orc::ColumnVectorBatch& batch,
+ size_t rows) {
+ const auto* orc_batch = dynamic_cast<const OrcBatchType*>(&batch);
+ if (orc_batch == nullptr) {
+ return Status::InternalError("Unexpected ORC numeric batch type {}",
batch.toString());
+ }
+ auto& data = assert_cast<ColumnType&>(*column).get_data();
+ const size_t old_size = data.size();
+ data.resize(old_size + rows);
+ for (size_t row = 0; row < rows; ++row) {
+ if (is_null_at(batch, row)) {
+ data[old_size + row] = ValueType {};
+ continue;
+ }
+ data[old_size + row] = static_cast<ValueType>(orc_batch->data[row]);
+ }
+ return Status::OK();
+}
+
+template <typename ColumnType, typename ValueType>
+Status append_floating_values(MutableColumnPtr& column, const
::orc::ColumnVectorBatch& batch,
+ size_t rows) {
+ const auto* orc_batch = dynamic_cast<const
::orc::DoubleVectorBatch*>(&batch);
+ if (orc_batch == nullptr) {
+ return Status::InternalError("Unexpected ORC floating batch type {}",
batch.toString());
+ }
+ auto& data = assert_cast<ColumnType&>(*column).get_data();
+ const size_t old_size = data.size();
+ data.resize(old_size + rows);
+ for (size_t row = 0; row < rows; ++row) {
+ if (is_null_at(batch, row)) {
+ data[old_size + row] = ValueType {};
+ continue;
+ }
+ data[old_size + row] = static_cast<ValueType>(orc_batch->data[row]);
+ }
+ return Status::OK();
+}
+
+size_t trim_right_spaces(const char* value, size_t length) {
+ while (length > 0 && value[length - 1] == ' ') {
+ --length;
+ }
+ return length;
+}
+
+Int128 to_int128(::orc::Int128 value) {
+ const auto high_bits =
static_cast<__uint128_t>(static_cast<uint64_t>(value.getHighBits()));
+ const auto low_bits = static_cast<__uint128_t>(value.getLowBits());
+ return static_cast<Int128>((high_bits << 64) | low_bits);
+}
+
+std::vector<int32_t> append_path(const std::vector<int32_t>& path, int32_t
child_idx) {
+ auto child_path = path;
+ child_path.push_back(child_idx);
+ return child_path;
+}
+
+std::vector<std::string> append_name_path(const std::vector<std::string>& path,
+ const std::string& child_name) {
+ auto child_path = path;
+ child_path.push_back(child_name);
+ return child_path;
+}
+
+bool is_direct_projection_child(const reader::FieldProjection& parent,
+ const reader::FieldProjection& child) {
+ return child.file_path.size() == parent.file_path.size() + 1 &&
+ std::equal(parent.file_path.begin(), parent.file_path.end(),
child.file_path.begin());
+}
+
+Status get_projection_child_index(const reader::FieldProjection& parent,
+ const reader::FieldProjection& child,
int32_t child_count,
+ const std::string& column_name, int32_t*
child_idx) {
+ if (child.file_path.empty() || !is_direct_projection_child(parent, child))
{
+ return Status::InvalidArgument("Invalid ORC projection path for column
{}", column_name);
+ }
+ *child_idx = child.file_path.back();
+ if (*child_idx < 0 || *child_idx >= child_count) {
+ return Status::InvalidArgument("Invalid ORC projection child index {}
for column {}",
+ *child_idx, column_name);
+ }
+ return Status::OK();
+}
+
+void collect_type_and_descendant_ids(const ::orc::Type& type,
std::set<uint64_t>* const type_ids) {
+ DORIS_CHECK(type_ids != nullptr);
+ type_ids->insert(type.getColumnId());
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ const auto* child_type = type.getSubtype(child_idx);
+ DORIS_CHECK(child_type != nullptr);
+ collect_type_and_descendant_ids(*child_type, type_ids);
+ }
+}
+
+Status collect_projected_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection& projection,
+ std::set<uint64_t>* const type_ids);
+
+Status collect_projected_map_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection&
projection,
+ std::set<uint64_t>* const type_ids);
+
+// Collects the ORC type ids selected by one "entry" projection of a MAP column.
+//
+// `projection` is the MAP-level projection and `entry_projection` one of its children; the entry
+// must resolve to child index 0. When the entry projects all children, the whole key and value
+// subtrees are added. Otherwise every child of the entry must address the key (index 0) or the
+// value (index 1) and is collected recursively.
+// `*selected_key` / `*selected_value` are only ever switched to true, never reset, so the caller
+// can accumulate them across several entry projections.
+Status collect_projected_map_entry_type_ids(const ::orc::Type& type,
+                                            const reader::FieldProjection& projection,
+                                            const reader::FieldProjection& entry_projection,
+                                            std::set<uint64_t>* const type_ids,
+                                            bool* const selected_key, bool* const selected_value) {
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+    DORIS_CHECK(type.getSubtypeCount() == 2);
+    DORIS_CHECK(selected_key != nullptr);
+    DORIS_CHECK(selected_value != nullptr);
+
+    int32_t entry_pos = 0;
+    RETURN_IF_ERROR(get_projection_child_index(projection, entry_projection, 1,
+                                               ORC_MAP_ENTRY_NAME, &entry_pos));
+    DORIS_CHECK(entry_pos == 0);
+
+    if (entry_projection.project_all_children) {
+        collect_type_and_descendant_ids(*type.getSubtype(0), type_ids);
+        collect_type_and_descendant_ids(*type.getSubtype(1), type_ids);
+        *selected_key = true;
+        *selected_value = true;
+        return Status::OK();
+    }
+    if (entry_projection.children.empty()) {
+        return Status::NotSupported("ORC MAP entry projection contains no children");
+    }
+
+    for (const auto& member_projection : entry_projection.children) {
+        int32_t member_pos = 0;
+        RETURN_IF_ERROR(get_projection_child_index(entry_projection, member_projection, 2,
+                                                   ORC_MAP_ENTRY_NAME, &member_pos));
+        const auto* member_type = type.getSubtype(static_cast<uint64_t>(member_pos));
+        DORIS_CHECK(member_type != nullptr);
+        RETURN_IF_ERROR(collect_projected_type_ids(*member_type, member_projection, type_ids));
+        if (member_pos == 0) {
+            *selected_key = true;
+        }
+        if (member_pos == 1) {
+            *selected_value = true;
+        }
+    }
+    return Status::OK();
+}
+
+// Collects the ORC type ids needed to read a (possibly partially projected) MAP column.
+//
+// The MAP's own id is always added. With `project_all_children` the whole subtree is added.
+// Otherwise each child projection is treated as a MAP entry projection, and the combined result
+// must select both the key and the value; NotSupported is returned when it does not, or when the
+// projection lists no children at all.
+Status collect_projected_map_type_ids(const ::orc::Type& type,
+                                      const reader::FieldProjection& projection,
+                                      std::set<uint64_t>* const type_ids) {
+    // Checked here as well (like the sibling collectors) because this function dereferences
+    // `type_ids` directly on the next statements.
+    DORIS_CHECK(type_ids != nullptr);
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+    DORIS_CHECK(type.getSubtypeCount() == 2);
+    type_ids->insert(type.getColumnId());
+    if (projection.project_all_children) {
+        collect_type_and_descendant_ids(type, type_ids);
+        return Status::OK();
+    }
+    if (projection.children.empty()) {
+        // Report the top-level column id when the path carries one, -1 otherwise.
+        int32_t column_id = -1;
+        if (!projection.file_path.empty()) {
+            column_id = projection.file_path[0];
+        }
+        return Status::NotSupported("ORC MAP projection for column {} contains no children",
+                                    column_id);
+    }
+
+    bool selected_key = false;
+    bool selected_value = false;
+    for (const auto& entry_projection : projection.children) {
+        RETURN_IF_ERROR(collect_projected_map_entry_type_ids(
+                type, projection, entry_projection, type_ids, &selected_key, &selected_value));
+    }
+    if (!selected_key || !selected_value) {
+        return Status::NotSupported("ORC MAP projection must include both key and value");
+    }
+    return Status::OK();
+}
+
+// Collects into `*type_ids` the ORC type ids required to read `type` under `projection`.
+//
+// The id of `type` itself is always added. A projection with `project_all_children` pulls in
+// the whole subtree. Otherwise the projection must list children: MAP types are delegated to
+// collect_projected_map_type_ids, STRUCT and LIST children are resolved by index and collected
+// recursively, and any other kind is rejected with InvalidArgument.
+Status collect_projected_type_ids(const ::orc::Type& type,
+                                  const reader::FieldProjection& projection,
+                                  std::set<uint64_t>* const type_ids) {
+    DORIS_CHECK(type_ids != nullptr);
+    type_ids->insert(type.getColumnId());
+    if (projection.project_all_children) {
+        collect_type_and_descendant_ids(type, type_ids);
+        return Status::OK();
+    }
+    if (projection.children.empty()) {
+        return Status::NotSupported("ORC projection contains no children");
+    }
+
+    const auto kind = type.getKind();
+    if (kind == ::orc::TypeKind::MAP) {
+        return collect_projected_map_type_ids(type, projection, type_ids);
+    }
+    const bool is_struct_or_list =
+            kind == ::orc::TypeKind::STRUCT || kind == ::orc::TypeKind::LIST;
+    if (!is_struct_or_list) {
+        return Status::InvalidArgument("Cannot project children from non-complex ORC type {}",
+                                       static_cast<int>(type.getKind()));
+    }
+
+    const auto subtype_count = static_cast<int32_t>(type.getSubtypeCount());
+    for (const auto& nested_projection : projection.children) {
+        int32_t nested_pos = 0;
+        RETURN_IF_ERROR(get_projection_child_index(projection, nested_projection, subtype_count,
+                                                   "orc_complex", &nested_pos));
+        const auto* nested_type = type.getSubtype(static_cast<uint64_t>(nested_pos));
+        DORIS_CHECK(nested_type != nullptr);
+        RETURN_IF_ERROR(collect_projected_type_ids(*nested_type, nested_projection, type_ids));
+    }
+    return Status::OK();
+}
+
+// Appends `rows` end-offsets taken from an ORC LIST/MAP offsets buffer to `doris_offsets`.
+//
+// ORC stores rows + 1 offsets (row i spans [offsets[i], offsets[i + 1])). They are rebased to
+// start at the current last Doris offset so that batches can be appended back to back.
+// `*element_size` receives the number of child elements covered by the `rows` rows.
+//
+// Returns Corruption when the buffer holds fewer than rows + 1 entries or when the offsets are
+// not non-decreasing. All validation happens before anything is appended, so `doris_offsets` is
+// left untouched on failure.
+Status append_orc_offsets(ColumnArray::Offsets64& doris_offsets,
+                          const ::orc::DataBuffer<int64_t>& orc_offsets, size_t rows,
+                          size_t* element_size) {
+    DORIS_CHECK(element_size != nullptr);
+    if (orc_offsets.size() < rows + 1) {
+        return Status::Corruption("Invalid ORC offsets");
+    }
+    // Checking each offset against its predecessor (not just against the base) rejects
+    // sequences such as [0, 5, 3] that would otherwise yield decreasing Doris offsets.
+    for (size_t idx = 1; idx <= rows; ++idx) {
+        if (orc_offsets[idx] < orc_offsets[idx - 1]) {
+            return Status::Corruption("Invalid ORC offsets");
+        }
+    }
+
+    const auto prev_offset = doris_offsets.empty() ? 0 : doris_offsets.back();
+    const auto base_offset = orc_offsets[0];
+    for (size_t idx = 1; idx <= rows; ++idx) {
+        const auto delta = orc_offsets[idx] - base_offset;
+        doris_offsets.push_back(prev_offset + static_cast<ColumnArray::Offset64>(delta));
+    }
+    *element_size = static_cast<size_t>(orc_offsets[rows] - base_offset);
+    return Status::OK();
+}
+
+// Returns the position of the first STRUCT field named `field_name`, or -1 when there is none.
+// The comparison is an exact string match.
+int64_t find_struct_child_index(const ::orc::Type& type, const std::string& field_name) {
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+    const uint64_t field_count = type.getSubtypeCount();
+    uint64_t pos = 0;
+    while (pos < field_count) {
+        if (type.getFieldName(pos) == field_name) {
+            return static_cast<int64_t>(pos);
+        }
+        ++pos;
+    }
+    return -1;
+}
+
+} // namespace
+
+struct OrcReaderScanState { // ORC-library state held by OrcReader; created in init(), filled in by open().
+ std::unique_ptr<::orc::Reader> reader; // file-level ORC reader, created in init()
+ const ::orc::Type* root_type = nullptr; // points at reader->getType(); not owned
+ ::orc::ReaderMetrics reader_metrics; // registered with the ORC ReaderOptions in init()
+ ::orc::RowReaderOptions row_reader_options; // projection / timezone / decoding options set in open()
+ std::unique_ptr<::orc::RowReader> row_reader; // created from `reader` with `row_reader_options`
+ const ::orc::Type* selected_type = nullptr; // points at row_reader->getSelectedType(); not owned
+ std::unique_ptr<::orc::ColumnVectorBatch> batch; // batch created by row_reader->createRowBatch()
+ std::vector<reader::ColumnId> read_columns; // sorted, de-duplicated predicate + non-predicate columns
+ std::map<reader::ColumnId, size_t> column_to_selected_batch_index; // file column id -> field index in selected_type
+ bool row_reader_created = false; // set to true once _create_row_reader() has succeeded
+};
+
+OrcReader::OrcReader(std::shared_ptr<io::FileSystemProperties>&
system_properties,
+ std::unique_ptr<io::FileDescription>& file_description,
+ std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile*
profile)
+ : FileReader(system_properties, file_description, io_ctx, profile) {} // only forwards to the FileReader base; the ORC scan state is created later in init()
+
+OrcReader::~OrcReader() = default; // NOTE(review): presumably defined out of line so OrcReaderScanState is complete where _state is destroyed — confirm against the header
+
+// Initializes the base FileReader, allocates a fresh scan state and opens the ORC file.
+//
+// The ORC reader is configured with the process-wide ORC memory pool and with this reader's
+// metrics object. Any std::exception thrown while opening the file is converted into an
+// InternalError that names the file path.
+Status OrcReader::init(RuntimeState* state) {
+    RETURN_IF_ERROR(reader::FileReader::init(state));
+    _state = std::make_unique<OrcReaderScanState>();
+
+    ::orc::ReaderOptions reader_options;
+    reader_options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
+    reader_options.setReaderMetrics(&_state->reader_metrics);
+
+    auto orc_input = std::make_unique<DorisOrcInputStream>(
+            _file_description->path, _tracing_file_reader, _io_ctx.get());
+    try {
+        _state->reader = ::orc::createReader(std::move(orc_input), reader_options);
+        _state->root_type = &_state->reader->getType();
+    } catch (const std::exception& e) {
+        return Status::InternalError("Failed to open ORC file {}: {}", _file_description->path,
+                                     e.what());
+    }
+    return Status::OK();
+}
+
+// Maps an ORC type to the Doris data type used to hold it; the result is always wrapped in
+// Nullable. LIST / MAP / STRUCT are converted recursively through the dedicated helpers.
+// Throws doris::Exception(NotSupported) for ORC kinds that have no mapping here.
+DataTypePtr OrcReader::_convert_to_doris_type(const ::orc::Type& type) const {
+    switch (type.getKind()) {
+    case ::orc::TypeKind::BOOLEAN:
+        return make_nullable(std::make_shared<DataTypeUInt8>());
+    case ::orc::TypeKind::BYTE:
+        return make_nullable(std::make_shared<DataTypeInt8>());
+    case ::orc::TypeKind::SHORT:
+        return make_nullable(std::make_shared<DataTypeInt16>());
+    case ::orc::TypeKind::INT:
+        return make_nullable(std::make_shared<DataTypeInt32>());
+    case ::orc::TypeKind::LONG:
+        return make_nullable(std::make_shared<DataTypeInt64>());
+    case ::orc::TypeKind::FLOAT:
+        return make_nullable(std::make_shared<DataTypeFloat32>());
+    case ::orc::TypeKind::DOUBLE:
+        return make_nullable(std::make_shared<DataTypeFloat64>());
+    case ::orc::TypeKind::STRING:
+    case ::orc::TypeKind::BINARY:
+        return make_nullable(std::make_shared<DataTypeString>());
+    case ::orc::TypeKind::VARCHAR:
+        return make_nullable(std::make_shared<DataTypeString>(
+                cast_set<int>(type.getMaximumLength()), PrimitiveType::TYPE_VARCHAR));
+    case ::orc::TypeKind::CHAR:
+        return make_nullable(std::make_shared<DataTypeString>(
+                cast_set<int>(type.getMaximumLength()), PrimitiveType::TYPE_CHAR));
+    case ::orc::TypeKind::DATE:
+        return make_nullable(std::make_shared<DataTypeDateV2>());
+    case ::orc::TypeKind::TIMESTAMP:
+    case ::orc::TypeKind::TIMESTAMP_INSTANT:
+        return make_nullable(std::make_shared<DataTypeDateTimeV2>(6));
+    case ::orc::TypeKind::DECIMAL: {
+        // A precision of 0 selects the DECIMAL_*_FOR_HIVE11 fallback precision and scale.
+        const bool use_hive11_defaults = type.getPrecision() == 0;
+        const auto precision = use_hive11_defaults ? DECIMAL_PRECISION_FOR_HIVE11
+                                                   : cast_set<int>(type.getPrecision());
+        const auto scale =
+                use_hive11_defaults ? DECIMAL_SCALE_FOR_HIVE11 : cast_set<int>(type.getScale());
+        return make_nullable(
+                std::make_shared<DataTypeDecimal<TYPE_DECIMAL128I>>(precision, scale));
+    }
+    case ::orc::TypeKind::LIST:
+        return make_nullable(_convert_list_to_doris_type(type));
+    case ::orc::TypeKind::MAP:
+        return make_nullable(_convert_map_to_doris_type(type));
+    case ::orc::TypeKind::STRUCT:
+        return make_nullable(_convert_struct_to_doris_type(type));
+    default:
+        throw doris::Exception(
+                Status::NotSupported("ORC type {} is not supported by new ORC reader",
+                                     static_cast<int>(type.getKind())));
+    }
+}
+
+// Builds the Doris array type for an ORC LIST from its single element subtype.
+// The returned array type itself is not wrapped in Nullable; the caller does that.
+DataTypePtr OrcReader::_convert_list_to_doris_type(const ::orc::Type& type) const {
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::LIST);
+    DORIS_CHECK(type.getSubtypeCount() == 1);
+    const ::orc::Type* item_type = type.getSubtype(0);
+    DORIS_CHECK(item_type != nullptr);
+    auto doris_item_type = _convert_to_doris_type(*item_type);
+    return std::make_shared<DataTypeArray>(doris_item_type);
+}
+
+// Builds the Doris map type for an ORC MAP from its key (subtype 0) and value (subtype 1).
+// The returned map type itself is not wrapped in Nullable; the caller does that.
+DataTypePtr OrcReader::_convert_map_to_doris_type(const ::orc::Type& type) const {
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+    DORIS_CHECK(type.getSubtypeCount() == 2);
+    const ::orc::Type* orc_key = type.getSubtype(0);
+    const ::orc::Type* orc_value = type.getSubtype(1);
+    DORIS_CHECK(orc_key != nullptr);
+    DORIS_CHECK(orc_value != nullptr);
+    auto doris_key = _convert_to_doris_type(*orc_key);
+    auto doris_value = _convert_to_doris_type(*orc_value);
+    return std::make_shared<DataTypeMap>(doris_key, doris_value);
+}
+
+// Builds the Doris struct type for an ORC STRUCT, converting every field in file order and
+// keeping the ORC field names. The returned struct type itself is not wrapped in Nullable.
+DataTypePtr OrcReader::_convert_struct_to_doris_type(const ::orc::Type& type) const {
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+    const uint64_t field_count = type.getSubtypeCount();
+    DataTypes field_types;
+    Strings field_names;
+    field_types.reserve(field_count);
+    field_names.reserve(field_count);
+    for (uint64_t pos = 0; pos < field_count; ++pos) {
+        const ::orc::Type* orc_field = type.getSubtype(pos);
+        DORIS_CHECK(orc_field != nullptr);
+        field_types.push_back(_convert_to_doris_type(*orc_field));
+        field_names.push_back(type.getFieldName(pos));
+    }
+    return std::make_shared<DataTypeStruct>(field_types, field_names);
+}
+
+// Populates `*field` with the schema description of one ORC type.
+//
+// Sets id, name, both paths, the DATA_COLUMN kind and the converted Doris type, clears
+// `field_id_path`, and then fills `children` for STRUCT / LIST / MAP types.
+// Returns InvalidArgument when `field` is null, and the status carried by the exception when
+// the type conversion throws doris::Exception.
+Status OrcReader::_fill_schema_field(const ::orc::Type& type, int32_t field_id,
+                                     const std::string& field_name,
+                                     const std::vector<int32_t>& file_path,
+                                     const std::vector<std::string>& name_path,
+                                     reader::SchemaField* const field) const {
+    if (field == nullptr) {
+        return Status::InvalidArgument("schema field is null");
+    }
+    field->id = field_id;
+    field->name = field_name;
+    field->file_path = file_path;
+    field->field_id_path.clear();
+    field->name_path = name_path;
+    field->column_type = reader::ColumnType::DATA_COLUMN;
+    try {
+        field->type = _convert_to_doris_type(type);
+    } catch (const doris::Exception& e) {
+        return e.to_status();
+    }
+    field->children.clear();
+
+    const auto kind = type.getKind();
+    if (kind == ::orc::TypeKind::STRUCT) {
+        return _fill_struct_schema_children(type, file_path, name_path, field);
+    }
+    if (kind == ::orc::TypeKind::LIST) {
+        return _fill_list_schema_children(type, file_path, name_path, field);
+    }
+    if (kind == ::orc::TypeKind::MAP) {
+        return _fill_map_schema_children(type, file_path, name_path, field);
+    }
+    return Status::OK();
+}
+
+// Appends one child SchemaField per STRUCT member to `field->children`, in file order.
+// Each child uses its position as id and as the extra path component, and its ORC field name
+// as the extra name-path component.
+Status OrcReader::_fill_struct_schema_children(const ::orc::Type& type,
+                                               const std::vector<int32_t>& file_path,
+                                               const std::vector<std::string>& name_path,
+                                               reader::SchemaField* const field) const {
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+    const uint64_t member_count = type.getSubtypeCount();
+    field->children.reserve(member_count);
+    for (uint64_t pos = 0; pos < member_count; ++pos) {
+        const ::orc::Type* member_type = type.getSubtype(pos);
+        DORIS_CHECK(member_type != nullptr);
+        const auto member_name = type.getFieldName(pos);
+        const auto member_id = static_cast<int32_t>(pos);
+
+        reader::SchemaField member_field;
+        RETURN_IF_ERROR(_fill_schema_field(*member_type, member_id, member_name,
+                                           append_path(file_path, member_id),
+                                           append_name_path(name_path, member_name),
+                                           &member_field));
+        field->children.push_back(std::move(member_field));
+    }
+    return Status::OK();
+}
+
+// Appends the single element child of a LIST to `field->children`.
+// The element gets id 0, path component 0 and the name ORC_LIST_ELEMENT_NAME.
+Status OrcReader::_fill_list_schema_children(const ::orc::Type& type,
+                                             const std::vector<int32_t>& file_path,
+                                             const std::vector<std::string>& name_path,
+                                             reader::SchemaField* const field) const {
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::LIST);
+    DORIS_CHECK(type.getSubtypeCount() == 1);
+    const ::orc::Type* item_type = type.getSubtype(0);
+    DORIS_CHECK(item_type != nullptr);
+
+    reader::SchemaField item_field;
+    RETURN_IF_ERROR(_fill_schema_field(*item_type, 0, ORC_LIST_ELEMENT_NAME,
+                                       append_path(file_path, 0),
+                                       append_name_path(name_path, ORC_LIST_ELEMENT_NAME),
+                                       &item_field));
+    field->children.push_back(std::move(item_field));
+    return Status::OK();
+}
+
+// Appends a synthetic "entry" child to `field->children` for a MAP type.
+//
+// The entry sits at path component 0 under the map and has two children: the key (id 0) and
+// the value (id 1). Its Doris type is a struct built from the key and value field types and
+// names. The key and value fields are fully built before the entry, so a failure there leaves
+// `field->children` untouched.
+Status OrcReader::_fill_map_schema_children(const ::orc::Type& type,
+                                            const std::vector<int32_t>& file_path,
+                                            const std::vector<std::string>& name_path,
+                                            reader::SchemaField* const field) const {
+    DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+    DORIS_CHECK(type.getSubtypeCount() == 2);
+    const ::orc::Type* orc_key = type.getSubtype(0);
+    const ::orc::Type* orc_value = type.getSubtype(1);
+    DORIS_CHECK(orc_key != nullptr);
+    DORIS_CHECK(orc_value != nullptr);
+
+    const auto entry_file_path = append_path(file_path, 0);
+    const auto entry_name_path = append_name_path(name_path, ORC_MAP_ENTRY_NAME);
+
+    reader::SchemaField key_field;
+    RETURN_IF_ERROR(_fill_schema_field(*orc_key, 0, ORC_MAP_KEY_NAME,
+                                       append_path(entry_file_path, 0),
+                                       append_name_path(entry_name_path, ORC_MAP_KEY_NAME),
+                                       &key_field));
+    reader::SchemaField value_field;
+    RETURN_IF_ERROR(_fill_schema_field(*orc_value, 1, ORC_MAP_VALUE_NAME,
+                                       append_path(entry_file_path, 1),
+                                       append_name_path(entry_name_path, ORC_MAP_VALUE_NAME),
+                                       &value_field));
+
+    reader::SchemaField entry;
+    entry.id = 0;
+    entry.name = ORC_MAP_ENTRY_NAME;
+    entry.file_path = entry_file_path;
+    entry.field_id_path.clear();
+    entry.name_path = entry_name_path;
+    entry.column_type = reader::ColumnType::DATA_COLUMN;
+    entry.children.push_back(std::move(key_field));
+    entry.children.push_back(std::move(value_field));
+
+    // The entry's struct type mirrors its children: {key, value}.
+    DataTypes member_types;
+    Strings member_names;
+    member_types.reserve(entry.children.size());
+    member_names.reserve(entry.children.size());
+    for (const auto& member : entry.children) {
+        member_types.push_back(member.type);
+        member_names.push_back(member.name);
+    }
+    entry.type = std::make_shared<DataTypeStruct>(member_types, member_names);
+
+    field->children.push_back(std::move(entry));
+    return Status::OK();
+}
+
+// Replaces the contents of `*file_schema` with one SchemaField per top-level ORC column.
+//
+// Returns InvalidArgument for a null output pointer, Uninitialized when init() has not produced
+// a root type yet, and NotSupported when the file's root type is not a STRUCT.
+Status OrcReader::get_schema(std::vector<reader::SchemaField>* const file_schema) const {
+    if (file_schema == nullptr) {
+        return Status::InvalidArgument("file_schema is null");
+    }
+    if (_state == nullptr || _state->root_type == nullptr) {
+        return Status::Uninitialized("OrcReader is not open");
+    }
+    const ::orc::Type& root = *_state->root_type;
+    if (root.getKind() != ::orc::TypeKind::STRUCT) {
+        return Status::NotSupported("ORC reader only supports top-level struct schema");
+    }
+
+    file_schema->clear();
+    file_schema->reserve(root.getSubtypeCount());
+    for (uint64_t pos = 0; pos < root.getSubtypeCount(); ++pos) {
+        const ::orc::Type* column_type = root.getSubtype(pos);
+        DORIS_CHECK(column_type != nullptr);
+        const auto column_name = root.getFieldName(pos);
+        const auto column_id = static_cast<int32_t>(pos);
+
+        // Top-level columns start single-component paths.
+        const std::vector<int32_t> file_path {column_id};
+        const std::vector<std::string> name_path {column_name};
+        reader::SchemaField column_field;
+        RETURN_IF_ERROR(_fill_schema_field(*column_type, column_id, column_name, file_path,
+                                           name_path, &column_field));
+        file_schema->push_back(std::move(column_field));
+    }
+    return Status::OK();
+}
+
+// Prepares the reader for scanning according to `request`.
+//
+// Rejects requests that carry conjuncts, delete conjuncts, column predicate filters or reader
+// expressions (NotSupported). When the request has no column positions, every requested column
+// is mapped to its own id. The set of columns to read is the sorted, de-duplicated union of
+// predicate and non-predicate columns. Finally the projection is configured and the ORC row
+// reader is created with UTC timezone, lazy decoding off and tight numeric vectors off.
+Status OrcReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
+    const bool initialized =
+            _state != nullptr && _state->reader != nullptr && _state->root_type != nullptr;
+    if (!initialized) {
+        return Status::Uninitialized("OrcReader is not open");
+    }
+    RETURN_IF_ERROR(reader::FileReader::open(request));
+
+    const bool has_unsupported_work =
+            !_request->conjuncts.empty() || !_request->delete_conjuncts.empty() ||
+            !_request->column_predicate_filters.empty() ||
+            !_request->reader_expression_map.empty();
+    if (has_unsupported_work) {
+        return Status::NotSupported(
+                "New ORC reader does not support file-local filters or reader expressions");
+    }
+
+    if (_request->column_positions.empty()) {
+        const auto map_to_own_id = [this](const auto& column_ids) {
+            for (const auto column_id : column_ids) {
+                _request->column_positions.emplace(column_id, column_id);
+            }
+        };
+        map_to_own_id(_request->predicate_columns);
+        map_to_own_id(_request->non_predicate_columns);
+    }
+
+    auto& read_columns = _state->read_columns;
+    read_columns.clear();
+    read_columns.reserve(_request->predicate_columns.size() +
+                         _request->non_predicate_columns.size());
+    read_columns.insert(read_columns.end(), _request->predicate_columns.begin(),
+                        _request->predicate_columns.end());
+    read_columns.insert(read_columns.end(), _request->non_predicate_columns.begin(),
+                        _request->non_predicate_columns.end());
+    std::sort(read_columns.begin(), read_columns.end());
+    read_columns.erase(std::unique(read_columns.begin(), read_columns.end()),
+                       read_columns.end());
+
+    RETURN_IF_ERROR(_configure_row_reader_projection());
+    auto& row_options = _state->row_reader_options;
+    row_options.setTimezoneName("UTC");
+    row_options.setEnableLazyDecoding(false);
+    row_options.setUseTightNumericVector(false);
+    RETURN_IF_ERROR(_create_row_reader());
+
+    _eof = _state->reader->getNumberOfRows() == 0;
+    return Status::OK();
+}
+
+// Tells the ORC row reader which columns / nested types to materialize.
+//
+// Every complex projection is validated first: its key must be a valid top-level field id, must
+// equal the projection's own `file_column_id`, and must be the first component of its path.
+// Without complex projections the read columns are selected by top-level field index via
+// include(). With them, the selection is expressed as a set of ORC type ids via includeTypes():
+// the root, the full subtree of every unprojected read column, and the projected subset of the
+// others.
+Status OrcReader::_configure_row_reader_projection() {
+    const auto& projections = _request->complex_projections;
+    const auto top_level_count =
+            static_cast<reader::ColumnId>(_state->root_type->getSubtypeCount());
+
+    for (const auto& [column_id, projection] : projections) {
+        if (column_id < 0 || column_id >= top_level_count) {
+            return Status::InvalidArgument("Invalid ORC projection top-level field id {}",
+                                           column_id);
+        }
+        if (projection.file_column_id != column_id) {
+            return Status::InvalidArgument("ORC projection column id mismatch: key={}, value={}",
+                                           column_id, projection.file_column_id);
+        }
+        if (projection.file_path.empty() || projection.file_path.front() != column_id) {
+            return Status::InvalidArgument("Invalid ORC projection root path for column {}",
+                                           column_id);
+        }
+    }
+
+    for (const auto column_id : _state->read_columns) {
+        DORIS_CHECK(column_id >= 0 && column_id < top_level_count);
+        DORIS_CHECK(_request->column_positions.contains(column_id));
+    }
+
+    if (projections.empty()) {
+        std::list<uint64_t> field_indexes;
+        for (const auto column_id : _state->read_columns) {
+            field_indexes.push_back(static_cast<uint64_t>(column_id));
+        }
+        _state->row_reader_options.include(field_indexes);
+        return Status::OK();
+    }
+
+    std::set<uint64_t> selected_type_ids;
+    selected_type_ids.insert(_state->root_type->getColumnId());
+    for (const auto column_id : _state->read_columns) {
+        const auto* column_type =
+                _state->root_type->getSubtype(static_cast<uint64_t>(column_id));
+        DORIS_CHECK(column_type != nullptr);
+        const auto found = projections.find(column_id);
+        if (found != projections.end()) {
+            RETURN_IF_ERROR(
+                    collect_projected_type_ids(*column_type, found->second, &selected_type_ids));
+        } else {
+            collect_type_and_descendant_ids(*column_type, &selected_type_ids);
+        }
+    }
+    const std::list<uint64_t> type_id_list(selected_type_ids.begin(), selected_type_ids.end());
+    _state->row_reader_options.includeTypes(type_id_list);
+    return Status::OK();
+}
+
+// Creates the ORC row reader and its batch from the configured options, and records for every
+// read column the index of the matching field inside the selected (projected) struct type.
+//
+// Fields are matched by name between the selected type and the file's root type.
+// Any std::exception raised by the ORC library is converted into an InternalError.
+Status OrcReader::_create_row_reader() {
+    // Drop the "ready" flag up front: if this is a re-open and anything below throws, the
+    // row reader / batch / index map may be only partially replaced, and get_block() must not
+    // treat that state as usable.
+    _state->row_reader_created = false;
+    try {
+        _state->row_reader = _state->reader->createRowReader(_state->row_reader_options);
+        _state->selected_type = &_state->row_reader->getSelectedType();
+        DORIS_CHECK(_state->selected_type->getKind() == ::orc::TypeKind::STRUCT);
+        _state->batch = _state->row_reader->createRowBatch(DEFAULT_ORC_READ_BATCH_SIZE);
+        _state->column_to_selected_batch_index.clear();
+        for (uint64_t selected_idx = 0; selected_idx < _state->selected_type->getSubtypeCount();
+             ++selected_idx) {
+            const auto field_name = _state->selected_type->getFieldName(selected_idx);
+            for (const auto file_column_id : _state->read_columns) {
+                if (field_name == _state->root_type->getFieldName(file_column_id)) {
+                    _state->column_to_selected_batch_index.emplace(
+                            file_column_id, static_cast<size_t>(selected_idx));
+                    break;
+                }
+            }
+        }
+        // Every requested column must have been found in the selected type.
+        DORIS_CHECK(_state->column_to_selected_batch_index.size() ==
+                    _state->read_columns.size());
+        _state->row_reader_created = true;
+    } catch (const std::exception& e) {
+        return Status::InternalError("Failed to create ORC row reader: {}", e.what());
+    }
+    return Status::OK();
+}
+
+// Appends `rows` values from an ORC batch to a nullable Doris column.
+//
+// The null map is extended first (from `batch.notNull` when the batch has nulls, zero-filled
+// otherwise); the nested values are then appended by the decoder matching the ORC type kind.
+// `file_type` and `selected_type` must have the same kind. Nothing happens when `rows` is 0.
+Status OrcReader::_decode_column(const ::orc::Type& file_type, const ::orc::Type& selected_type,
+                                 const ::orc::ColumnVectorBatch& batch, MutableColumnPtr& column,
+                                 size_t rows) const {
+    DORIS_CHECK(file_type.getKind() == selected_type.getKind());
+    DORIS_CHECK(column->is_nullable());
+    if (rows == 0) {
+        return Status::OK();
+    }
+
+    auto& nullable = assert_cast<ColumnNullable&>(*column);
+    auto values = nullable.get_nested_column_ptr();
+    auto& null_flags = nullable.get_null_map_data();
+    const size_t first_new = null_flags.size();
+    null_flags.resize(first_new + rows);
+    if (!batch.hasNulls) {
+        std::memset(null_flags.data() + first_new, 0, rows);
+    } else {
+        for (size_t i = 0; i < rows; ++i) {
+            null_flags[first_new + i] = !batch.notNull[i];
+        }
+    }
+
+    switch (file_type.getKind()) {
+    // Integer-like kinds are all read from a LongVectorBatch.
+    case ::orc::TypeKind::BOOLEAN:
+        return append_numeric_values<ColumnUInt8, UInt8, ::orc::LongVectorBatch>(values, batch,
+                                                                                 rows);
+    case ::orc::TypeKind::BYTE:
+        return append_numeric_values<ColumnInt8, Int8, ::orc::LongVectorBatch>(values, batch,
+                                                                               rows);
+    case ::orc::TypeKind::SHORT:
+        return append_numeric_values<ColumnInt16, Int16, ::orc::LongVectorBatch>(values, batch,
+                                                                                 rows);
+    case ::orc::TypeKind::INT:
+        return append_numeric_values<ColumnInt32, Int32, ::orc::LongVectorBatch>(values, batch,
+                                                                                 rows);
+    case ::orc::TypeKind::LONG:
+        return append_numeric_values<ColumnInt64, Int64, ::orc::LongVectorBatch>(values, batch,
+                                                                                 rows);
+    case ::orc::TypeKind::FLOAT:
+        return append_floating_values<ColumnFloat32, Float32>(values, batch, rows);
+    case ::orc::TypeKind::DOUBLE:
+        return append_floating_values<ColumnFloat64, Float64>(values, batch, rows);
+    case ::orc::TypeKind::STRING:
+    case ::orc::TypeKind::BINARY:
+    case ::orc::TypeKind::VARCHAR:
+    case ::orc::TypeKind::CHAR:
+        return _decode_string_column(file_type, batch, values, rows);
+    case ::orc::TypeKind::DATE:
+        return _decode_date_column(batch, values, rows);
+    case ::orc::TypeKind::TIMESTAMP:
+    case ::orc::TypeKind::TIMESTAMP_INSTANT:
+        return _decode_timestamp_column(batch, values, rows);
+    case ::orc::TypeKind::DECIMAL:
+        return _decode_decimal_column(batch, values, rows);
+    case ::orc::TypeKind::LIST:
+        return _decode_list_column(file_type, selected_type, batch, values, rows);
+    case ::orc::TypeKind::MAP:
+        return _decode_map_column(file_type, selected_type, batch, values, rows);
+    case ::orc::TypeKind::STRUCT:
+        return _decode_struct_column(file_type, selected_type, batch, values, rows);
+    default:
+        return Status::NotSupported("ORC type {} is not supported by new ORC reader",
+                                    static_cast<int>(file_type.getKind()));
+    }
+}
+
+// Appends `rows` string values from an ORC StringVectorBatch to a ColumnString.
+//
+// Null rows become empty strings. For CHAR columns the length is first reduced by
+// trim_right_spaces. Zero-length values are inserted from a static empty literal rather than
+// from the batch pointer.
+Status OrcReader::_decode_string_column(const ::orc::Type& file_type,
+                                        const ::orc::ColumnVectorBatch& batch,
+                                        MutableColumnPtr& nested_column, size_t rows) const {
+    const auto* strings = dynamic_cast<const ::orc::StringVectorBatch*>(&batch);
+    if (strings == nullptr) {
+        return Status::InternalError("Unexpected ORC string batch type {}", batch.toString());
+    }
+    auto& target = assert_cast<ColumnString&>(*nested_column);
+    static constexpr const char* EMPTY_STRING = "";
+    const bool is_char = file_type.getKind() == ::orc::TypeKind::CHAR;
+    for (size_t i = 0; i < rows; ++i) {
+        if (is_null_at(batch, i)) {
+            target.insert_data(EMPTY_STRING, 0);
+            continue;
+        }
+        auto value_length = static_cast<size_t>(strings->length[i]);
+        if (is_char) {
+            value_length = trim_right_spaces(strings->data[i], value_length);
+        }
+        if (value_length == 0) {
+            target.insert_data(EMPTY_STRING, value_length);
+        } else {
+            target.insert_data(strings->data[i], value_length);
+        }
+    }
+    return Status::OK();
+}
+
+// Appends `rows` DATE values from an ORC LongVectorBatch to a ColumnDateV2.
+// Each non-null value is used as an index into date_day_offset_dict; null rows receive a
+// default-constructed DateV2 value.
+Status OrcReader::_decode_date_column(const ::orc::ColumnVectorBatch& batch,
+                                      MutableColumnPtr& nested_column, size_t rows) const {
+    const auto* days = dynamic_cast<const ::orc::LongVectorBatch*>(&batch);
+    if (days == nullptr) {
+        return Status::InternalError("Unexpected ORC date batch type {}", batch.toString());
+    }
+    auto& dates = assert_cast<ColumnDateV2&>(*nested_column).get_data();
+    const size_t first_new = dates.size();
+    dates.resize(first_new + rows);
+    auto& day_lookup = date_day_offset_dict::get();
+    for (size_t i = 0; i < rows; ++i) {
+        const size_t out = first_new + i;
+        if (!is_null_at(batch, i)) {
+            dates[out] = day_lookup[cast_set<int>(days->data[i])];
+        } else {
+            dates[out] = DateV2Value<DateV2ValueType> {};
+        }
+    }
+    return Status::OK();
+}
+
+// Appends `rows` timestamps from an ORC TimestampVectorBatch to a ColumnDateTimeV2.
+// The seconds part is converted with the UTC time zone and the nanoseconds part is truncated
+// to microseconds; null rows receive a default-constructed DateTimeV2 value.
+Status OrcReader::_decode_timestamp_column(const ::orc::ColumnVectorBatch& batch,
+                                           MutableColumnPtr& nested_column, size_t rows) const {
+    const auto* timestamps = dynamic_cast<const ::orc::TimestampVectorBatch*>(&batch);
+    if (timestamps == nullptr) {
+        return Status::InternalError("Unexpected ORC timestamp batch type {}", batch.toString());
+    }
+    auto& datetimes = assert_cast<ColumnDateTimeV2&>(*nested_column).get_data();
+    const size_t first_new = datetimes.size();
+    datetimes.resize(first_new + rows);
+    for (size_t i = 0; i < rows; ++i) {
+        const size_t out = first_new + i;
+        if (is_null_at(batch, i)) {
+            datetimes[out] = DateV2Value<DateTimeV2ValueType> {};
+            continue;
+        }
+        // Fill the freshly resized slot in place through its DateV2Value view.
+        auto& slot = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(datetimes[out]);
+        slot.from_unixtime(timestamps->data[i], cctz::utc_time_zone());
+        const auto micros = cast_set<uint64_t>(timestamps->nanoseconds[i] / 1000);
+        slot.set_microsecond(micros);
+    }
+    return Status::OK();
+}
+
+// Appends `rows` decimals to a ColumnDecimal128V3 from either a Decimal64VectorBatch or a
+// Decimal128VectorBatch. Null rows are stored as 0.
+// Note that the destination is resized before the batch type is known, so the column has
+// already grown when an unexpected batch type is reported.
+Status OrcReader::_decode_decimal_column(const ::orc::ColumnVectorBatch& batch,
+                                         MutableColumnPtr& nested_column, size_t rows) const {
+    auto& decimals = assert_cast<ColumnDecimal128V3&>(*nested_column).get_data();
+    const size_t first_new = decimals.size();
+    decimals.resize(first_new + rows);
+
+    const auto* narrow = dynamic_cast<const ::orc::Decimal64VectorBatch*>(&batch);
+    if (narrow != nullptr) {
+        for (size_t i = 0; i < rows; ++i) {
+            if (is_null_at(batch, i)) {
+                decimals[first_new + i] = Decimal128V3(0);
+            } else {
+                decimals[first_new + i] = Decimal128V3(narrow->values[i]);
+            }
+        }
+        return Status::OK();
+    }
+
+    const auto* wide = dynamic_cast<const ::orc::Decimal128VectorBatch*>(&batch);
+    if (wide == nullptr) {
+        return Status::InternalError("Unexpected ORC decimal batch type {}", batch.toString());
+    }
+    for (size_t i = 0; i < rows; ++i) {
+        if (is_null_at(batch, i)) {
+            decimals[first_new + i] = Decimal128V3(0);
+        } else {
+            decimals[first_new + i] = Decimal128V3(to_int128(wide->values[i]));
+        }
+    }
+    return Status::OK();
+}
+
+// Appends `rows` lists from an ORC ListVectorBatch to a ColumnArray.
+// The offsets are appended first; the element batch is then decoded recursively for as many
+// elements as those offsets cover.
+Status OrcReader::_decode_list_column(const ::orc::Type& file_type,
+                                      const ::orc::Type& selected_type,
+                                      const ::orc::ColumnVectorBatch& batch,
+                                      MutableColumnPtr& nested_column, size_t rows) const {
+    const auto* list_batch = dynamic_cast<const ::orc::ListVectorBatch*>(&batch);
+    if (list_batch == nullptr) {
+        return Status::InternalError("Unexpected ORC list batch type {}", batch.toString());
+    }
+    DORIS_CHECK(file_type.getSubtypeCount() == 1);
+    DORIS_CHECK(selected_type.getSubtypeCount() == 1);
+    DORIS_CHECK(list_batch->elements != nullptr);
+    const ::orc::Type* file_item_type = file_type.getSubtype(0);
+    const ::orc::Type* selected_item_type = selected_type.getSubtype(0);
+    DORIS_CHECK(file_item_type != nullptr);
+    DORIS_CHECK(selected_item_type != nullptr);
+
+    auto& arrays = assert_cast<ColumnArray&>(*nested_column);
+    size_t item_count = 0;
+    RETURN_IF_ERROR(
+            append_orc_offsets(arrays.get_offsets(), list_batch->offsets, rows, &item_count));
+
+    auto items = arrays.get_data_ptr()->assume_mutable();
+    RETURN_IF_ERROR(_decode_column(*file_item_type, *selected_item_type, *list_batch->elements,
+                                   items, item_count));
+    arrays.get_data_ptr() = std::move(items);
+    return Status::OK();
+}
+
+// Appends `rows` maps from an ORC MapVectorBatch to a ColumnMap.
+// The offsets are appended first; keys and then values are decoded recursively for as many
+// entries as those offsets cover.
+Status OrcReader::_decode_map_column(const ::orc::Type& file_type, const ::orc::Type& selected_type,
+                                     const ::orc::ColumnVectorBatch& batch,
+                                     MutableColumnPtr& nested_column, size_t rows) const {
+    const auto* map_batch = dynamic_cast<const ::orc::MapVectorBatch*>(&batch);
+    if (map_batch == nullptr) {
+        return Status::InternalError("Unexpected ORC map batch type {}", batch.toString());
+    }
+    DORIS_CHECK(file_type.getSubtypeCount() == 2);
+    DORIS_CHECK(selected_type.getSubtypeCount() == 2);
+    DORIS_CHECK(map_batch->keys != nullptr);
+    DORIS_CHECK(map_batch->elements != nullptr);
+
+    auto& maps = assert_cast<ColumnMap&>(*nested_column);
+    size_t entry_count = 0;
+    RETURN_IF_ERROR(
+            append_orc_offsets(maps.get_offsets(), map_batch->offsets, rows, &entry_count));
+
+    // Keys: subtype 0.
+    auto keys = maps.get_keys_ptr()->assume_mutable();
+    const ::orc::Type* file_key = file_type.getSubtype(0);
+    const ::orc::Type* selected_key = selected_type.getSubtype(0);
+    DORIS_CHECK(file_key != nullptr);
+    DORIS_CHECK(selected_key != nullptr);
+    RETURN_IF_ERROR(_decode_column(*file_key, *selected_key, *map_batch->keys, keys, entry_count));
+    maps.get_keys_ptr() = std::move(keys);
+
+    // Values: subtype 1.
+    auto values = maps.get_values_ptr()->assume_mutable();
+    const ::orc::Type* file_value = file_type.getSubtype(1);
+    const ::orc::Type* selected_value = selected_type.getSubtype(1);
+    DORIS_CHECK(file_value != nullptr);
+    DORIS_CHECK(selected_value != nullptr);
+    RETURN_IF_ERROR(_decode_column(*file_value, *selected_value, *map_batch->elements, values,
+                                   entry_count));
+    maps.get_values_ptr() = std::move(values);
+    return Status::OK();
+}
+
+// Appends `rows` structs from an ORC StructVectorBatch to a ColumnStruct.
+//
+// The Doris struct column must have exactly one sub-column per field of `selected_type`.
+// Each selected field is looked up by name in `file_type` to find its file-side type, and is
+// then decoded recursively into the sub-column at the same position.
+Status OrcReader::_decode_struct_column(const ::orc::Type& file_type,
+                                        const ::orc::Type& selected_type,
+                                        const ::orc::ColumnVectorBatch& batch,
+                                        MutableColumnPtr& nested_column, size_t rows) const {
+    const auto* struct_batch = dynamic_cast<const ::orc::StructVectorBatch*>(&batch);
+    if (struct_batch == nullptr) {
+        return Status::InternalError("Unexpected ORC struct batch type {}", batch.toString());
+    }
+    DORIS_CHECK(selected_type.getSubtypeCount() == struct_batch->fields.size());
+    auto& structs = assert_cast<ColumnStruct&>(*nested_column);
+    DORIS_CHECK(structs.tuple_size() == selected_type.getSubtypeCount());
+
+    for (uint64_t pos = 0; pos < selected_type.getSubtypeCount(); ++pos) {
+        const auto field_name = selected_type.getFieldName(pos);
+        const auto file_pos = find_struct_child_index(file_type, field_name);
+        if (file_pos < 0) {
+            return Status::InternalError("Selected ORC field {} is not in file struct",
+                                         field_name);
+        }
+        const ::orc::Type* file_member = file_type.getSubtype(static_cast<uint64_t>(file_pos));
+        const ::orc::Type* selected_member = selected_type.getSubtype(pos);
+        DORIS_CHECK(file_member != nullptr);
+        DORIS_CHECK(selected_member != nullptr);
+        DORIS_CHECK(pos < struct_batch->fields.size());
+
+        const auto column_pos = static_cast<size_t>(pos);
+        auto member_column = structs.get_column_ptr(column_pos)->assume_mutable();
+        RETURN_IF_ERROR(_decode_column(*file_member, *selected_member,
+                                       *struct_batch->fields[pos], member_column, rows));
+        structs.get_column_ptr(column_pos) = std::move(member_column);
+    }
+    return Status::OK();
+}
+
+Status OrcReader::get_block(Block* file_block, size_t* rows, bool* eof) {
+ if (_state == nullptr || !_state->row_reader_created || _state->batch ==
nullptr) {
+ return Status::Uninitialized("OrcReader is not open");
+ }
+ *rows = 0;
+ if (_eof) {
+ *eof = true;
+ return Status::OK();
+ }
Review Comment:
`get_block()` appends decoded values into the existing `Block` columns but
never clears or resets them at the beginning of the call (and it also returns
early on EOF without clearing). If callers reuse the same `Block` instance across calls
(which is common), this can accumulate rows across batches and can also leave
stale rows in the `Block` when `rows=0`/`eof=true` (e.g., tests that assert
`block.rows() == 0` on EOF). A concrete fix is to clear/resize the target
columns (or the whole block's column data) at the start of `get_block()`, and
to ensure the columns are empty when returning `rows==0`.
##########
be/src/format/new_orc/orc_reader.cpp:
##########
@@ -0,0 +1,1065 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/new_orc/orc_reader.h"
+
+#include <cctz/time_zone.h>
+#include <gen_cpp/Types_types.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <list>
+#include <map>
+#include <memory>
+#include <orc/OrcFile.hh>
+#include <orc/Vector.hh>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/consts.h"
+#include "common/exception.h"
+#include "core/block/block.h"
+#include "core/column/column_array.h"
+#include "core/column/column_decimal.h"
+#include "core/column/column_map.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_date_time.h"
+#include "core/data_type/data_type_decimal.h"
+#include "core/data_type/data_type_map.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
+#include "core/types.h"
+#include "core/value/vdatetime_value.h"
+#include "io/fs/file_reader.h"
+#include "runtime/exec_env.h"
+#include "util/slice.h"
+
+namespace doris::new_orc {
+namespace {
+
+constexpr uint64_t DEFAULT_ORC_READ_BATCH_SIZE = 4096; // Row capacity passed to createRowBatch().
+constexpr uint64_t DEFAULT_ORC_NATURAL_READ_SIZE = 128 * 1024; // 128 KiB, returned by DorisOrcInputStream::getNaturalReadSize().
+constexpr int DECIMAL_PRECISION_FOR_HIVE11 =
BeConsts::MAX_DECIMAL128_PRECISION; // Precision substituted when an ORC DECIMAL reports precision 0.
+constexpr int DECIMAL_SCALE_FOR_HIVE11 = 10; // Scale substituted in the same precision-0 case.
+constexpr const char* ORC_LIST_ELEMENT_NAME = "element"; // Schema name given to a LIST's single child.
+constexpr const char* ORC_MAP_ENTRY_NAME = "key_value"; // Schema name of the synthetic MAP entry field.
+constexpr const char* ORC_MAP_KEY_NAME = "key"; // Schema name of a MAP's key child.
+constexpr const char* ORC_MAP_VALUE_NAME = "value"; // Schema name of a MAP's value child.
+
+class DorisOrcInputStream final : public ::orc::InputStream { // Adapter exposing an io::FileReader through the ::orc::InputStream interface.
+public:
+ DorisOrcInputStream(std::string file_name, io::FileReaderSPtr file_reader,
+ io::IOContext* io_ctx)
+ : _file_name(std::move(file_name)),
+ _file_reader(std::move(file_reader)),
+ _io_ctx(io_ctx) {} // io_ctx is kept as a raw pointer and forwarded to every read_at() call.
+
+ uint64_t getLength() const override { return _file_reader->size(); }
+
+ uint64_t getNaturalReadSize() const override { return
DEFAULT_ORC_NATURAL_READ_SIZE; }
+
+ void read(void* buf, uint64_t length, uint64_t offset) override { // Fills buf with exactly `length` bytes starting at `offset`, or throws ::orc::ParseError.
+ uint64_t bytes_read = 0;
+ auto* out = static_cast<uint8_t*>(buf);
+ while (bytes_read < length) { // read_at() may return fewer bytes than asked for, so keep reading the remainder.
+ size_t loop_read = 0;
+ Status st = _file_reader->read_at(
+ static_cast<size_t>(offset + bytes_read),
+ Slice(out + bytes_read, static_cast<size_t>(length -
bytes_read)), &loop_read,
+ _io_ctx);
+ if (!st.ok()) { // Status errors are translated into the exception type used by this stream.
+ throw ::orc::ParseError("Failed to read " + _file_name + ": " +
+ st.to_string_no_stack());
+ }
+ if (loop_read == 0) { // No progress: stop looping and let the length check below report the short read.
+ break;
+ }
+ bytes_read += loop_read;
+ }
+ if (bytes_read != length) {
+ throw ::orc::ParseError("Short read from " + _file_name);
+ }
+ }
+
+ const std::string& getName() const override { return _file_name; }
+
+private:
+ std::string _file_name; // Returned by getName() and used in error messages.
+ io::FileReaderSPtr _file_reader;
+ io::IOContext* _io_ctx = nullptr;
+};
+
+bool is_null_at(const ::orc::ColumnVectorBatch& batch, size_t row) { // True only when the batch carries nulls and this row's notNull flag is unset.
+ return batch.hasNulls && !batch.notNull[row];
+}
+
+template <typename ColumnType, typename ValueType, typename OrcBatchType>
+Status append_numeric_values(MutableColumnPtr& column, const
::orc::ColumnVectorBatch& batch,
+ size_t rows) { // Appends the first `rows` values of `batch` to `column`, converting each with static_cast<ValueType>.
+ const auto* orc_batch = dynamic_cast<const OrcBatchType*>(&batch);
+ if (orc_batch == nullptr) { // The batch is not the ORC vector type this instantiation expects.
+ return Status::InternalError("Unexpected ORC numeric batch type {}",
batch.toString());
+ }
+ auto& data = assert_cast<ColumnType&>(*column).get_data();
+ const size_t old_size = data.size();
+ data.resize(old_size + rows); // Grow once, then fill the new tail in place.
+ for (size_t row = 0; row < rows; ++row) {
+ if (is_null_at(batch, row)) {
+ data[old_size + row] = ValueType {}; // Null rows still occupy a slot; store a value-initialized placeholder.
+ continue;
+ }
+ data[old_size + row] = static_cast<ValueType>(orc_batch->data[row]);
+ }
+ return Status::OK();
+}
+
+template <typename ColumnType, typename ValueType>
+Status append_floating_values(MutableColumnPtr& column, const
::orc::ColumnVectorBatch& batch,
+ size_t rows) { // Appends the first `rows` values of an ORC DoubleVectorBatch to `column`, converting each with static_cast<ValueType>.
+ const auto* orc_batch = dynamic_cast<const
::orc::DoubleVectorBatch*>(&batch);
+ if (orc_batch == nullptr) { // The batch is not a DoubleVectorBatch.
+ return Status::InternalError("Unexpected ORC floating batch type {}",
batch.toString());
+ }
+ auto& data = assert_cast<ColumnType&>(*column).get_data();
+ const size_t old_size = data.size();
+ data.resize(old_size + rows); // Grow once, then fill the new tail in place.
+ for (size_t row = 0; row < rows; ++row) {
+ if (is_null_at(batch, row)) {
+ data[old_size + row] = ValueType {}; // Null rows still occupy a slot; store a value-initialized placeholder.
+ continue;
+ }
+ data[old_size + row] = static_cast<ValueType>(orc_batch->data[row]);
+ }
+ return Status::OK();
+}
+
+size_t trim_right_spaces(const char* value, size_t length) { // Returns `length` reduced so that trailing ' ' bytes are excluded; the buffer itself is not modified.
+ while (length > 0 && value[length - 1] == ' ') { // Only the space character is trimmed; an all-space value yields 0.
+ --length;
+ }
+ return length;
+}
+
+Int128 to_int128(::orc::Int128 value) { // Rebuilds a Doris Int128 from the high and low 64-bit halves of an ORC Int128.
+ const auto high_bits =
static_cast<__uint128_t>(static_cast<uint64_t>(value.getHighBits())); // Goes through uint64_t first, presumably so a signed high half is not sign-extended; confirm getHighBits() is signed.
+ const auto low_bits = static_cast<__uint128_t>(value.getLowBits());
+ return static_cast<Int128>((high_bits << 64) | low_bits); // Combine as unsigned, then reinterpret as the signed result type.
+}
+
+std::vector<int32_t> append_path(const std::vector<int32_t>& path, int32_t
child_idx) { // Returns a copy of `path` with child_idx appended; the input vector is left untouched.
+ auto child_path = path;
+ child_path.push_back(child_idx);
+ return child_path;
+}
+
+std::vector<std::string> append_name_path(const std::vector<std::string>& path,
+ const std::string& child_name) { // Returns a copy of `path` with child_name appended; the input vector is left untouched.
+ auto child_path = path;
+ child_path.push_back(child_name);
+ return child_path;
+}
+
+bool is_direct_projection_child(const reader::FieldProjection& parent,
+ const reader::FieldProjection& child) { // True when child.file_path is parent.file_path followed by exactly one more element.
+ return child.file_path.size() == parent.file_path.size() + 1 &&
+ std::equal(parent.file_path.begin(), parent.file_path.end(),
child.file_path.begin()); // The size check above guarantees child.file_path is long enough for this prefix comparison.
+}
+
+Status get_projection_child_index(const reader::FieldProjection& parent,
+ const reader::FieldProjection& child,
int32_t child_count,
+ const std::string& column_name, int32_t*
child_idx) { // Validates that `child` is a direct child of `parent` and writes the last element of its file_path to *child_idx.
+ if (child.file_path.empty() || !is_direct_projection_child(parent, child))
{
+ return Status::InvalidArgument("Invalid ORC projection path for column
{}", column_name);
+ }
+ *child_idx = child.file_path.back(); // Written before the range check, so it may hold an out-of-range value when an error is returned.
+ if (*child_idx < 0 || *child_idx >= child_count) { // child_count is the exclusive upper bound; column_name is only used in messages.
+ return Status::InvalidArgument("Invalid ORC projection child index {}
for column {}",
+ *child_idx, column_name);
+ }
+ return Status::OK();
+}
+
+void collect_type_and_descendant_ids(const ::orc::Type& type,
std::set<uint64_t>* const type_ids) { // Inserts the column id of `type` and, recursively, of every subtype below it into *type_ids.
+ DORIS_CHECK(type_ids != nullptr);
+ type_ids->insert(type.getColumnId());
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ const auto* child_type = type.getSubtype(child_idx);
+ DORIS_CHECK(child_type != nullptr);
+ collect_type_and_descendant_ids(*child_type, type_ids); // Depth-first descent into the child subtree.
+ }
+}
+
+Status collect_projected_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection& projection,
+ std::set<uint64_t>* const type_ids); // Forward declaration: the MAP entry helper below calls this before its definition.
+
+Status collect_projected_map_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection&
projection,
+ std::set<uint64_t>* const type_ids); // Forward declaration of the MAP-specific collector.
+
+Status collect_projected_map_entry_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection&
projection,
+ const reader::FieldProjection&
entry_projection,
+ std::set<uint64_t>* const type_ids,
+ bool* const selected_key, bool*
const selected_value) { // Handles one entry-level child of a MAP projection: collects the selected type ids and records whether key and/or value were selected.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+ DORIS_CHECK(type.getSubtypeCount() == 2);
+ DORIS_CHECK(selected_key != nullptr);
+ DORIS_CHECK(selected_value != nullptr);
+
+ int32_t entry_idx = 0;
+ RETURN_IF_ERROR(get_projection_child_index(projection, entry_projection,
1, ORC_MAP_ENTRY_NAME,
+ &entry_idx));
+ DORIS_CHECK(entry_idx == 0); // A child_count of 1 above leaves 0 as the only valid entry index.
+ if (entry_projection.project_all_children) { // Whole entry requested: take the key and value subtrees in full.
+ collect_type_and_descendant_ids(*type.getSubtype(0), type_ids);
+ collect_type_and_descendant_ids(*type.getSubtype(1), type_ids);
+ *selected_key = true;
+ *selected_value = true;
+ return Status::OK();
+ }
+ if (entry_projection.children.empty()) {
+ return Status::NotSupported("ORC MAP entry projection contains no
children");
+ }
+ for (const auto& key_value_projection : entry_projection.children) {
+ int32_t key_value_idx = 0;
+ RETURN_IF_ERROR(get_projection_child_index(entry_projection,
key_value_projection, 2,
+ ORC_MAP_ENTRY_NAME,
&key_value_idx));
+ const auto* child_type =
type.getSubtype(static_cast<uint64_t>(key_value_idx)); // Index 0 is the MAP's key subtype, index 1 its value subtype.
+ DORIS_CHECK(child_type != nullptr);
+ RETURN_IF_ERROR(collect_projected_type_ids(*child_type,
key_value_projection, type_ids));
+ *selected_key = *selected_key || key_value_idx == 0; // Flags are only ever raised, so results accumulate across calls.
+ *selected_value = *selected_value || key_value_idx == 1;
+ }
+ return Status::OK();
+}
+
+Status collect_projected_map_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection&
projection,
+ std::set<uint64_t>* const type_ids) { // Collects the type ids needed for a projection rooted at a MAP type; both key and value must end up selected.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+ DORIS_CHECK(type.getSubtypeCount() == 2);
+ type_ids->insert(type.getColumnId());
+ if (projection.project_all_children) { // Full MAP requested: include the whole subtree.
+ collect_type_and_descendant_ids(type, type_ids);
+ return Status::OK();
+ }
+ if (projection.children.empty()) {
+ int32_t column_id = -1; // Only used in the error message; stays -1 when the projection has no file_path.
+ if (!projection.file_path.empty()) {
+ column_id = projection.file_path[0];
+ }
+ return Status::NotSupported("ORC MAP projection for column {} contains
no children",
+ column_id);
+ }
+
+ bool selected_key = false;
+ bool selected_value = false;
+ for (const auto& entry_projection : projection.children) {
+ RETURN_IF_ERROR(collect_projected_map_entry_type_ids(
+ type, projection, entry_projection, type_ids, &selected_key,
&selected_value));
+ }
+ if (!selected_key || !selected_value) { // Key-only or value-only MAP projections are rejected.
+ return Status::NotSupported("ORC MAP projection must include both key
and value");
+ }
+ return Status::OK();
+}
+
+Status collect_projected_type_ids(const ::orc::Type& type,
+ const reader::FieldProjection& projection,
+ std::set<uint64_t>* const type_ids) { // Collects the ORC type ids needed to read `projection` rooted at `type` into *type_ids.
+ DORIS_CHECK(type_ids != nullptr);
+ type_ids->insert(type.getColumnId()); // The node itself is always included, even if a later check fails.
+ if (projection.project_all_children) { // Whole subtree requested.
+ collect_type_and_descendant_ids(type, type_ids);
+ return Status::OK();
+ }
+ if (projection.children.empty()) {
+ return Status::NotSupported("ORC projection contains no children");
+ }
+ if (type.getKind() == ::orc::TypeKind::MAP) { // MAP has its own entry/key/value path layout and is handled separately.
+ return collect_projected_map_type_ids(type, projection, type_ids);
+ }
+ if (type.getKind() != ::orc::TypeKind::STRUCT && type.getKind() !=
::orc::TypeKind::LIST) {
+ return Status::InvalidArgument("Cannot project children from
non-complex ORC type {}",
+ static_cast<int>(type.getKind()));
+ }
+
+ const auto child_count = static_cast<int32_t>(type.getSubtypeCount());
+ for (const auto& child_projection : projection.children) { // STRUCT/LIST: the last path element of each child is its subtype index.
+ int32_t child_idx = 0;
+ RETURN_IF_ERROR(get_projection_child_index(projection,
child_projection, child_count,
+ "orc_complex", &child_idx));
+ const auto* child_type =
type.getSubtype(static_cast<uint64_t>(child_idx));
+ DORIS_CHECK(child_type != nullptr);
+ RETURN_IF_ERROR(collect_projected_type_ids(*child_type,
child_projection, type_ids));
+ }
+ return Status::OK();
+}
+
+Status append_orc_offsets(ColumnArray::Offsets64& doris_offsets,
+ const ::orc::DataBuffer<int64_t>& orc_offsets,
size_t rows,
+ size_t* element_size) { // Appends `rows` offsets to doris_offsets, rebased so that orc_offsets[0] lines up with the current last Doris offset.
+ const auto prev_offset = doris_offsets.empty() ? 0 : doris_offsets.back();
+ const auto base_offset = orc_offsets[0]; // Reads orc_offsets[0..rows]; assumes the buffer holds at least rows + 1 entries (TODO confirm against callers).
+ for (size_t idx = 1; idx <= rows; ++idx) {
+ const auto delta = orc_offsets[idx] - base_offset;
+ if (delta < 0) { // NOTE(review): compares against the base only, not the previous offset; confirm monotonicity is guaranteed upstream.
+ return Status::Corruption("Invalid ORC offsets");
+ }
+ doris_offsets.push_back(prev_offset +
static_cast<ColumnArray::Offset64>(delta));
+ }
+ const auto total_delta = orc_offsets[rows] - base_offset;
+ if (total_delta < 0) {
+ return Status::Corruption("Invalid ORC offsets");
+ }
+ *element_size = static_cast<size_t>(total_delta); // Total offset span covered by these rows.
+ return Status::OK();
+}
+
+int64_t find_struct_child_index(const ::orc::Type& type, const std::string&
field_name) { // Linear search of a STRUCT's children by exact field name; returns the child index, or -1 if none matches.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ if (type.getFieldName(child_idx) == field_name) { // First match wins.
+ return static_cast<int64_t>(child_idx);
+ }
+ }
+ return -1;
+}
+
+} // namespace
+
+struct OrcReaderScanState { // Per-reader ORC state, allocated in OrcReader::init().
+ std::unique_ptr<::orc::Reader> reader; // Created by ::orc::createReader() in init().
+ const ::orc::Type* root_type = nullptr; // Points at reader->getType().
+ ::orc::ReaderMetrics reader_metrics; // Registered with the ReaderOptions in init().
+ ::orc::RowReaderOptions row_reader_options; // Projection, timezone and decoding flags set during open().
+ std::unique_ptr<::orc::RowReader> row_reader; // Created in _create_row_reader().
+ const ::orc::Type* selected_type = nullptr; // Points at row_reader->getSelectedType().
+ std::unique_ptr<::orc::ColumnVectorBatch> batch; // Created with capacity DEFAULT_ORC_READ_BATCH_SIZE.
+ std::vector<reader::ColumnId> read_columns; // Sorted, de-duplicated union of predicate and non-predicate column ids.
+ std::map<reader::ColumnId, size_t> column_to_selected_batch_index; // File column id -> child index within selected_type.
+ bool row_reader_created = false; // Set to true at the end of a successful _create_row_reader().
+};
+
+OrcReader::OrcReader(std::shared_ptr<io::FileSystemProperties>&
system_properties,
+ std::unique_ptr<io::FileDescription>& file_description,
+ std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile*
profile)
+ : FileReader(system_properties, file_description, io_ctx, profile) {} // Forwards everything to the FileReader base; no ORC state is created here.
+
+OrcReader::~OrcReader() = default; // Defined in this file, after OrcReaderScanState is a complete type.
+
+Status OrcReader::init(RuntimeState* state) { // Runs the base init, allocates the scan state and opens the ORC file.
+ RETURN_IF_ERROR(reader::FileReader::init(state));
+ _state = std::make_unique<OrcReaderScanState>(); // Any previous scan state is replaced.
+
+ ::orc::ReaderOptions options;
+ options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
+ options.setReaderMetrics(&_state->reader_metrics);
+
+ auto input_stream =
std::make_unique<DorisOrcInputStream>(_file_description->path,
+
_tracing_file_reader, _io_ctx.get());
+ try { // Exceptions thrown while creating the reader are converted to a Status.
+ _state->reader = ::orc::createReader(std::move(input_stream), options);
+ _state->root_type = &_state->reader->getType();
+ } catch (const std::exception& e) {
+ return Status::InternalError("Failed to open ORC file {}: {}",
_file_description->path,
+ e.what());
+ }
+ return Status::OK();
+}
+
+DataTypePtr OrcReader::_convert_to_doris_type(const ::orc::Type& type) const { // Maps an ORC type to a Doris data type; the result is always wrapped with make_nullable(). Throws doris::Exception for unhandled kinds.
+ DataTypePtr data_type;
+ switch (type.getKind()) {
+ case ::orc::TypeKind::BOOLEAN:
+ data_type = std::make_shared<DataTypeUInt8>();
+ break;
+ case ::orc::TypeKind::BYTE:
+ data_type = std::make_shared<DataTypeInt8>();
+ break;
+ case ::orc::TypeKind::SHORT:
+ data_type = std::make_shared<DataTypeInt16>();
+ break;
+ case ::orc::TypeKind::INT:
+ data_type = std::make_shared<DataTypeInt32>();
+ break;
+ case ::orc::TypeKind::LONG:
+ data_type = std::make_shared<DataTypeInt64>();
+ break;
+ case ::orc::TypeKind::FLOAT:
+ data_type = std::make_shared<DataTypeFloat32>();
+ break;
+ case ::orc::TypeKind::DOUBLE:
+ data_type = std::make_shared<DataTypeFloat64>();
+ break;
+ case ::orc::TypeKind::STRING:
+ case ::orc::TypeKind::BINARY: // BINARY shares the plain string type with STRING.
+ data_type = std::make_shared<DataTypeString>();
+ break;
+ case ::orc::TypeKind::VARCHAR: // VARCHAR and CHAR carry the ORC maximum length into the Doris type.
+ data_type =
std::make_shared<DataTypeString>(cast_set<int>(type.getMaximumLength()),
+
PrimitiveType::TYPE_VARCHAR);
+ break;
+ case ::orc::TypeKind::CHAR:
+ data_type =
std::make_shared<DataTypeString>(cast_set<int>(type.getMaximumLength()),
+ PrimitiveType::TYPE_CHAR);
+ break;
+ case ::orc::TypeKind::DATE:
+ data_type = std::make_shared<DataTypeDateV2>();
+ break;
+ case ::orc::TypeKind::TIMESTAMP:
+ case ::orc::TypeKind::TIMESTAMP_INSTANT: // Both timestamp kinds map to DateTimeV2 constructed with 6.
+ data_type = std::make_shared<DataTypeDateTimeV2>(6);
+ break;
+ case ::orc::TypeKind::DECIMAL: // Precision 0 switches both precision and scale to the *_FOR_HIVE11 constants.
+ data_type = std::make_shared<DataTypeDecimal<TYPE_DECIMAL128I>>(
+ type.getPrecision() == 0 ? DECIMAL_PRECISION_FOR_HIVE11
+ : cast_set<int>(type.getPrecision()),
+ type.getPrecision() == 0 ? DECIMAL_SCALE_FOR_HIVE11
+ : cast_set<int>(type.getScale()));
+ break;
+ case ::orc::TypeKind::LIST:
+ data_type = _convert_list_to_doris_type(type);
+ break;
+ case ::orc::TypeKind::MAP:
+ data_type = _convert_map_to_doris_type(type);
+ break;
+ case ::orc::TypeKind::STRUCT:
+ data_type = _convert_struct_to_doris_type(type);
+ break;
+ default: // Reported by exception because this function returns a DataTypePtr, not a Status.
+ throw doris::Exception(
+ Status::NotSupported("ORC type {} is not supported by new ORC
reader",
+ static_cast<int>(type.getKind())));
+ }
+ return make_nullable(data_type);
+}
+
+DataTypePtr OrcReader::_convert_list_to_doris_type(const ::orc::Type& type)
const { // Builds a DataTypeArray whose element type is the converted single LIST subtype.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::LIST);
+ DORIS_CHECK(type.getSubtypeCount() == 1);
+ const auto* element_type = type.getSubtype(0);
+ DORIS_CHECK(element_type != nullptr);
+ return
std::make_shared<DataTypeArray>(_convert_to_doris_type(*element_type));
+}
+
+DataTypePtr OrcReader::_convert_map_to_doris_type(const ::orc::Type& type)
const { // Builds a DataTypeMap from the converted key (subtype 0) and value (subtype 1) types.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+ DORIS_CHECK(type.getSubtypeCount() == 2);
+ const auto* key_type = type.getSubtype(0);
+ const auto* value_type = type.getSubtype(1);
+ DORIS_CHECK(key_type != nullptr);
+ DORIS_CHECK(value_type != nullptr);
+ return std::make_shared<DataTypeMap>(_convert_to_doris_type(*key_type),
+ _convert_to_doris_type(*value_type));
+}
+
+DataTypePtr OrcReader::_convert_struct_to_doris_type(const ::orc::Type& type)
const { // Builds a DataTypeStruct from the converted child types and the ORC field names, in file order.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+ DataTypes child_types;
+ Strings child_names;
+ child_types.reserve(type.getSubtypeCount());
+ child_names.reserve(type.getSubtypeCount());
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ const auto* child_type = type.getSubtype(child_idx);
+ DORIS_CHECK(child_type != nullptr);
+ child_types.push_back(_convert_to_doris_type(*child_type)); // Types and names are pushed in lockstep so indexes stay aligned.
+ child_names.push_back(type.getFieldName(child_idx));
+ }
+ return std::make_shared<DataTypeStruct>(child_types, child_names);
+}
+
+Status OrcReader::_fill_schema_field(const ::orc::Type& type, int32_t field_id,
+ const std::string& field_name,
+ const std::vector<int32_t>& file_path,
+ const std::vector<std::string>& name_path,
+ reader::SchemaField* const field) const { // Populates *field for `type` (id, name, paths, Doris type), then adds children for STRUCT, LIST and MAP.
+ if (field == nullptr) {
+ return Status::InvalidArgument("schema field is null");
+ }
+ field->id = field_id;
+ field->name = field_name;
+ field->file_path = file_path;
+ field->field_id_path.clear(); // This reader leaves the field-id path empty.
+ field->name_path = name_path;
+ field->column_type = reader::ColumnType::DATA_COLUMN;
+ try { // The type conversion reports unsupported kinds by throwing; turn that into a Status.
+ field->type = _convert_to_doris_type(type);
+ } catch (const doris::Exception& e) {
+ return e.to_status();
+ }
+ field->children.clear();
+ switch (type.getKind()) {
+ case ::orc::TypeKind::STRUCT:
+ return _fill_struct_schema_children(type, file_path, name_path, field);
+ case ::orc::TypeKind::LIST:
+ return _fill_list_schema_children(type, file_path, name_path, field);
+ case ::orc::TypeKind::MAP:
+ return _fill_map_schema_children(type, file_path, name_path, field);
+ default: // Every other kind is a leaf with no children.
+ break;
+ }
+ return Status::OK();
+}
+
+Status OrcReader::_fill_struct_schema_children(const ::orc::Type& type,
+ const std::vector<int32_t>&
file_path,
+ const std::vector<std::string>&
name_path,
+ reader::SchemaField* const
field) const { // Appends one child SchemaField per struct member to field->children.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::STRUCT);
+ field->children.reserve(type.getSubtypeCount());
+ for (uint64_t child_idx = 0; child_idx < type.getSubtypeCount();
++child_idx) {
+ const auto* child_type = type.getSubtype(child_idx);
+ DORIS_CHECK(child_type != nullptr);
+ const auto child_name = type.getFieldName(child_idx);
+ reader::SchemaField child_field; // The member's ordinal is used both as its id and as its path element.
+ RETURN_IF_ERROR(_fill_schema_field(*child_type,
static_cast<int32_t>(child_idx), child_name,
+ append_path(file_path,
static_cast<int32_t>(child_idx)),
+ append_name_path(name_path,
child_name), &child_field));
+ field->children.push_back(std::move(child_field));
+ }
+ return Status::OK();
+}
+
+Status OrcReader::_fill_list_schema_children(const ::orc::Type& type,
+ const std::vector<int32_t>&
file_path,
+ const std::vector<std::string>&
name_path,
+ reader::SchemaField* const field)
const { // Appends the LIST's single child to field->children, named ORC_LIST_ELEMENT_NAME with id 0 and path element 0.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::LIST);
+ DORIS_CHECK(type.getSubtypeCount() == 1);
+ const auto* element_type = type.getSubtype(0);
+ DORIS_CHECK(element_type != nullptr);
+
+ reader::SchemaField element_field;
+ RETURN_IF_ERROR(
+ _fill_schema_field(*element_type, 0, ORC_LIST_ELEMENT_NAME,
append_path(file_path, 0),
+ append_name_path(name_path,
ORC_LIST_ELEMENT_NAME), &element_field));
+ field->children.push_back(std::move(element_field));
+ return Status::OK();
+}
+
+Status OrcReader::_fill_map_schema_children(const ::orc::Type& type,
+ const std::vector<int32_t>&
file_path,
+ const std::vector<std::string>&
name_path,
+ reader::SchemaField* const field)
const { // Appends one synthetic entry field to field->children; that entry holds the key and value fields as its own children.
+ DORIS_CHECK(type.getKind() == ::orc::TypeKind::MAP);
+ DORIS_CHECK(type.getSubtypeCount() == 2);
+ const auto* key_type = type.getSubtype(0);
+ const auto* value_type = type.getSubtype(1);
+ DORIS_CHECK(key_type != nullptr);
+ DORIS_CHECK(value_type != nullptr);
+
+ const auto entry_file_path = append_path(file_path, 0); // The entry level sits at path element 0 under the MAP.
+ const auto entry_name_path = append_name_path(name_path,
ORC_MAP_ENTRY_NAME);
+ reader::SchemaField key_field; // Key uses id/path element 0 beneath the entry.
+ RETURN_IF_ERROR(
+ _fill_schema_field(*key_type, 0, ORC_MAP_KEY_NAME,
append_path(entry_file_path, 0),
+ append_name_path(entry_name_path,
ORC_MAP_KEY_NAME), &key_field));
+ reader::SchemaField value_field; // Value uses id/path element 1 beneath the entry.
+ RETURN_IF_ERROR(_fill_schema_field(
+ *value_type, 1, ORC_MAP_VALUE_NAME, append_path(entry_file_path,
1),
+ append_name_path(entry_name_path, ORC_MAP_VALUE_NAME),
&value_field));
+
+ reader::SchemaField entry_field; // Filled by hand: the entry has no ORC type of its own among the MAP's two subtypes.
+ entry_field.id = 0;
+ entry_field.name = ORC_MAP_ENTRY_NAME;
+ entry_field.file_path = entry_file_path;
+ entry_field.field_id_path.clear();
+ entry_field.name_path = entry_name_path;
+ entry_field.column_type = reader::ColumnType::DATA_COLUMN;
+ entry_field.children.push_back(std::move(key_field));
+ entry_field.children.push_back(std::move(value_field));
+
+ DataTypes entry_child_types;
+ Strings entry_child_names;
+ entry_child_types.reserve(entry_field.children.size());
+ entry_child_names.reserve(entry_field.children.size());
+ for (const auto& child : entry_field.children) {
+ entry_child_types.push_back(child.type);
+ entry_child_names.push_back(child.name);
+ }
+ entry_field.type = std::make_shared<DataTypeStruct>(entry_child_types,
entry_child_names); // The entry's type is a struct of the key and value types; it is not wrapped in Nullable here.
+ field->children.push_back(std::move(entry_field));
+ return Status::OK();
+}
+
+Status OrcReader::get_schema(std::vector<reader::SchemaField>* const
file_schema) const { // Replaces the contents of *file_schema with one SchemaField per top-level field of the file's root struct.
+ if (file_schema == nullptr) {
+ return Status::InvalidArgument("file_schema is null");
+ }
+ if (_state == nullptr || _state->root_type == nullptr) { // root_type is set by init(), so init() must have succeeded first.
+ return Status::Uninitialized("OrcReader is not open");
+ }
+ if (_state->root_type->getKind() != ::orc::TypeKind::STRUCT) {
+ return Status::NotSupported("ORC reader only supports top-level struct
schema");
+ }
+ file_schema->clear();
+ file_schema->reserve(_state->root_type->getSubtypeCount());
+ for (uint64_t child_idx = 0; child_idx <
_state->root_type->getSubtypeCount(); ++child_idx) {
+ const auto* child_type = _state->root_type->getSubtype(child_idx);
+ DORIS_CHECK(child_type != nullptr);
+ const auto child_name = _state->root_type->getFieldName(child_idx);
+ reader::SchemaField field; // Top-level paths start with the field's ordinal and its name.
+ RETURN_IF_ERROR(_fill_schema_field(*child_type,
static_cast<int32_t>(child_idx), child_name,
+ {static_cast<int32_t>(child_idx)},
{child_name},
+ &field));
+ file_schema->push_back(std::move(field)); // On an error above, fields added so far remain in *file_schema.
+ }
+ return Status::OK();
+}
+
+Status OrcReader::open(std::unique_ptr<reader::FileScanRequest>& request) { // Validates the request, computes the columns to read, configures projection and creates the row reader.
+ if (_state == nullptr || _state->reader == nullptr || _state->root_type ==
nullptr) {
+ return Status::Uninitialized("OrcReader is not open");
+ }
+ RETURN_IF_ERROR(reader::FileReader::open(request));
+ if (!_request->conjuncts.empty() || !_request->delete_conjuncts.empty() ||
+ !_request->column_predicate_filters.empty() ||
!_request->reader_expression_map.empty()) { // Any filter or reader expression on the request is rejected outright.
+ return Status::NotSupported(
+ "New ORC reader does not support file-local filters or reader
expressions");
+ }
+
+ if (_request->column_positions.empty()) { // No explicit positions: default each requested column's position to its own file column id.
+ for (const auto file_column_id : _request->predicate_columns) {
+ _request->column_positions.emplace(file_column_id, file_column_id);
+ }
+ for (const auto file_column_id : _request->non_predicate_columns) {
+ _request->column_positions.emplace(file_column_id, file_column_id);
+ }
+ }
+
+ _state->read_columns.clear();
+ _state->read_columns.reserve(_request->predicate_columns.size() +
+ _request->non_predicate_columns.size());
+ _state->read_columns.insert(_state->read_columns.end(),
_request->predicate_columns.begin(),
+ _request->predicate_columns.end());
+ _state->read_columns.insert(_state->read_columns.end(),
_request->non_predicate_columns.begin(),
+ _request->non_predicate_columns.end());
+
+ std::sort(_state->read_columns.begin(), _state->read_columns.end()); // Sort then unique, so each column id appears exactly once.
+ _state->read_columns.erase(
+ std::unique(_state->read_columns.begin(),
_state->read_columns.end()),
+ _state->read_columns.end());
+
+ RETURN_IF_ERROR(_configure_row_reader_projection());
+ _state->row_reader_options.setTimezoneName("UTC");
+ _state->row_reader_options.setEnableLazyDecoding(false);
+ _state->row_reader_options.setUseTightNumericVector(false); // NOTE(review): presumably keeps integer columns as LongVectorBatch, which _decode_column casts to; confirm.
+ RETURN_IF_ERROR(_create_row_reader());
+ _eof = _state->reader->getNumberOfRows() == 0; // A file with zero rows is flagged as EOF right away.
+ return Status::OK();
+}
+
+Status OrcReader::_configure_row_reader_projection() { // Validates the complex projections and tells row_reader_options which columns or type ids to read.
+ const auto num_fields =
static_cast<reader::ColumnId>(_state->root_type->getSubtypeCount());
+ for (const auto& [file_column_id, projection] :
_request->complex_projections) { // Validation pass: map key, projection id and root path element must all agree and be in range.
+ if (file_column_id < 0 || file_column_id >= num_fields) {
+ return Status::InvalidArgument("Invalid ORC projection top-level
field id {}",
+ file_column_id);
+ }
+ if (projection.file_column_id != file_column_id) {
+ return Status::InvalidArgument("ORC projection column id mismatch:
key={}, value={}",
+ file_column_id,
projection.file_column_id);
+ }
+ if (projection.file_path.empty() || projection.file_path.front() !=
file_column_id) {
+ return Status::InvalidArgument("Invalid ORC projection root path
for column {}",
+ file_column_id);
+ }
+ }
+
+ for (const auto file_column_id : _state->read_columns) { // Read columns are checked with DORIS_CHECK rather than returned as a Status.
+ DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
+ DORIS_CHECK(_request->column_positions.contains(file_column_id));
+ }
+ if (_request->complex_projections.empty()) { // No nested pruning: select whole top-level columns by field index.
+ std::list<uint64_t> include_columns;
+ for (const auto file_column_id : _state->read_columns) {
+ include_columns.push_back(static_cast<uint64_t>(file_column_id));
+ }
+ _state->row_reader_options.include(include_columns);
+ return Status::OK();
+ }
+
+ std::set<uint64_t> include_type_ids; // With projections present, selection is expressed as ORC type ids instead.
+ include_type_ids.insert(_state->root_type->getColumnId());
+ for (const auto file_column_id : _state->read_columns) {
+ const auto* type =
_state->root_type->getSubtype(static_cast<uint64_t>(file_column_id));
+ DORIS_CHECK(type != nullptr);
+ const auto projection_it =
_request->complex_projections.find(file_column_id);
+ if (projection_it == _request->complex_projections.end()) { // Column without a projection: read its full subtree.
+ collect_type_and_descendant_ids(*type, &include_type_ids);
+ continue;
+ }
+ RETURN_IF_ERROR(
+ collect_projected_type_ids(*type, projection_it->second,
&include_type_ids));
+ }
+ std::list<uint64_t> include_type_id_list(include_type_ids.begin(),
include_type_ids.end()); // includeTypes() is given a list, so the set is copied into one.
+ _state->row_reader_options.includeTypes(include_type_id_list);
+ return Status::OK();
+}
+
+Status OrcReader::_create_row_reader() { // Creates the row reader and its batch, then maps each read column to its child index in the selected type.
+ try {
+ _state->row_reader =
_state->reader->createRowReader(_state->row_reader_options);
+ _state->selected_type = &_state->row_reader->getSelectedType();
+ DORIS_CHECK(_state->selected_type->getKind() ==
::orc::TypeKind::STRUCT);
+ _state->batch =
_state->row_reader->createRowBatch(DEFAULT_ORC_READ_BATCH_SIZE);
+ _state->column_to_selected_batch_index.clear();
+ for (uint64_t selected_idx = 0; selected_idx <
_state->selected_type->getSubtypeCount();
+ ++selected_idx) { // Selected children are matched back to file columns by field name.
+ const auto field_name =
_state->selected_type->getFieldName(selected_idx);
+ for (const auto file_column_id : _state->read_columns) {
+ if (field_name ==
_state->root_type->getFieldName(file_column_id)) {
+ _state->column_to_selected_batch_index.emplace(
+ file_column_id, static_cast<size_t>(selected_idx));
+ break; // Stop at the first read column whose name matches.
+ }
+ }
+ }
+ DORIS_CHECK(_state->column_to_selected_batch_index.size() ==
_state->read_columns.size()); // Every requested column must have been found in the selected type.
+ _state->row_reader_created = true;
+ } catch (const std::exception& e) { // Exceptions derived from std::exception inside the try block become InternalError.
+ return Status::InternalError("Failed to create ORC row reader: {}",
e.what());
+ }
+ return Status::OK();
+}
+
+Status OrcReader::_decode_column(const ::orc::Type& file_type, const
::orc::Type& selected_type,
+ const ::orc::ColumnVectorBatch& batch,
MutableColumnPtr& column,
+ size_t rows) const { // Appends `rows` decoded values from `batch` to the nullable `column`, dispatching on the ORC type kind.
+ DORIS_CHECK(file_type.getKind() == selected_type.getKind());
+ DORIS_CHECK(column->is_nullable());
+ if (rows == 0) { // Nothing to append; the column is left untouched.
+ return Status::OK();
+ }
+ auto& nullable_column = assert_cast<ColumnNullable&>(*column);
+ auto nested_column = nullable_column.get_nested_column_ptr();
+ auto& null_map = nullable_column.get_null_map_data();
+ const size_t old_size = null_map.size();
+ null_map.resize(old_size + rows); // The null map is extended here; the kind-specific helpers below append to the nested column.
+ if (batch.hasNulls) {
+ for (size_t row = 0; row < rows; ++row) {
+ null_map[old_size + row] = !batch.notNull[row]; // Stored flag is the inverse of ORC's notNull.
+ }
+ } else {
+ std::memset(null_map.data() + old_size, 0, rows); // No nulls in the batch: zero the new entries in one call.
+ }
+
+ switch (file_type.getKind()) {
+ case ::orc::TypeKind::BOOLEAN: // All integer-like kinds are cast from LongVectorBatch and narrowed per kind.
+ return append_numeric_values<ColumnUInt8, UInt8,
::orc::LongVectorBatch>(nested_column,
+
batch, rows);
+ case ::orc::TypeKind::BYTE:
+ return append_numeric_values<ColumnInt8, Int8,
::orc::LongVectorBatch>(nested_column, batch,
+
rows);
+ case ::orc::TypeKind::SHORT:
+ return append_numeric_values<ColumnInt16, Int16,
::orc::LongVectorBatch>(nested_column,
+
batch, rows);
+ case ::orc::TypeKind::INT:
+ return append_numeric_values<ColumnInt32, Int32,
::orc::LongVectorBatch>(nested_column,
+
batch, rows);
+ case ::orc::TypeKind::LONG:
+ return append_numeric_values<ColumnInt64, Int64,
::orc::LongVectorBatch>(nested_column,
+
batch, rows);
+ case ::orc::TypeKind::FLOAT:
+ return append_floating_values<ColumnFloat32, Float32>(nested_column,
batch, rows);
+ case ::orc::TypeKind::DOUBLE:
+ return append_floating_values<ColumnFloat64, Float64>(nested_column,
batch, rows);
+ case ::orc::TypeKind::STRING:
+ case ::orc::TypeKind::BINARY:
+ case ::orc::TypeKind::VARCHAR:
+ case ::orc::TypeKind::CHAR: // All four string-like kinds share one decoder, which receives file_type to tell them apart.
+ return _decode_string_column(file_type, batch, nested_column, rows);
+ case ::orc::TypeKind::DATE:
+ return _decode_date_column(batch, nested_column, rows);
+ case ::orc::TypeKind::TIMESTAMP:
+ case ::orc::TypeKind::TIMESTAMP_INSTANT:
+ return _decode_timestamp_column(batch, nested_column, rows);
+ case ::orc::TypeKind::DECIMAL:
+ return _decode_decimal_column(batch, nested_column, rows);
+ case ::orc::TypeKind::LIST: // Nested kinds also receive selected_type, in addition to file_type.
+ return _decode_list_column(file_type, selected_type, batch,
nested_column, rows);
+ case ::orc::TypeKind::MAP:
+ return _decode_map_column(file_type, selected_type, batch,
nested_column, rows);
+ case ::orc::TypeKind::STRUCT:
+ return _decode_struct_column(file_type, selected_type, batch,
nested_column, rows);
+ default: // Reached after the null map was already extended above.
+ return Status::NotSupported("ORC type {} is not supported by new ORC
reader",
+ static_cast<int>(file_type.getKind()));
+ }
+}
+
+Status OrcReader::_decode_string_column(const ::orc::Type& file_type,
+ const ::orc::ColumnVectorBatch& batch,
+ MutableColumnPtr& nested_column,
size_t rows) const { // Appends `rows` strings from an ORC StringVectorBatch to the Doris string column.
+ const auto* orc_batch = dynamic_cast<const
::orc::StringVectorBatch*>(&batch);
+ if (orc_batch == nullptr) {
+ return Status::InternalError("Unexpected ORC string batch type {}",
batch.toString());
+ }
+ auto& string_column = assert_cast<ColumnString&>(*nested_column);
+ static constexpr const char* EMPTY_STRING = "";
+ for (size_t row = 0; row < rows; ++row) {
+ if (is_null_at(batch, row)) { // Null rows still get an entry: an empty string.
+ string_column.insert_data(EMPTY_STRING, 0);
+ continue;
+ }
+ auto length = static_cast<size_t>(orc_batch->length[row]);
+ if (file_type.getKind() == ::orc::TypeKind::CHAR) { // Only CHAR values have trailing spaces trimmed.
+ length = trim_right_spaces(orc_batch->data[row], length);
+ }
+ string_column.insert_data(length == 0 ? EMPTY_STRING :
orc_batch->data[row], length); // Zero-length values pass EMPTY_STRING instead of the batch's data pointer.
+ }
+ return Status::OK();
+}
+
// Appends `rows` DATE values from an ORC batch to `nested_column`.
//
// `batch` must be an ::orc::LongVectorBatch (InternalError is returned
// otherwise) and `nested_column` is assert_cast to ColumnDateV2. The column's
// data is grown by `rows` up front and each new slot is then filled in place.
+Status OrcReader::_decode_date_column(const ::orc::ColumnVectorBatch& batch,
+ MutableColumnPtr& nested_column, size_t
rows) const {
+ const auto* orc_batch = dynamic_cast<const
::orc::LongVectorBatch*>(&batch);
+ if (orc_batch == nullptr) {
+ return Status::InternalError("Unexpected ORC date batch type {}",
batch.toString());
+ }
+ auto& data = assert_cast<ColumnDateV2&>(*nested_column).get_data();
+ const size_t old_data_size = data.size();
+ data.resize(old_data_size + rows);
+ auto& date_dict = date_day_offset_dict::get();
+ for (size_t row = 0; row < rows; ++row) {
// Null rows are filled with a default-constructed DateV2 value.
+ if (is_null_at(batch, row)) {
+ data[old_data_size + row] = DateV2Value<DateV2ValueType> {};
+ continue;
+ }
// The batch value is narrowed to int with cast_set and used as an index into
// date_dict (presumably a day-offset -> DateV2 lookup table -- TODO confirm).
// NOTE(review): no range check on the index is visible in this function.
+ data[old_data_size + row] =
date_dict[cast_set<int>(orc_batch->data[row])];
+ }
+ return Status::OK();
+}
+
// Appends `rows` timestamp values from an ORC batch to `nested_column`.
//
// The type switch earlier in this file routes both TIMESTAMP and
// TIMESTAMP_INSTANT here. `batch` must be an ::orc::TimestampVectorBatch
// (InternalError is returned otherwise) and `nested_column` is assert_cast to
// ColumnDateTimeV2. The column's data is grown by `rows` before decoding.
+Status OrcReader::_decode_timestamp_column(const ::orc::ColumnVectorBatch&
batch,
+ MutableColumnPtr& nested_column,
size_t rows) const {
+ const auto* orc_batch = dynamic_cast<const
::orc::TimestampVectorBatch*>(&batch);
+ if (orc_batch == nullptr) {
+ return Status::InternalError("Unexpected ORC timestamp batch type {}",
batch.toString());
+ }
+ auto& data = assert_cast<ColumnDateTimeV2&>(*nested_column).get_data();
+ const size_t old_data_size = data.size();
+ data.resize(old_data_size + rows);
+ for (size_t row = 0; row < rows; ++row) {
// Null rows are filled with a default-constructed DateTimeV2 value.
+ if (is_null_at(batch, row)) {
+ data[old_data_size + row] = DateV2Value<DateTimeV2ValueType> {};
+ continue;
+ }
// The freshly resized slot is viewed as a DateV2Value and populated in place.
+ auto& value =
+
reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(data[old_data_size + row]);
// Seconds are always converted with the UTC zone, for both routed type kinds.
// NOTE(review): confirm this is the intended handling for session time zones.
+ value.from_unixtime(orc_batch->data[row], cctz::utc_time_zone());
// Nanoseconds are divided by 1000 to get microseconds; any finer precision is
// presumably dropped by the division -- TODO confirm the field is integral.
+ value.set_microsecond(cast_set<uint64_t>(orc_batch->nanoseconds[row] /
1000));
+ }
+ return Status::OK();
+}
+
// Appends `rows` DECIMAL values from an ORC batch to `nested_column`.
//
// `nested_column` is assert_cast to ColumnDecimal128V3. The batch may be either
// an ::orc::Decimal64VectorBatch or an ::orc::Decimal128VectorBatch; any other
// batch type yields InternalError. Null rows are stored as Decimal128V3(0).
//
// NOTE(review): the column is resized before the batch type is validated, so on
// the InternalError path the column has already grown by `rows` entries.
+Status OrcReader::_decode_decimal_column(const ::orc::ColumnVectorBatch& batch,
+ MutableColumnPtr& nested_column,
size_t rows) const {
+ auto& data = assert_cast<ColumnDecimal128V3&>(*nested_column).get_data();
+ const size_t old_data_size = data.size();
+ data.resize(old_data_size + rows);
// Decimal64 batches are tried first; their values are passed directly to the
// Decimal128V3 constructor.
+ if (const auto* decimal64_batch = dynamic_cast<const
::orc::Decimal64VectorBatch*>(&batch);
+ decimal64_batch != nullptr) {
+ for (size_t row = 0; row < rows; ++row) {
+ data[old_data_size + row] = is_null_at(batch, row)
+ ? Decimal128V3(0)
+ :
Decimal128V3(decimal64_batch->values[row]);
+ }
+ return Status::OK();
+ }
// Otherwise the batch must be a Decimal128 batch; each value goes through
// to_int128() before being wrapped in Decimal128V3.
+ const auto* decimal128_batch = dynamic_cast<const
::orc::Decimal128VectorBatch*>(&batch);
+ if (decimal128_batch == nullptr) {
+ return Status::InternalError("Unexpected ORC decimal batch type {}",
batch.toString());
+ }
+ for (size_t row = 0; row < rows; ++row) {
+ data[old_data_size + row] =
+ is_null_at(batch, row) ? Decimal128V3(0)
+ :
Decimal128V3(to_int128(decimal128_batch->values[row]));
+ }
+ return Status::OK();
+}
+
// Appends `rows` LIST values from an ORC batch to `nested_column`.
//
// `batch` must be an ::orc::ListVectorBatch (InternalError is returned
// otherwise) and `nested_column` is assert_cast to ColumnArray. Both
// `file_type` and `selected_type` are DORIS_CHECKed to have exactly one
// subtype, which describes the list elements.
//
// NOTE(review): is_null_at() is not consulted for the list rows in this
// function; null handling is presumably done by the caller -- TODO confirm.
+Status OrcReader::_decode_list_column(const ::orc::Type& file_type,
+ const ::orc::Type& selected_type,
+ const ::orc::ColumnVectorBatch& batch,
+ MutableColumnPtr& nested_column, size_t
rows) const {
+ const auto* orc_list = dynamic_cast<const ::orc::ListVectorBatch*>(&batch);
+ if (orc_list == nullptr) {
+ return Status::InternalError("Unexpected ORC list batch type {}",
batch.toString());
+ }
+ DORIS_CHECK(file_type.getSubtypeCount() == 1);
+ DORIS_CHECK(selected_type.getSubtypeCount() == 1);
+ DORIS_CHECK(orc_list->elements != nullptr);
+ const auto* file_element_type = file_type.getSubtype(0);
+ const auto* selected_element_type = selected_type.getSubtype(0);
+ DORIS_CHECK(file_element_type != nullptr);
+ DORIS_CHECK(selected_element_type != nullptr);
+
+ auto& array_column = assert_cast<ColumnArray&>(*nested_column);
// append_orc_offsets() extends the array offsets from the ORC offsets and
// reports through element_size how many child elements to decode (presumed
// from its use as the row count of the nested decode -- verify in the helper).
+ size_t element_size = 0;
+ RETURN_IF_ERROR(
+ append_orc_offsets(array_column.get_offsets(), orc_list->offsets,
rows, &element_size));
// The nested data column is taken as mutable, decoded recursively with the
// element types, and then stored back into the array column.
+ auto element_column = array_column.get_data_ptr()->assume_mutable();
+ RETURN_IF_ERROR(_decode_column(*file_element_type, *selected_element_type,
*orc_list->elements,
+ element_column, element_size));
+ array_column.get_data_ptr() = std::move(element_column);
+ return Status::OK();
+}
+
// Appends `rows` MAP values from an ORC batch to `nested_column`.
//
// `batch` must be an ::orc::MapVectorBatch (InternalError is returned
// otherwise) and `nested_column` is assert_cast to ColumnMap. Both `file_type`
// and `selected_type` are DORIS_CHECKed to have exactly two subtypes: subtype 0
// is used for the keys and subtype 1 for the values.
//
// NOTE(review): is_null_at() is not consulted for the map rows in this
// function; null handling is presumably done by the caller -- TODO confirm.
+Status OrcReader::_decode_map_column(const ::orc::Type& file_type, const
::orc::Type& selected_type,
+ const ::orc::ColumnVectorBatch& batch,
+ MutableColumnPtr& nested_column, size_t
rows) const {
+ const auto* orc_map = dynamic_cast<const ::orc::MapVectorBatch*>(&batch);
+ if (orc_map == nullptr) {
+ return Status::InternalError("Unexpected ORC map batch type {}",
batch.toString());
+ }
+ DORIS_CHECK(file_type.getSubtypeCount() == 2);
+ DORIS_CHECK(selected_type.getSubtypeCount() == 2);
+ DORIS_CHECK(orc_map->keys != nullptr);
+ DORIS_CHECK(orc_map->elements != nullptr);
+ auto& map_column = assert_cast<ColumnMap&>(*nested_column);
// The same element_size reported by append_orc_offsets() is used as the row
// count for both the key decode and the value decode below.
+ size_t element_size = 0;
+ RETURN_IF_ERROR(
+ append_orc_offsets(map_column.get_offsets(), orc_map->offsets,
rows, &element_size));
+
// Keys: take the nested column as mutable, decode recursively, store it back.
+ auto key_column = map_column.get_keys_ptr()->assume_mutable();
+ const auto* file_key_type = file_type.getSubtype(0);
+ const auto* selected_key_type = selected_type.getSubtype(0);
+ DORIS_CHECK(file_key_type != nullptr);
+ DORIS_CHECK(selected_key_type != nullptr);
+ RETURN_IF_ERROR(_decode_column(*file_key_type, *selected_key_type,
*orc_map->keys, key_column,
+ element_size));
+ map_column.get_keys_ptr() = std::move(key_column);
// Values: same procedure, reading from the batch's `elements` member.
+ auto value_column = map_column.get_values_ptr()->assume_mutable();
+ const auto* file_value_type = file_type.getSubtype(1);
+ const auto* selected_value_type = selected_type.getSubtype(1);
+ DORIS_CHECK(file_value_type != nullptr);
+ DORIS_CHECK(selected_value_type != nullptr);
+ RETURN_IF_ERROR(_decode_column(*file_value_type, *selected_value_type,
*orc_map->elements,
+ value_column, element_size));
+ map_column.get_values_ptr() = std::move(value_column);
+ return Status::OK();
+}
+
// Appends `rows` STRUCT values from an ORC batch to `nested_column`.
//
// `batch` must be an ::orc::StructVectorBatch (InternalError is returned
// otherwise) and `nested_column` is assert_cast to ColumnStruct. The number of
// selected subtypes is DORIS_CHECKed against both the number of batch fields
// and the struct column's tuple size, so selected index i addresses the i-th
// batch field and the i-th struct sub-column.
+Status OrcReader::_decode_struct_column(const ::orc::Type& file_type,
+ const ::orc::Type& selected_type,
+ const ::orc::ColumnVectorBatch& batch,
+ MutableColumnPtr& nested_column,
size_t rows) const {
+ const auto* orc_struct = dynamic_cast<const
::orc::StructVectorBatch*>(&batch);
+ if (orc_struct == nullptr) {
+ return Status::InternalError("Unexpected ORC struct batch type {}",
batch.toString());
+ }
+ DORIS_CHECK(selected_type.getSubtypeCount() == orc_struct->fields.size());
+ auto& struct_column = assert_cast<ColumnStruct&>(*nested_column);
+ DORIS_CHECK(struct_column.tuple_size() == selected_type.getSubtypeCount());
+
+ for (uint64_t selected_idx = 0; selected_idx <
selected_type.getSubtypeCount();
+ ++selected_idx) {
// The file-side child type is looked up by field name rather than by position;
// a negative result from find_struct_child_index() is reported as an error.
+ const auto field_name = selected_type.getFieldName(selected_idx);
+ const auto file_child_idx = find_struct_child_index(file_type,
field_name);
+ if (file_child_idx < 0) {
+ return Status::InternalError("Selected ORC field {} is not in file
struct", field_name);
+ }
+ const auto* file_child_type =
file_type.getSubtype(static_cast<uint64_t>(file_child_idx));
+ const auto* selected_child_type =
selected_type.getSubtype(selected_idx);
+ DORIS_CHECK(file_child_type != nullptr);
+ DORIS_CHECK(selected_child_type != nullptr);
+ DORIS_CHECK(selected_idx < orc_struct->fields.size());
// Each child is decoded with the same `rows` count as the parent struct, then
// stored back into the struct column at the selected index.
+ auto child_column =
+
struct_column.get_column_ptr(static_cast<size_t>(selected_idx))->assume_mutable();
+ RETURN_IF_ERROR(_decode_column(*file_child_type, *selected_child_type,
+ *orc_struct->fields[selected_idx],
child_column, rows));
+ struct_column.get_column_ptr(static_cast<size_t>(selected_idx)) =
std::move(child_column);
+ }
+ return Status::OK();
+}
+
+Status OrcReader::get_block(Block* file_block, size_t* rows, bool* eof) {
+ if (_state == nullptr || !_state->row_reader_created || _state->batch ==
nullptr) {
+ return Status::Uninitialized("OrcReader is not open");
+ }
+ *rows = 0;
+ if (_eof) {
+ *eof = true;
+ return Status::OK();
+ }
+
+ bool has_next = false;
+ try {
+ has_next = _state->row_reader->next(*_state->batch);
+ } catch (const std::exception& e) {
+ return Status::InternalError("Failed to read ORC batch: {}", e.what());
+ }
+ if (!has_next) {
+ _eof = true;
+ *eof = true;
+ return Status::OK();
+ }
+
+ const auto batch_rows = static_cast<size_t>(_state->batch->numElements);
+ auto* struct_batch =
dynamic_cast<::orc::StructVectorBatch*>(_state->batch.get());
+ if (struct_batch == nullptr) {
+ return Status::InternalError("New ORC reader expects struct row
batch");
+ }
+
+ for (size_t idx = 0; idx < _state->read_columns.size(); ++idx) {
+ const auto file_column_id = _state->read_columns[idx];
+ const auto position_it =
_request->column_positions.find(file_column_id);
+ DORIS_CHECK(position_it != _request->column_positions.end());
+ const auto block_position = position_it->second;
+ DORIS_CHECK(block_position < file_block->columns());
+ const auto* type =
_state->root_type->getSubtype(static_cast<uint64_t>(file_column_id));
+ DORIS_CHECK(type != nullptr);
+ const auto batch_index_it =
_state->column_to_selected_batch_index.find(file_column_id);
+ DORIS_CHECK(batch_index_it !=
_state->column_to_selected_batch_index.end());
+ const size_t selected_batch_idx = batch_index_it->second;
+ DORIS_CHECK(selected_batch_idx < struct_batch->fields.size());
+ const auto* selected_type =
_state->selected_type->getSubtype(selected_batch_idx);
+ DORIS_CHECK(selected_type != nullptr);
+ auto column =
file_block->get_by_position(block_position).column->assume_mutable();
+ RETURN_IF_ERROR(_decode_column(*type, *selected_type,
+
*struct_batch->fields[selected_batch_idx], column,
+ batch_rows));
+ file_block->replace_by_position(block_position, std::move(column));
+ }
Review Comment:
`get_block()` appends decoded values into the existing `Block` columns but
never clears/resets them at the beginning of the call (and also returns early on
EOF without clearing). If callers reuse the same `Block` instance across calls
(which is common), this can accumulate rows across batches and can also leave
stale rows in the `Block` when `rows=0`/`eof=true` (e.g., tests that assert
`block.rows() == 0` on EOF). A concrete fix is to clear/resize the target
columns (or the whole block's column data) at the start of `get_block()`, and
ensure columns are empty when returning `rows==0`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]