This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0d9714b179 [Fix](multi catalog)Support read hive1.x orc file. (#16677) 0d9714b179 is described below commit 0d9714b1797f9741999898851f3c3e722d42da4b Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Tue Feb 14 14:32:27 2023 +0800 [Fix](multi catalog)Support read hive1.x orc file. (#16677) Hive 1.x may write orc files with internal column names (_col0, _col1, _col2...). This will cause the query result to be NULL because the column name in the orc file doesn't match the column name in the Doris table schema. This pr is to support querying Hive orc files with internal column names. For now, we haven't seen any problems with Parquet files; we will send a new pr to fix parquet if any problems show up in the future. --- be/src/vec/exec/format/orc/vorc_reader.cpp | 21 +++++++++++++++- be/src/vec/exec/format/orc/vorc_reader.h | 6 +++++ .../planner/external/ExternalFileScanNode.java | 28 ++++++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 2 ++ 4 files changed, 56 insertions(+), 1 deletion(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index e5e43906d1..cf1d308242 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -81,6 +81,7 @@ OrcReader::OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params _range_size(range.size), _ctz(ctz), _column_names(column_names), + _is_hive(params.__isset.slot_name_to_schema_pos), _io_ctx(io_ctx) { TimezoneUtils::find_cctz_time_zone(ctz, _time_zone); _init_profile(); @@ -94,6 +95,7 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r _scan_range(range), _ctz(ctz), _column_names(column_names), + _is_hive(params.__isset.slot_name_to_schema_pos), _file_system(nullptr), _io_ctx(io_ctx) {} @@ -193,7 +195,13 @@ Status OrcReader::init_reader( auto& selected_type = _row_reader->getSelectedType(); 
_col_orc_type.resize(selected_type.getSubtypeCount()); for (int i = 0; i < selected_type.getSubtypeCount(); ++i) { - _colname_to_idx[_get_field_name_lower_case(&selected_type, i)] = i; + auto name = _get_field_name_lower_case(&selected_type, i); + // For hive engine, translate the column name in orc file to schema column name. + // This is for Hive 1.x which use internal column name _col0, _col1... + if (_is_hive) { + name = _file_col_to_schema_col[name]; + } + _colname_to_idx[name] = i; _col_orc_type[i] = selected_type.getSubtype(i); } return Status::OK(); @@ -253,6 +261,12 @@ Status OrcReader::_init_read_columns() { orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&root_type, i)); } for (auto& col_name : _column_names) { + if (_is_hive) { + auto iter = _scan_params.slot_name_to_schema_pos.find(col_name); + DCHECK(iter != _scan_params.slot_name_to_schema_pos.end()); + int pos = iter->second; + orc_cols_lower_case[pos] = iter->first; + } auto iter = std::find(orc_cols_lower_case.begin(), orc_cols_lower_case.end(), col_name); if (iter == orc_cols_lower_case.end()) { _missing_cols.emplace_back(col_name); @@ -260,6 +274,11 @@ Status OrcReader::_init_read_columns() { int pos = std::distance(orc_cols_lower_case.begin(), iter); _read_cols.emplace_back(orc_cols[pos]); _read_cols_lower_case.emplace_back(col_name); + // For hive engine, store the orc column name to schema column name map. + // This is for Hive 1.x orc file with internal column name _col0, _col1... 
+ if (_is_hive) { + _file_col_to_schema_col[orc_cols[pos]] = col_name; + } } } return Status::OK(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 36e66b4295..2b07d48956 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -270,6 +270,12 @@ private: std::list<std::string> _read_cols_lower_case; std::list<std::string> _missing_cols; std::unordered_map<std::string, int> _colname_to_idx; + // Column name in Orc file to column name to schema. + // This is used for Hive 1.x which use internal column name in Orc file. + // _col0, _col1... + std::unordered_map<std::string, std::string> _file_col_to_schema_col; + // Flag for hive engine. True if the external table engine is Hive. + bool _is_hive = false; std::vector<const orc::Type*> _col_orc_type; ORCFileInputStream* _file_reader = nullptr; Statistics _statistics; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 745da3d63e..0ee81dd634 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -401,6 +401,12 @@ public class ExternalFileScanNode extends ExternalScanNode { createScanRangeLocations(context, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); this.totalFileSize += scanProvider.getInputFileSize(); + TableIf table = desc.getTable(); + if (table instanceof HMSExternalTable) { + if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + genSlotToSchemaIdMap(context); + } + } if (scanProvider instanceof HiveScanProvider) { this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); @@ -420,6 
+426,12 @@ public class ExternalFileScanNode extends ExternalScanNode { createScanRangeLocations(context, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); this.totalFileSize += scanProvider.getInputFileSize(); + TableIf table = desc.getTable(); + if (table instanceof HMSExternalTable) { + if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + genSlotToSchemaIdMap(context); + } + } if (scanProvider instanceof HiveScanProvider) { this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); @@ -634,6 +646,22 @@ public class ExternalFileScanNode extends ExternalScanNode { scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations); } + private void genSlotToSchemaIdMap(ParamCreateContext context) { + List<Column> baseSchema = desc.getTable().getBaseSchema(); + Map<String, Integer> columnNameToPosition = Maps.newHashMap(); + for (SlotDescriptor slot : desc.getSlots()) { + int idx = 0; + for (Column col : baseSchema) { + if (col.getName().equals(slot.getColumn().getName())) { + columnNameToPosition.put(col.getName(), idx); + break; + } + idx += 1; + } + } + context.params.setSlotNameToSchemaPos(columnNameToPosition); + } + @Override public int getNumInstances() { return scanRangeLocations.size(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 1eb2372977..6eb6a657d3 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -325,6 +325,8 @@ struct TFileScanRangeParams { 17: optional TTableFormatFileDesc table_format_params // For csv query task, same the column index in file, order by dest_tuple 18: optional list<i32> column_idxs + // Map of slot to its position in table schema. Only for Hive external table. 
+ 19: optional map<string, i32> slot_name_to_schema_pos } struct TFileRangeDesc { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org