This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a6a84b8ecc3 [improvement](stream load)(cherry-pick) support hll_from_base64 for stream load column mapping (#36819)
a6a84b8ecc3 is described below

commit a6a84b8ecc324834f79f3a61e5e587366890a06e
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Wed Jun 26 20:12:40 2024 +0800

    [improvement](stream load)(cherry-pick) support hll_from_base64 for stream load column mapping (#36819)
    
    picked from https://github.com/apache/doris/pull/35923
---
 .../java/org/apache/doris/catalog/FunctionSet.java |  1 +
 .../org/apache/doris/planner/FileLoadScanNode.java |  4 ++-
 .../data/load_p0/http_stream/test_http_stream.out  | 12 +++++++
 .../stream_load/test_stream_load_hll_type.csv      | 10 ++++++
 .../load_p0/stream_load/test_stream_load_new.out   | 12 +++++++
 .../load_p0/http_stream/test_http_stream.groovy    | 41 +++++++++++++++++++++
 .../stream_load/test_stream_load_new.groovy        | 42 ++++++++++++++++++++++
 7 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
index b0d4c654531..2db943993dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
@@ -178,6 +178,7 @@ public class FunctionSet<T> {
     public static final String HLL_UNION_AGG = "hll_union_agg";
     public static final String HLL_RAW_AGG = "hll_raw_agg";
     public static final String HLL_CARDINALITY = "hll_cardinality";
+    public static final String HLL_FROM_BASE64 = "hll_from_base64";
 
     public static final String TO_BITMAP = "to_bitmap";
     public static final String TO_BITMAP_WITH_CHECK = "to_bitmap_with_check";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index ca0324a51d0..0d674a70517 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -280,9 +280,11 @@ public class FileLoadScanNode extends FileScanNode {
                 }
                 FunctionCallExpr fn = (FunctionCallExpr) expr;
                 if 
(!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && 
!fn.getFnName()
-                        .getFunction().equalsIgnoreCase("hll_empty")) {
+                        .getFunction().equalsIgnoreCase("hll_empty")
+                        && 
!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) {
                     throw new AnalysisException("HLL column must use " + 
FunctionSet.HLL_HASH + " function, like "
                         + destSlotDesc.getColumn().getName() + "=" + 
FunctionSet.HLL_HASH + "(xxx) or "
+                        + destSlotDesc.getColumn().getName() + "=" + 
FunctionSet.HLL_FROM_BASE64 + "(xxx) or "
                         + destSlotDesc.getColumn().getName() + "=hll_empty()");
                 }
                 expr.setType(org.apache.doris.catalog.Type.HLL);
diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.out 
b/regression-test/data/load_p0/http_stream/test_http_stream.out
index 7ce24eea095..2475ed24961 100644
--- a/regression-test/data/load_p0/http_stream/test_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_http_stream.out
@@ -620,3 +620,15 @@
 1      test
 2      test
 
+-- !sql19 --
+buag   1       1
+huang  1       1
+jfin   1       1
+koga   1       1
+kon    1       1
+lofn   1       1
+lojn   1       1
+nfubg  1       1
+nhga   1       1
+nijg   1       1
+
diff --git 
a/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv 
b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv
new file mode 100644
index 00000000000..0b1d798782c
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_hll_type.csv
@@ -0,0 +1,10 @@
+1001,koga,AQEMYSmSmfh+mA==
+1002,nijg,AQGs1RXTaA+hkQ==
+1003,lojn,AQFyJr4rwn+S0A==
+1004,lofn,AQFvE0bU6Pc9uw==
+1005,jfin,AQEmxbO3VGItCA==
+1006,kon,AQEm5d0Gw4uvZw==
+1007,nhga,AQHOpocenFnBwQ==
+1008,nfubg,AQFzYsFz+NIgUg==
+1009,huang,AQH2slI7qAUmYA==
+1010,buag,AQGBXZ3xnU79YA==
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_new.out 
b/regression-test/data/load_p0/stream_load/test_stream_load_new.out
index 52440d98436..f251042a9df 100644
--- a/regression-test/data/load_p0/stream_load/test_stream_load_new.out
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_new.out
@@ -124,3 +124,15 @@
 10009  jj
 10010  kk
 
+-- !sql13 --
+buag   1       1
+huang  1       1
+jfin   1       1
+koga   1       1
+kon    1       1
+lofn   1       1
+lojn   1       1
+nfubg  1       1
+nhga   1       1
+nijg   1       1
+
diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy 
b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
index 781732988e5..5411224c200 100644
--- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy
@@ -854,5 +854,46 @@ suite("test_http_stream", "p0") {
     } finally {
         try_sql "DROP TABLE IF EXISTS ${tableName18}"
     }
+
+    //  test load hll type
+    def tableName19 = "test_http_stream_hll_type"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName19} (
+            type_id int,
+            type_name varchar(10),
+            pv_hash hll hll_union not null,
+            pv_base64 hll hll_union not null
+        )
+        AGGREGATE KEY(type_id,type_name)
+        DISTRIBUTED BY HASH(type_id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'version', '1'
+            set 'sql', """
+                    insert into ${db}.${tableName19} select 
c1,c2,hll_hash(c1),hll_from_base64(c3) from http_stream("format"="csv", 
"column_separator"=",")
+                    """
+            time 10000
+            file '../stream_load/test_stream_load_hll_type.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("http_stream result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+            }
+        }
+
+        qt_sql19 "select type_name, hll_union_agg(pv_hash), 
hll_union_agg(pv_base64) from ${tableName19} group by type_name  order by 
type_name"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName19}"
+    }
+
 }
 
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy
index 7df57ebbd16..48c3e5f9654 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_new.groovy
@@ -540,5 +540,47 @@ suite("test_stream_load_new", "p0") {
     } finally {
         try_sql "DROP TABLE IF EXISTS ${tableName12}"
     }
+
+    // 13. test stream load hll type
+    def tableName13 = "test_stream_load_hll_type"
+
+    try {
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName13} (
+            type_id int,
+            type_name varchar(10),
+            pv_hash hll hll_union not null,
+            pv_base64 hll hll_union not null
+        )
+        AGGREGATE KEY(type_id,type_name)
+        DISTRIBUTED BY HASH(type_id) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+        """
+
+        streamLoad {
+            set 'column_separator', ','
+            set 'columns', 
'type_id,type_name,type_id_base64,pv_hash=hll_hash(type_id),pv_base64=hll_from_base64(type_id_base64)'
+            table "${tableName13}"
+            time 10000
+            file 'test_stream_load_hll_type.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(10, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+        sql """ sync; """
+        qt_sql13 "select type_name, hll_union_agg(pv_hash), 
hll_union_agg(pv_base64) from ${tableName13} group by type_name order by 
type_name"
+    } finally {
+        try_sql "DROP TABLE IF EXISTS ${tableName13}"
+    }
+
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to