This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new cd1c9edd714 [fix](pipeline-load) fix no error url when data quality error and total rows is negative (#34072) (#34204)
cd1c9edd714 is described below

commit cd1c9edd71432a82c50226e335a699d2aa337ca8
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Sat Apr 27 18:19:08 2024 +0800

    [fix](pipeline-load) fix no error url when data quality error and total rows is negative (#34072) (#34204)
    
    Co-authored-by: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
---
 .../runtime/stream_load/stream_load_executor.cpp   | 47 +++++--------
 be/src/vec/sink/writer/vtablet_writer.cpp          | 12 ++--
 .../data/load_p0/stream_load/test_error_url.csv    |  9 +++
 .../load_p0/stream_load/test_pipeline_load.groovy  |  4 +-
 .../stream_load/test_stream_load_error_url.groovy  | 76 ++++++++++++++++++++++
 .../test_partial_update_schema_change.groovy       |  4 +-
 ...t_partial_update_schema_change_row_store.groovy |  4 +-
 7 files changed, 114 insertions(+), 42 deletions(-)

diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 720c2e86898..58621c77a2a 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -78,38 +78,25 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
         }
         ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
         ctx->commit_infos = std::move(state->tablet_commit_infos());
-        if (status->ok()) {
-            ctx->number_total_rows = state->num_rows_load_total();
-            ctx->number_loaded_rows = state->num_rows_load_success();
-            ctx->number_filtered_rows = state->num_rows_load_filtered();
-            ctx->number_unselected_rows = state->num_rows_load_unselected();
-
-            int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
-            if (!ctx->group_commit && num_selected_rows > 0 &&
-                (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
-                // NOTE: Do not modify the error message here, for historical reasons,
-                // some users may rely on this error message.
-                *status = Status::DataQualityError("too many filtered rows");
-            }
-            if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
-                ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
-            }
+        ctx->number_total_rows = state->num_rows_load_total();
+        ctx->number_loaded_rows = state->num_rows_load_success();
+        ctx->number_filtered_rows = state->num_rows_load_filtered();
+        ctx->number_unselected_rows = state->num_rows_load_unselected();
+        int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
+        if (!ctx->group_commit && num_selected_rows > 0 &&
+            (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
+            // NOTE: Do not modify the error message here, for historical reasons,
+            // some users may rely on this error message.
+            *status = Status::DataQualityError("too many filtered rows");
+        }
+        if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
+            ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
+        }
 
-            if (status->ok()) {
-                DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
-                DorisMetrics::instance()->stream_load_rows_total->increment(
-                        ctx->number_loaded_rows);
-            }
+        if (status->ok()) {
+            DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
+            DorisMetrics::instance()->stream_load_rows_total->increment(ctx->number_loaded_rows);
         } else {
-            if (ctx->group_commit) {
-                ctx->number_total_rows = state->num_rows_load_total();
-                ctx->number_loaded_rows = state->num_rows_load_success();
-                ctx->number_filtered_rows = state->num_rows_load_filtered();
-                ctx->number_unselected_rows = state->num_rows_load_unselected();
-                if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
-                    ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
-                }
-            }
             LOG(WARNING) << "fragment execute failed"
                          << ", err_msg=" << status->to_string() << ", " << 
ctx->brief();
             // cancel body_sink, make sender known it
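
For readers following along, a minimal standalone sketch of the check this hunk now runs unconditionally (hypothetical names, not the Doris API): both the "too many filtered rows" error and the error URL are driven by the row counters, and before this change the counters were copied from RuntimeState only on the status->ok() path, so a failed fragment reported zero or negative totals, the guard below never fired, and no error URL was attached.

    #include <cstdint>

    // Hypothetical sketch, not the Doris API: the data-quality check only
    // fires when num_selected_rows is positive, so unpopulated (zero or
    // negative) totals silently skip it and no error URL is ever produced.
    struct LoadCounters {
        int64_t total_rows = 0;
        int64_t filtered_rows = 0;
        int64_t unselected_rows = 0;
    };

    bool exceeds_filter_ratio(const LoadCounters& c, double max_filter_ratio) {
        int64_t num_selected_rows = c.total_rows - c.unselected_rows;
        return num_selected_rows > 0 &&
               static_cast<double>(c.filtered_rows) / num_selected_rows > max_filter_ratio;
    }

    int main() {
        LoadCounters c{/*total_rows=*/100, /*filtered_rows=*/40, /*unselected_rows=*/0};
        // 40% filtered exceeds a 10% limit -> data quality error would be raised.
        return exceeds_filter_ratio(c, 0.1) ? 0 : 1;
    }
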
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index fe846b1f48e..8d3a4c21168 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1662,6 +1662,12 @@ Status VTabletWriter::write(doris::vectorized::Block& input_block) {
     bool has_filtered_rows = false;
     int64_t filtered_rows = 0;
     _number_input_rows += rows;
+    // update incrementally so that FE can get the progress.
+    // the real 'num_rows_load_total' will be set when sink being closed.
+    _state->update_num_rows_load_total(rows);
+    _state->update_num_bytes_load_total(bytes);
+    DorisMetrics::instance()->load_rows->increment(rows);
+    DorisMetrics::instance()->load_bytes->increment(bytes);
 
     _row_distribution_watch.start();
     RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
@@ -1674,12 +1680,6 @@ Status VTabletWriter::write(doris::vectorized::Block& input_block) {
     _generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload);
     _row_distribution_watch.stop();
 
-    // update incrementally so that FE can get the progress.
-    // the real 'num_rows_load_total' will be set when sink being closed.
-    _state->update_num_rows_load_total(rows);
-    _state->update_num_bytes_load_total(bytes);
-    DorisMetrics::instance()->load_rows->increment(rows);
-    DorisMetrics::instance()->load_bytes->increment(bytes);
     // Random distribution and the block belongs to a single tablet, we could optimize to append the whole
     // block into node channel.
     bool load_block_to_single_tablet =
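
The reordering in this file is the other half of the fix: the totals now accumulate before generate_rows_distribution(), which can fail and return early. A minimal sketch of the pattern, with hypothetical names rather than the writer's real interface:

    #include <cstdint>

    // Hypothetical sketch, not the Doris writer API.
    enum class Status { OK, Error };

    struct Progress {
        int64_t rows_total = 0;
        int64_t bytes_total = 0;
    };

    // Stand-in for a fallible step such as row distribution.
    Status distribute_rows() { return Status::Error; }

    Status write_block(Progress& p, int64_t rows, int64_t bytes) {
        // Record progress first: if the next step fails and we return early,
        // the totals still reflect the rows actually received instead of
        // staying at zero (or going negative) for the failed block.
        p.rows_total += rows;
        p.bytes_total += bytes;
        return distribute_rows();
    }

    int main() {
        Progress p;
        (void)write_block(p, /*rows=*/100, /*bytes=*/4096);
        return p.rows_total == 100 ? 0 : 1;  // counted even though the step failed
    }
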
diff --git a/regression-test/data/load_p0/stream_load/test_error_url.csv b/regression-test/data/load_p0/stream_load/test_error_url.csv
new file mode 100644
index 00000000000..a1c8c042f11
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_error_url.csv
@@ -0,0 +1,9 @@
+281747159,apple-store,2009-08-07,price_change,2.99,1.99,"","",2024-02-02T14:08:16+08:00
+281747159,apple-store,2009-08-13,version_change,1.1.2 (iPhone OS 3.0 Tested),1.1.1 (iPhone OS 3.0 Tested),"","",2024-02-02T14:08:16+08:00
+281790044,apple-store,2009-08-06,version_change,3.0 (iPhone OS 3.0 Tested),1.3.1 (iPhone OS 3.0 Tested),"","",2024-02-24T19:43:05+08:00
+281790044,apple-store,2009-08-17,version_change,3.0.1 (iPhone OS 3.0 Tested),3.0 (iPhone OS 3.0 Tested),"","",2024-02-24T19:43:05+08:00
+281796108,apple-store,2009-08-24,version_change,3.1.0 (iPhone OS 3.0 Tested),3.0.2 (iPhone OS 3.0 Tested),"","",2024-02-10T17:48:26+08:00
+281941097,apple-store,2009-08-15,version_change,2.6.0 (iPhone OS 3.0 Tested),2.5.0,"","",2024-02-17T11:15:40+08:00
+281941097,apple-store,2009-08-22,version_change,2.6.3 (iPhone OS 3.0 Tested),2.6.0 (iPhone OS 3.0 Tested),"","",2024-02-17T11:15:40+08:00
+282614216,apple-store,2009-08-12,version_change,1.4.0 (iPhone OS 3.0 Tested),1.3.0 (iPhone OS 3.0 Tested),"","",2024-02-21T21:57:58+08:00
+282738621,apple-store,2009-08-12,name_change,ZHI Chinese-English Dictionary,Chinese English Dictionary,"","",2024-02-17T14:20:44+08:00
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy b/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy
index 414224f5756..472176a519d 100644
--- a/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy
@@ -151,8 +151,8 @@ suite("test_pipeline_load", "nonConcurrent") {
                 log.info("Stream load result: ${result}".toString())
                 def json = parseJson(result)
                 assertEquals("fail", json.Status.toLowerCase())
-                assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]Encountered unqualified data"))
-                assertEquals(0, json.NumberTotalRows)
+                assertTrue(json.Message.contains("Encountered unqualified data"))
+                assertEquals(100, json.NumberTotalRows)
+                assertEquals(100, json.NumberTotalRows)
                 assertEquals(0, json.NumberFilteredRows)
                 assertEquals(0, json.NumberUnselectedRows)
             }
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
new file mode 100644
index 00000000000..72fc212e241
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_error_url", "p0") {
+    def tableName = "test_stream_load_error_url"
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `product_id` BIGINT NULL,
+                `market_code` VARCHAR(32),
+                `date` DATE NULL,
+                `event_type` VARCHAR(255) NULL,
+                `new_value` TEXT NULL,
+                `old_value` TEXT NULL,
+                `release_note` TEXT NULL,
+                `release_date` TEXT NULL,
+                `u_time` DATETIME NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`product_id`, `market_code`, `date`, `event_type`)
+            COMMENT 'test_error_url'
+            PARTITION BY RANGE(`date`)
+            (PARTITION p_201001 VALUES [('2010-01-01'), ('2012-01-01')),
+            PARTITION p_201201 VALUES [('2012-01-01'), ('2014-01-01')),
+            PARTITION p_201401 VALUES [('2014-01-01'), ('2016-01-01')),
+            PARTITION p_201601 VALUES [('2016-01-01'), ('2018-01-01')),
+            PARTITION p_201801 VALUES [('2018-01-01'), ('2020-01-01')),
+            PARTITION p_202001 VALUES [('2020-01-01'), ('2022-01-01')),
+            PARTITION p_202201 VALUES [('2022-01-01'), ('2024-01-01')),
+            PARTITION p_202401 VALUES [('2024-01-01'), ('2026-01-01')),
+            PARTITION p_202601 VALUES [('2026-01-01'), ('2028-01-01')),
+            PARTITION p_202801 VALUES [('2028-01-01'), ('2028-12-01')))
+            DISTRIBUTED BY HASH(`product_id`, `market_code`, `date`, `event_type`) BUCKETS 10
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1"
+            );
+        """
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'columns', 'k1, k2, k3'
+            file 'test_error_url.csv'
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows"))
+                def (code, out, err) = curl("GET", json.ErrorURL)
+                log.info("error result: " + out)
+                assertTrue(out.contains("actual column number in csv file is  more than  schema column number.actual number"))
+                log.info("url: " + json.ErrorURL)
+            }
+        }
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
index eee92b2b39e..0ed7c042abf 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
@@ -440,7 +440,7 @@ suite("test_partial_update_schema_change", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("fail", json.Status.toLowerCase())
-            assertEquals(0, json.NumberTotalRows)
+            assertEquals(1, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
             assertEquals(0, json.NumberUnselectedRows)
         }
@@ -1033,7 +1033,7 @@ suite("test_partial_update_schema_change", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("fail", json.Status.toLowerCase())
-            assertEquals(0, json.NumberTotalRows)
+            assertEquals(1, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
             assertEquals(0, json.NumberUnselectedRows)
         }
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
index 1fa4ec39d36..ef57c3fec40 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
@@ -444,7 +444,7 @@ suite("test_partial_update_row_store_schema_change", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("fail", json.Status.toLowerCase())
-            assertEquals(0, json.NumberTotalRows)
+            assertEquals(1, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
             assertEquals(0, json.NumberUnselectedRows)
         }
@@ -1043,7 +1043,7 @@ suite("test_partial_update_row_store_schema_change", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("fail", json.Status.toLowerCase())
-            assertEquals(0, json.NumberTotalRows)
+            assertEquals(1, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
             assertEquals(0, json.NumberUnselectedRows)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
