This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c2ff940947 [refactor](parquet)change decimal type export as
fixed-len-byte on parquet write (#22792)
c2ff940947 is described below
commit c2ff940947d27e14dcfaad915b39a982512068e9
Author: zhangstar333 <[email protected]>
AuthorDate: Tue Aug 15 13:17:50 2023 +0800
[refactor](parquet)change decimal type export as fixed-len-byte on parquet
write (#22792)
Previously, the parquet writer exported decimal values as byte-array (binary),
but those fields could not be imported into Hive.
Now, decimals are exported as fixed-len-byte-array so the output can be
imported into Hive directly.
---
be/src/vec/runtime/vparquet_writer.cpp | 167 ++++++++++++++++-----
be/src/vec/runtime/vparquet_writer.h | 3 +-
.../org/apache/doris/analysis/OutFileClause.java | 38 +++--
3 files changed, 156 insertions(+), 52 deletions(-)
diff --git a/be/src/vec/runtime/vparquet_writer.cpp
b/be/src/vec/runtime/vparquet_writer.cpp
index 13fd6d8b85..230abce0de 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -34,6 +34,7 @@
#include <string>
#include "common/status.h"
+#include "gutil/endian.h"
#include "io/fs/file_writer.h"
#include "olap/olap_common.h"
#include "runtime/decimalv2_value.h"
@@ -180,8 +181,31 @@ void
ParquetBuildHelper::build_schema_data_type(parquet::Type::type& parquet_dat
void ParquetBuildHelper::build_schema_data_logical_type(
std::shared_ptr<const parquet::LogicalType>&
parquet_data_logical_type_ptr,
- const TParquetDataLogicalType::type& column_data_logical_type) {
+ const TParquetDataLogicalType::type& column_data_logical_type, int*
primitive_length,
+ const TypeDescriptor& type_desc) {
switch (column_data_logical_type) {
+ case TParquetDataLogicalType::DECIMAL: {
+ DCHECK(type_desc.precision != -1 && type_desc.scale != -1)
+ << "precision and scale: " << type_desc.precision << " " <<
type_desc.scale;
+ if (type_desc.type == TYPE_DECIMAL32) {
+ *primitive_length = 4;
+ } else if (type_desc.type == TYPE_DECIMAL64) {
+ *primitive_length = 8;
+ } else if (type_desc.type == TYPE_DECIMAL128I) {
+ *primitive_length = 16;
+ } else {
+ throw parquet::ParquetException(
+ "the logical decimal now only support in decimalv3, maybe
error of " +
+ type_desc.debug_string());
+ }
+ parquet_data_logical_type_ptr =
+ parquet::LogicalType::Decimal(type_desc.precision,
type_desc.scale);
+ break;
+ }
+ case TParquetDataLogicalType::STRING: {
+ parquet_data_logical_type_ptr = parquet::LogicalType::String();
+ break;
+ }
case TParquetDataLogicalType::DATE: {
parquet_data_logical_type_ptr = parquet::LogicalType::Date();
break;
@@ -290,19 +314,22 @@ Status VParquetWriterWrapper::parse_properties() {
Status VParquetWriterWrapper::parse_schema() {
parquet::schema::NodeVector fields;
parquet::Repetition::type parquet_repetition_type;
- parquet::Type::type parquet_data_type;
+ parquet::Type::type parquet_physical_type;
std::shared_ptr<const parquet::LogicalType> parquet_data_logical_type;
+ int primitive_length = -1;
for (int idx = 0; idx < _parquet_schemas.size(); ++idx) {
+ primitive_length = -1;
ParquetBuildHelper::build_schema_repetition_type(
parquet_repetition_type,
_parquet_schemas[idx].schema_repetition_type);
- ParquetBuildHelper::build_schema_data_type(parquet_data_type,
+ ParquetBuildHelper::build_schema_data_type(parquet_physical_type,
_parquet_schemas[idx].schema_data_type);
ParquetBuildHelper::build_schema_data_logical_type(
- parquet_data_logical_type,
_parquet_schemas[idx].schema_data_logical_type);
+ parquet_data_logical_type,
_parquet_schemas[idx].schema_data_logical_type,
+ &primitive_length, _output_vexpr_ctxs[idx]->root()->type());
try {
fields.push_back(parquet::schema::PrimitiveNode::Make(
_parquet_schemas[idx].schema_column_name,
parquet_repetition_type,
- parquet_data_logical_type, parquet_data_type));
+ parquet_data_logical_type, parquet_physical_type,
primitive_length));
} catch (const parquet::ParquetException& e) {
LOG(WARNING) << "parquet writer parse schema error: " << e.what();
return Status::InternalError("parquet writer parse schema error:
{}", e.what());
@@ -335,37 +362,6 @@ Status VParquetWriterWrapper::parse_schema() {
RETURN_WRONG_TYPE
\
}
-#define DISPATCH_PARQUET_DECIMAL_WRITER(DECIMAL_TYPE)
\
- parquet::RowGroupWriter* rgWriter = get_rg_writer();
\
- parquet::ByteArrayWriter* col_writer =
\
- static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
\
- parquet::ByteArray value;
\
- auto decimal_type =
\
-
check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get());
\
- DCHECK(decimal_type);
\
- if (null_map != nullptr) {
\
- auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data(); \
- for (size_t row_id = 0; row_id < sz; row_id++) {
\
- if (null_data[row_id] != 0) {
\
- single_def_level = 0;
\
- col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
\
- single_def_level = 1;
\
- } else {
\
- auto s = decimal_type->to_string(*col, row_id);
\
- value.ptr = reinterpret_cast<const uint8_t*>(s.data());
\
- value.len = s.size();
\
- col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
\
- }
\
- }
\
- } else {
\
- for (size_t row_id = 0; row_id < sz; row_id++) {
\
- auto s = decimal_type->to_string(*col, row_id);
\
- value.ptr = reinterpret_cast<const uint8_t*>(s.data());
\
- value.len = s.size();
\
- col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr,
nullptr, &value); \
- }
\
- }
-
#define DISPATCH_PARQUET_COMPLEX_WRITER(COLUMN_TYPE)
\
parquet::RowGroupWriter* rgWriter = get_rg_writer();
\
parquet::ByteArrayWriter* col_writer =
\
@@ -791,15 +787,108 @@ Status VParquetWriterWrapper::write(const Block& block) {
break;
}
case TYPE_DECIMAL32: {
- DISPATCH_PARQUET_DECIMAL_WRITER(Decimal32)
+ parquet::RowGroupWriter* rgWriter = get_rg_writer();
+ parquet::FixedLenByteArrayWriter* col_writer =
+
static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+ parquet::FixedLenByteArray value;
+ auto decimal_type =
check_and_get_data_type<DataTypeDecimal<Decimal32>>(
+ remove_nullable(type).get());
+ DCHECK(decimal_type);
+ if (null_map != nullptr) {
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data();
+ const auto& data_column = assert_cast<const
ColumnDecimal32&>(*col);
+ for (size_t row_id = 0; row_id < sz; row_id++) {
+ if (null_data[row_id] != 0) {
+ single_def_level = 0;
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ single_def_level = 1;
+ } else {
+ auto data = data_column.get_element(row_id);
+ auto big_endian = bswap_32(data);
+ value.ptr = reinterpret_cast<const
uint8_t*>(&big_endian);
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ }
+ }
+ } else {
+ const auto& data_column = assert_cast<const
ColumnDecimal32&>(*col);
+ for (size_t row_id = 0; row_id < sz; row_id++) {
+ auto data = data_column.get_element(row_id);
+ auto big_endian = bswap_32(data);
+ value.ptr = reinterpret_cast<const
uint8_t*>(&big_endian);
+ col_writer->WriteBatch(1, nullable ? &single_def_level
: nullptr, nullptr,
+ &value);
+ }
+ }
break;
}
case TYPE_DECIMAL64: {
- DISPATCH_PARQUET_DECIMAL_WRITER(Decimal64)
+ parquet::RowGroupWriter* rgWriter = get_rg_writer();
+ parquet::FixedLenByteArrayWriter* col_writer =
+
static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+ parquet::FixedLenByteArray value;
+ auto decimal_type =
check_and_get_data_type<DataTypeDecimal<Decimal64>>(
+ remove_nullable(type).get());
+ DCHECK(decimal_type);
+ if (null_map != nullptr) {
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data();
+ const auto& data_column = assert_cast<const
ColumnDecimal64&>(*col);
+ for (size_t row_id = 0; row_id < sz; row_id++) {
+ if (null_data[row_id] != 0) {
+ single_def_level = 0;
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ single_def_level = 1;
+ } else {
+ auto data = data_column.get_element(row_id);
+ auto big_endian = bswap_64(data);
+ value.ptr = reinterpret_cast<const
uint8_t*>(&big_endian);
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ }
+ }
+ } else {
+ const auto& data_column = assert_cast<const
ColumnDecimal64&>(*col);
+ for (size_t row_id = 0; row_id < sz; row_id++) {
+ auto data = data_column.get_element(row_id);
+ auto big_endian = bswap_64(data);
+ value.ptr = reinterpret_cast<const
uint8_t*>(&big_endian);
+ col_writer->WriteBatch(1, nullable ? &single_def_level
: nullptr, nullptr,
+ &value);
+ }
+ }
break;
}
case TYPE_DECIMAL128I: {
- DISPATCH_PARQUET_DECIMAL_WRITER(Decimal128I)
+ parquet::RowGroupWriter* rgWriter = get_rg_writer();
+ parquet::FixedLenByteArrayWriter* col_writer =
+
static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+ parquet::FixedLenByteArray value;
+ auto decimal_type =
check_and_get_data_type<DataTypeDecimal<Decimal128I>>(
+ remove_nullable(type).get());
+ DCHECK(decimal_type);
+ if (null_map != nullptr) {
+ auto& null_data = assert_cast<const
ColumnUInt8&>(*null_map).get_data();
+ const auto& data_column = assert_cast<const
ColumnDecimal128I&>(*col);
+ for (size_t row_id = 0; row_id < sz; row_id++) {
+ if (null_data[row_id] != 0) {
+ single_def_level = 0;
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ single_def_level = 1;
+ } else {
+ auto data = data_column.get_element(row_id);
+ auto big_endian = gbswap_128(data);
+ value.ptr = reinterpret_cast<const
uint8_t*>(&big_endian);
+ col_writer->WriteBatch(1, &single_def_level,
nullptr, &value);
+ }
+ }
+ } else {
+ const auto& data_column = assert_cast<const
ColumnDecimal128I&>(*col);
+ for (size_t row_id = 0; row_id < sz; row_id++) {
+ auto data = data_column.get_element(row_id);
+ auto big_endian = gbswap_128(data);
+ value.ptr = reinterpret_cast<const
uint8_t*>(&big_endian);
+ col_writer->WriteBatch(1, nullable ? &single_def_level
: nullptr, nullptr,
+ &value);
+ }
+ }
break;
}
default: {
diff --git a/be/src/vec/runtime/vparquet_writer.h
b/be/src/vec/runtime/vparquet_writer.h
index 6e07aa0e44..a79ab6ebc4 100644
--- a/be/src/vec/runtime/vparquet_writer.h
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -86,7 +86,8 @@ public:
const TParquetVersion::type& parquet_version);
static void build_schema_data_logical_type(
std::shared_ptr<const parquet::LogicalType>&
parquet_data_logical_type_ptr,
- const TParquetDataLogicalType::type& column_data_logical_type);
+ const TParquetDataLogicalType::type& column_data_logical_type,
int* primitive_length,
+ const TypeDescriptor& type_desc);
};
class VFileWriterWrapper {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 85c40d79bc..24ed977a7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -99,6 +99,7 @@ public class OutFileClause {
PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array",
TParquetDataType.FIXED_LEN_BYTE_ARRAY);
+ PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("decimal",
TParquetDataLogicalType.DECIMAL);
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("date",
TParquetDataLogicalType.DATE);
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("datetime",
TParquetDataLogicalType.TIMESTAMP);
// TODO(ftw): add other logical type
@@ -145,7 +146,6 @@ public class OutFileClause {
private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB
private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L;
// 2GB
-
private String filePath;
private String format;
private Map<String, String> properties;
@@ -358,7 +358,7 @@ public class OutFileClause {
case STRING:
if
(!schema.second.equals(resultType.getPrimitiveType().toString().toLowerCase()))
{
throw new AnalysisException("project field type is " +
resultType.getPrimitiveType().toString()
- + ", should use " +
resultType.getPrimitiveType().toString() + ","
+ + ", should use " +
resultType.getPrimitiveType().toString() + ","
+ " but the type of column " + i + " is " +
schema.second);
}
break;
@@ -454,13 +454,20 @@ public class OutFileClause {
+ " but the definition type of column " + i +
" is " + type);
}
break;
- case CHAR:
- case VARCHAR:
- case STRING:
case DECIMAL32:
case DECIMAL64:
- case DECIMAL128:
+ case DECIMAL128: {
+ if
(!PARQUET_DATA_TYPE_MAP.get("fixed_len_byte_array").equals(type)) {
+ throw new AnalysisException("project field type is
DECIMAL"
+ + ", should use fixed_len_byte_array, but the
definition type of column "
+ + i + " is " + type);
+ }
+ break;
+ }
case DECIMALV2:
+ case CHAR:
+ case VARCHAR:
+ case STRING:
case DATETIMEV2:
case DATEV2:
case LARGEINT:
@@ -520,13 +527,16 @@ public class OutFileClause {
case DOUBLE:
parquetSchema.schema_data_type =
PARQUET_DATA_TYPE_MAP.get("double");
break;
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128: {
+ parquetSchema.schema_data_type =
PARQUET_DATA_TYPE_MAP.get("fixed_len_byte_array");
+ break;
+ }
+ case DECIMALV2:
case CHAR:
case VARCHAR:
case STRING:
- case DECIMALV2:
- case DECIMAL32:
- case DECIMAL64:
- case DECIMAL128:
case DATETIMEV2:
case DATEV2:
case LARGEINT:
@@ -545,6 +555,12 @@ public class OutFileClause {
}
switch (expr.getType().getPrimitiveType()) {
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128: {
+ parquetSchema.schema_data_logical_type =
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("decimal");
+ break;
+ }
case DATE:
parquetSchema.schema_data_logical_type =
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("date");
break;
@@ -884,5 +900,3 @@ public class OutFileClause {
return sinkOptions;
}
}
-
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]