This is an automated email from the ASF dual-hosted git repository.
xuyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0732eb54bc [feature](struct-type) support csv format stream load for struct type (#17143)
0732eb54bc is described below
commit 0732eb54bc48c2375877ff13d7c5a926bb47c27a
Author: xy720 <[email protected]>
AuthorDate: Wed Mar 1 15:48:48 2023 +0800
[feature](struct-type) support csv format stream load for struct type (#17143)
Refactor the from_string method in data_type_struct.cpp to support CSV
format stream load for the struct type.
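
For illustration only (a sketch, not part of the original commit message):
assuming a table with a column s STRUCT<f1:INT, f2:VARCHAR(10)> and a stream
load using column_separator '|', the refactored parser accepts rows such as:

    1|{"f1":1, "f2":"doris"}      (quoted, named fields)
    2|{f1:1, f2:doris}            (unquoted, named fields)
    3|{1, "doris"}                (positional fields, in schema order)
    4|\N                          (NULL struct)

A file like this can be loaded with a standard stream load request, e.g.:

    curl --location-trusted -u root: -H "column_separator:|" \
        -T data.csv http://<fe_host>:8030/api/<db>/<table>/_stream_load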
---
be/src/vec/data_types/data_type_map.cpp | 4 +-
be/src/vec/data_types/data_type_struct.cpp | 188 +++++++++++++++++----
be/src/vec/exprs/vexpr.cpp | 7 +
.../data/load_p0/stream_load/struct_malformat.csv | 5 +
.../data/load_p0/stream_load/struct_normal.csv | 13 ++
.../data/load_p0/stream_load/test_stream_load.out | 22 +++
.../load_p0/stream_load/test_stream_load.groovy | 104 ++++++++++++
7 files changed, 305 insertions(+), 38 deletions(-)
diff --git a/be/src/vec/data_types/data_type_map.cpp b/be/src/vec/data_types/data_type_map.cpp
index 4895a48c4c..5c362f5b90 100644
--- a/be/src/vec/data_types/data_type_map.cpp
+++ b/be/src/vec/data_types/data_type_map.cpp
@@ -156,11 +156,11 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const {
auto* map_column = assert_cast<ColumnMap*>(column);
if (*rb.position() != '{') {
- return Status::InvalidArgument("map does not start with '{' character,
found '{}'",
+ return Status::InvalidArgument("map does not start with '{}'
character, found '{}'", "{",
*rb.position());
}
if (*(rb.end() - 1) != '}') {
- return Status::InvalidArgument("map does not end with '}' character,
found '{}'",
+ return Status::InvalidArgument("map does not end with '{}' character,
found '{}'", "}",
*(rb.end() - 1));
}
diff --git a/be/src/vec/data_types/data_type_struct.cpp b/be/src/vec/data_types/data_type_struct.cpp
index 886ac5342c..66954687fe 100644
--- a/be/src/vec/data_types/data_type_struct.cpp
+++ b/be/src/vec/data_types/data_type_struct.cpp
@@ -55,8 +55,6 @@ DataTypeStruct::DataTypeStruct(const DataTypes& elems_, const Strings& names_)
}
Status st = check_tuple_names(names);
- //if (!st.ok()) {
- //}
}
std::string DataTypeStruct::do_get_name() const {
@@ -68,11 +66,6 @@ std::string DataTypeStruct::do_get_name() const {
if (i != 0) {
s << ", ";
}
-
- // if (have_explicit_names) {
- // s << back_quote_if_need(names[i]) << ' ';
- // }
-
s << elems[i]->get_name();
}
s << ")";
@@ -80,16 +73,84 @@ std::string DataTypeStruct::do_get_name() const {
return s.str();
}
+bool next_slot_from_string(ReadBuffer& rb, StringRef& output, bool& is_name, bool& has_quota) {
+ StringRef element(rb.position(), 0);
+ has_quota = false;
+ is_name = false;
+ if (rb.eof()) {
+ return false;
+ }
+
+ // ltrim
+ while (!rb.eof() && isspace(*rb.position())) {
+ ++rb.position();
+ element.data = rb.position();
+ }
+
+ // parse string
+ if (*rb.position() == '"' || *rb.position() == '\'') {
+ const char str_sep = *rb.position();
+ size_t str_len = 1;
+ // search until next '"' or '\''
+ while (str_len < rb.count() && *(rb.position() + str_len) != str_sep) {
+ ++str_len;
+ }
+ // invalid string
+ if (str_len >= rb.count()) {
+ rb.position() = rb.end();
+ return false;
+ }
+ has_quota = true;
+ rb.position() += str_len + 1;
+ element.size += str_len + 1;
+ }
+
+ // parse element until separator ':' or ',' or end '}'
+    while (!rb.eof() && (*rb.position() != ':') && (*rb.position() != ',') &&
+           (rb.count() != 1 || *rb.position() != '}')) {
+ if (has_quota && !isspace(*rb.position())) {
+ return false;
+ }
+ ++rb.position();
+ ++element.size;
+ }
+ // invalid element
+ if (rb.eof()) {
+ return false;
+ }
+
+ if (*rb.position() == ':') {
+ is_name = true;
+ }
+
+ // adjust read buffer position to first char of next element
+ ++rb.position();
+
+ // rtrim
+ while (element.size > 0 && isspace(element.data[element.size - 1])) {
+ --element.size;
+ }
+
+ // trim '"' and '\'' for string
+    if (element.size >= 2 && (element.data[0] == '"' || element.data[0] == '\'') &&
+        element.data[0] == element.data[element.size - 1]) {
+ ++element.data;
+ element.size -= 2;
+ }
+ output = element;
+ return true;
+}
+
Status DataTypeStruct::from_string(ReadBuffer& rb, IColumn* column) const {
DCHECK(!rb.eof());
auto* struct_column = assert_cast<ColumnStruct*>(column);
if (*rb.position() != '{') {
- return Status::InvalidArgument("Struct does not start with '{'
character, found '{}'",
+ return Status::InvalidArgument("Struct does not start with '{}'
character, found '{}'", "{",
*rb.position());
}
- if (rb.count() < 2 || *(rb.end() - 1) != '}') {
- return Status::InvalidArgument("Struct does not end with '}'
character, found '{}'",
+ if (*(rb.end() - 1) != '}') {
+ return Status::InvalidArgument("Struct does not end with '{}'
character, found '{}'", "}",
*(rb.end() - 1));
}
@@ -99,43 +160,98 @@ Status DataTypeStruct::from_string(ReadBuffer& rb, IColumn* column) const {
}
++rb.position();
+
+ bool is_explicit_names = false;
+ std::vector<std::string> field_names;
std::vector<ReadBuffer> field_rbs;
- field_rbs.reserve(elems.size());
+ std::vector<size_t> field_pos;
- // here get the value "jack" and 20 from {"name":"jack","age":20}
while (!rb.eof()) {
- size_t field_len = 0;
- auto start = rb.position();
- while (!rb.eof() && *start != ',' && *start != '}') {
- field_len++;
- start++;
+ StringRef slot(rb.position(), rb.count());
+ bool has_quota = false;
+ bool is_name = false;
+ if (!next_slot_from_string(rb, slot, is_name, has_quota)) {
+ return Status::InvalidArgument("Cannot read struct field from text
'{}'",
+ slot.to_string());
}
- if (field_len >= rb.count()) {
- return Status::InvalidArgument("Invalid Length");
- }
- ReadBuffer field_rb(rb.position(), field_len);
-
- size_t len = 0;
- auto start_rb = field_rb.position();
- while (!field_rb.eof() && *start_rb != ':') {
- len++;
- start_rb++;
- }
-        ReadBuffer field(field_rb.position() + len + 1, field_rb.count() - len - 1);
+ if (is_name) {
+ std::string name = slot.to_string();
+ if (!next_slot_from_string(rb, slot, is_name, has_quota)) {
+ return Status::InvalidArgument("Cannot read struct field from
text '{}'",
+ slot.to_string());
+ }
+ ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size);
+ field_names.push_back(name);
+ field_rbs.push_back(field_rb);
-        if (field.count() >= 2 && ((*field.position() == '"' && *(field.end() - 1) == '"') ||
-                                   (*field.position() == '\'' && *(field.end() - 1) == '\''))) {
- ReadBuffer field_no_quote(field.position() + 1, field.count() - 2);
- field_rbs.push_back(field_no_quote);
+ if (!is_explicit_names) {
+ is_explicit_names = true;
+ }
} else {
- field_rbs.push_back(field);
+ ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size);
+ field_rbs.push_back(field_rb);
}
+ }
- rb.position() += field_len + 1;
+    // TODO: should we support insert default field value when actual field number is less than
+ // schema field number?
+ if (field_rbs.size() != elems.size()) {
+        std::string cmp_str = field_rbs.size() > elems.size() ? "more" : "less";
+ return Status::InvalidArgument(
+ "Actual struct field number {} is {} than schema field number
{}.",
+ field_rbs.size(), cmp_str, elems.size());
+ }
+
+ if (is_explicit_names) {
+ if (field_names.size() != field_rbs.size()) {
+ return Status::InvalidArgument(
+ "Struct field name number {} is not equal to field number
{}.",
+ field_names.size(), field_rbs.size());
+ }
+ std::unordered_set<std::string> name_set;
+ for (size_t i = 0; i < field_names.size(); i++) {
+ // check duplicate fields
+ auto ret = name_set.insert(field_names[i]);
+ if (!ret.second) {
+ return Status::InvalidArgument("Struct field name {} is
duplicate with others.",
+ field_names[i]);
+ }
+ // check name valid
+ auto idx = try_get_position_by_name(field_names[i]);
+ if (idx == std::nullopt) {
+ return Status::InvalidArgument("Cannot find struct field name
{} in schema.",
+ field_names[i]);
+ }
+ field_pos.push_back(idx.value());
+ }
+ } else {
+ for (size_t i = 0; i < field_rbs.size(); i++) {
+ field_pos.push_back(i);
+ }
}
for (size_t idx = 0; idx < elems.size(); idx++) {
-        elems[idx]->from_string(field_rbs[idx], &struct_column->get_column(idx));
+ auto field_rb = field_rbs[field_pos[idx]];
+ // handle empty element
+ if (field_rb.count() == 0) {
+ struct_column->get_column(idx).insert_default();
+ continue;
+ }
+ // handle null element
+        if (field_rb.count() == 4 && strncmp(field_rb.position(), "null", 4) == 0) {
+ auto& nested_null_col =
+                    reinterpret_cast<ColumnNullable&>(struct_column->get_column(idx));
+ nested_null_col.insert_null_elements(1);
+ continue;
+ }
+        auto st = elems[idx]->from_string(field_rb, &struct_column->get_column(idx));
+ if (!st.ok()) {
+ // we should do column revert if error
+ for (size_t j = 0; j < idx; j++) {
+ struct_column->get_column(j).pop_back(1);
+ }
+ return st;
+ }
}
return Status::OK();
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index f90993884d..4e71afbf31 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -381,6 +381,13 @@ FunctionContext::TypeDesc VExpr::column_type_to_type_desc(const TypeDescriptor&
out.children.push_back(VExpr::column_type_to_type_desc(t));
}
break;
+ case TYPE_STRUCT:
+ CHECK(type.children.size() >= 1);
+ out.type = FunctionContext::TYPE_STRUCT;
+ for (const auto& t : type.children) {
+ out.children.push_back(VExpr::column_type_to_type_desc(t));
+ }
+ break;
case TYPE_STRING:
out.type = FunctionContext::TYPE_STRING;
out.len = type.len;
diff --git a/regression-test/data/load_p0/stream_load/struct_malformat.csv b/regression-test/data/load_p0/stream_load/struct_malformat.csv
new file mode 100644
index 0000000000..8af8629e9e
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/struct_malformat.csv
@@ -0,0 +1,5 @@
+1|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26",
"f7":"2023-02-26 17:58", "f8":1.01, "f9":3.1415926, "f10":1.1}
+2|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26",
"f7":null, "f8":null, "f9":null, "f10":1.1}
+3|\N
+4|"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26",
"f7":null, "f8":null, "f9":null, "f10":1.1
+5|{f1:1, f2:100, f3:100000, f4:'a', f5:"doris", f6:"2023-02-26",
f7:"2023-02-26 17:58", f8:1.01, f9:3.1415926, f10:1.1
diff --git a/regression-test/data/load_p0/stream_load/struct_normal.csv b/regression-test/data/load_p0/stream_load/struct_normal.csv
new file mode 100644
index 0000000000..fe82889afd
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/struct_normal.csv
@@ -0,0 +1,13 @@
+1|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26",
"f7":"2023-02-26 17:58", "f8":1.01, "f9":3.1415926, "f10":1.1}
+2|{"f1":1, "f2":100, "f3":100000, "f4":'a', "f5":"doris", "f6":"2023-02-26",
"f7":null, "f8":null, "f9":null, "f10":1.1}
+3|{'f1':1, 'f2':100, 'f3':100000, 'f4':'a', 'f5':"doris", 'f6':"2023-02-26",
'f7':null, 'f8':null, 'f9':null, 'f10':1.1}
+4|{f1:1, f2:100, f3:100000, f4:'a', f5:"doris", f6:"2023-02-26",
f7:"2023-02-26 17:58", f8:1.01, f9:3.1415926, f10:1.1}
+5|{f1: 1, f2: 100, f3: 100000, f4: a, f5: doris, f6: 2023-02-26, f7:
"2023-02-26 17:58", f8: 1.01, f9: 3.1415926, f10: 1.1}
+6|{"f10":1.1, "f9":3.1415926, "f8":1.01, "f7":"2023-02-26 17:58",
"f6":"2023-02-26", "f5":"doris", "f4":'a', "f3":100000, "f2":100, "f1":1}
+7|{f10:1.1, f9:3.1415926, f8:1.01, f7:"2023-02-26 17:58", f6:2023-02-26,
f5:doris, f4:a, f3:100000, f2:100, f1:1}
+8|{f10:1.1, f9:3.1415926, f8:1.01, f7:"2023-02-26 17:58", f6:2023-02-26,
f5:doris, f4:null, f3:null, f2:null, f1:1}
+9|{"f1":null, "f2":null, "f3":null, "f4":null, "f5":null, "f6":null,
"f7":null, "f8":null, "f9":null, "f10":null}
+10|{1, 100, 100000, 'a', "doris", "2023-02-26", "2023-02-26 17:58", 1.01,
3.1415926, 1.1}
+11|{1, 100, 100000, 'a', "doris", "2023-02-26", null, null, null, 1.1}
+12|{null, null, null, null, null, null, null, null, null, null}
+13|\N
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load.out b/regression-test/data/load_p0/stream_load/test_stream_load.out
index 72bc763d81..3e4ccd3fc5 100644
--- a/regression-test/data/load_p0/stream_load/test_stream_load.out
+++ b/regression-test/data/load_p0/stream_load/test_stream_load.out
@@ -69,6 +69,28 @@
7 [1, 2, 3, 4, 5] \N \N \N \N \N \N \N \N \N
8 [1, 2, 3, 4, 5] \N \N \N \N \N [NULL] \N [NULL] \N
+-- !all111 --
+1 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
+2 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
+3 \N
+4 \N
+5 \N
+
+-- !all112 --
+1 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
+2 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
+3 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
+4 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
+5 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
+6 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
+7 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
+8 {1, NULL, NULL, NULL, 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
+9 {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+10 {1, 100, 100000, 'a', 'doris', 2023-02-26, 2023-02-26 17:58:00, 1.01, 3.1415926, 1.1}
+11 {1, 100, 100000, 'a', 'doris', 2023-02-26, NULL, NULL, NULL, 1.1}
+12 {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
+13 \N
+
-- !sql1 --
-2 -50 1 \N 44
2 -51 1 2 \N
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index a04d25622e..351bab3afd 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -186,12 +186,14 @@ suite("test_stream_load", "p0") {
def tableName6 = "test_unique_key"
def tableName7 = "test_unique_key_with_delete"
def tableName8 = "test_array"
+ def tableName10 = "test_struct"
sql """ DROP TABLE IF EXISTS ${tableName3} """
sql """ DROP TABLE IF EXISTS ${tableName4} """
sql """ DROP TABLE IF EXISTS ${tableName5} """
sql """ DROP TABLE IF EXISTS ${tableName6} """
sql """ DROP TABLE IF EXISTS ${tableName7} """
sql """ DROP TABLE IF EXISTS ${tableName8} """
+ sql """ DROP TABLE IF EXISTS ${tableName10} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName3} (
`k1` int(11) NULL,
@@ -287,6 +289,28 @@ suite("test_stream_load", "p0") {
"replication_allocation" = "tag.location.default: 1"
);
"""
+ sql """ADMIN SET FRONTEND CONFIG ('enable_struct_type' = 'true');"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName10} (
+ `k1` INT(11) NULL COMMENT "",
+ `k2` STRUCT<
+ f1:SMALLINT,
+ f2:INT(11),
+ f3:BIGINT,
+ f4:CHAR,
+ f5:VARCHAR(20),
+ f6:DATE,
+ f7:DATETIME,
+ f8:FLOAT,
+ f9:DOUBLE,
+ f10:DECIMAL(20, 6)> NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
// load all columns
streamLoad {
@@ -673,6 +697,86 @@ suite("test_stream_load", "p0") {
}
sql "sync"
+ // ===== test struct stream load
+ // malformat without strictmode
+ streamLoad {
+ table "${tableName10}"
+
+ set 'column_separator', '|'
+
+ file 'struct_malformat.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(5, json.NumberTotalRows)
+ assertEquals(5, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ qt_all111 "SELECT * from ${tableName10} order by k1" // 5
+ sql """truncate table ${tableName10}"""
+ sql """sync"""
+
+ // malformat with strictmode
+ streamLoad {
+ table "${tableName10}"
+
+ set 'column_separator', '|'
+ set 'strict_mode', 'true'
+
+ file 'struct_malformat.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertEquals(5, json.NumberTotalRows)
+ assertEquals(3, json.NumberLoadedRows)
+ assertEquals(2, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+
+ // normal load
+ streamLoad {
+ table "${tableName10}"
+
+ set 'column_separator', '|'
+
+ file 'struct_normal.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(13, json.NumberTotalRows)
+ assertEquals(13, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+    qt_all112 "SELECT * from ${tableName10} order by k1" // 13
+ sql """truncate table ${tableName10}"""
+ sql """sync"""
+
// test immutable partition success
def tableName9 = "test_immutable_partition"
sql """ DROP TABLE IF EXISTS ${tableName9} """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]