This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new a6a84b8ecc3 [improvement](stream load)(cherry-pick) support hll_from_base64 for stream load column mapping (#36819) a6a84b8ecc3 is described below commit a6a84b8ecc324834f79f3a61e5e587366890a06e Author: gnehil <adamlee...@gmail.com> AuthorDate: Wed Jun 26 20:12:40 2024 +0800 [improvement](stream load)(cherry-pick) support hll_from_base64 for stream load column mapping (#36819) picked from https://github.com/apache/doris/pull/35923 --- .../java/org/apache/doris/catalog/FunctionSet.java | 1 + .../org/apache/doris/planner/FileLoadScanNode.java | 4 ++- .../data/load_p0/http_stream/test_http_stream.out | 12 +++++++ .../stream_load/test_stream_load_hll_type.csv | 10 ++++++ .../load_p0/stream_load/test_stream_load_new.out | 12 +++++++ .../load_p0/http_stream/test_http_stream.groovy | 41 +++++++++++++++++++++ .../stream_load/test_stream_load_new.groovy | 42 ++++++++++++++++++++++ 7 files changed, 121 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java index b0d4c654531..2db943993dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java @@ -178,6 +178,7 @@ public class FunctionSet<T> { public static final String HLL_UNION_AGG = "hll_union_agg"; public static final String HLL_RAW_AGG = "hll_raw_agg"; public static final String HLL_CARDINALITY = "hll_cardinality"; + public static final String HLL_FROM_BASE64 = "hll_from_base64"; public static final String TO_BITMAP = "to_bitmap"; public static final String TO_BITMAP_WITH_CHECK = "to_bitmap_with_check"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java index ca0324a51d0..0d674a70517 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java @@ -280,9 +280,11 @@ public class FileLoadScanNode extends FileScanNode { } FunctionCallExpr fn = (FunctionCallExpr) expr; if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName() - .getFunction().equalsIgnoreCase("hll_empty")) { + .getFunction().equalsIgnoreCase("hll_empty") + && !fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) { throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like " + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or " + + destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_FROM_BASE64 + "(xxx) or " + destSlotDesc.getColumn().getName() + "=hll_empty()"); } expr.setType(org.apache.doris.catalog.Type.HLL); diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.out b/regression-test/data/load_p0/http_stream/test_http_stream.out index 7ce24eea095..2475ed24961 100644 --- a/regression-test/data/load_p0/http_stream/test_http_stream.out +++ b/regression-test/data/load_p0/http_stream/test_http_stream.out @@ -620,3 +620,15 @@ 1 test 2 test +-- !sql19 -- +buag 1 1 +huang 1 1 +jfin 1 1 +koga 1 1 +kon 1 1 +lofn 1 1 +lojn 1 1 +nfubg 1 1 +nhga 1 1 +nijg 1 1 + diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv new file mode 100644 index 00000000000..0b1d798782c --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv @@ -0,0 +1,10 @@ +1001,koga,AQEMYSmSmfh+mA== +1002,nijg,AQGs1RXTaA+hkQ== +1003,lojn,AQFyJr4rwn+S0A== +1004,lofn,AQFvE0bU6Pc9uw== +1005,jfin,AQEmxbO3VGItCA== +1006,kon,AQEm5d0Gw4uvZw== +1007,nhga,AQHOpocenFnBwQ== +1008,nfubg,AQFzYsFz+NIgUg== +1009,huang,AQH2slI7qAUmYA== +1010,buag,AQGBXZ3xnU79YA== \ No newline at end of file diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_new.out b/regression-test/data/load_p0/stream_load/test_stream_load_new.out index 52440d98436..f251042a9df 100644 --- a/regression-test/data/load_p0/stream_load/test_stream_load_new.out +++ b/regression-test/data/load_p0/stream_load/test_stream_load_new.out @@ -124,3 +124,15 @@ 10009 jj 10010 kk +-- !sql13 -- +buag 1 1 +huang 1 1 +jfin 1 1 +koga 1 1 +kon 1 1 +lofn 1 1 +lojn 1 1 +nfubg 1 1 +nhga 1 1 +nijg 1 1 + diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy index 781732988e5..5411224c200 100644 --- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy @@ -854,5 +854,46 @@ suite("test_http_stream", "p0") { } finally { try_sql "DROP TABLE IF EXISTS ${tableName18}" } + + // test load hll type + def tableName19 = "test_http_stream_hll_type" + + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName19} ( + type_id int, + type_name varchar(10), + pv_hash hll hll_union not null, + pv_base64 hll hll_union not null + ) + AGGREGATE KEY(type_id,type_name) + DISTRIBUTED BY HASH(type_id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${db}.${tableName19} select c1,c2,hll_hash(c1),hll_from_base64(c3) from http_stream("format"="csv", "column_separator"=",") + """ + time 10000 + file '../stream_load/test_stream_load_hll_type.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("http_stream result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + + qt_sql19 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName19} group by type_name order by type_name" + } finally { + try_sql "DROP TABLE IF EXISTS ${tableName19}" + } + } diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy index 7df57ebbd16..48c3e5f9654 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy @@ -540,5 +540,47 @@ suite("test_stream_load_new", "p0") { } finally { try_sql "DROP TABLE IF EXISTS ${tableName12}" } + + // 13. test stream load hll type + def tableName13 = "test_stream_load_hll_type" + + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName13} ( + type_id int, + type_name varchar(10), + pv_hash hll hll_union not null, + pv_base64 hll hll_union not null + ) + AGGREGATE KEY(type_id,type_name) + DISTRIBUTED BY HASH(type_id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + streamLoad { + set 'column_separator', ',' + set 'columns', 'type_id,type_name,type_id_base64,pv_hash=hll_hash(type_id),pv_base64=hll_from_base64(type_id_base64)' + table "${tableName13}" + time 10000 + file 'test_stream_load_hll_type.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + } + } + sql """ sync; """ + qt_sql13 "select type_name, hll_union_agg(pv_hash), hll_union_agg(pv_base64) from ${tableName13} group by type_name order by type_name" + } finally { + try_sql "DROP TABLE IF EXISTS ${tableName13}" + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org