This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d52a426002e [fix](pipeline-load) fix no error url when data quality error and total rows is negative (#34072) d52a426002e is described below commit d52a426002ec912516a592304146e5da2ac5997d Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Sat Apr 27 16:21:16 2024 +0800 [fix](pipeline-load) fix no error url when data quality error and total rows is negative (#34072) --- .../runtime/stream_load/stream_load_executor.cpp | 47 +++++-------- be/src/vec/sink/writer/vtablet_writer.cpp | 12 ++-- .../data/load_p0/stream_load/test_error_url.csv | 9 +++ .../load_p0/stream_load/test_pipeline_load.groovy | 2 +- .../stream_load/test_stream_load_error_url.groovy | 76 ++++++++++++++++++++++ .../test_partial_update_schema_change.groovy | 4 +- ...t_partial_update_schema_change_row_store.groovy | 4 +- 7 files changed, 113 insertions(+), 41 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 720c2e86898..58621c77a2a 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -78,38 +78,25 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte } ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); ctx->commit_infos = std::move(state->tablet_commit_infos()); - if (status->ok()) { - ctx->number_total_rows = state->num_rows_load_total(); - ctx->number_loaded_rows = state->num_rows_load_success(); - ctx->number_filtered_rows = state->num_rows_load_filtered(); - ctx->number_unselected_rows = state->num_rows_load_unselected(); - - int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows; - if (!ctx->group_commit && num_selected_rows > 0 && - (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) { - // NOTE: Do not modify the error message here, for historical reasons, - // some users may rely on this error message. - *status = Status::DataQualityError("too many filtered rows"); - } - if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) { - ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); - } + ctx->number_total_rows = state->num_rows_load_total(); + ctx->number_loaded_rows = state->num_rows_load_success(); + ctx->number_filtered_rows = state->num_rows_load_filtered(); + ctx->number_unselected_rows = state->num_rows_load_unselected(); + int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows; + if (!ctx->group_commit && num_selected_rows > 0 && + (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) { + // NOTE: Do not modify the error message here, for historical reasons, + // some users may rely on this error message. + *status = Status::DataQualityError("too many filtered rows"); + } + if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) { + ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); + } - if (status->ok()) { - DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes); - DorisMetrics::instance()->stream_load_rows_total->increment( - ctx->number_loaded_rows); - } + if (status->ok()) { + DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes); + DorisMetrics::instance()->stream_load_rows_total->increment(ctx->number_loaded_rows); } else { - if (ctx->group_commit) { - ctx->number_total_rows = state->num_rows_load_total(); - ctx->number_loaded_rows = state->num_rows_load_success(); - ctx->number_filtered_rows = state->num_rows_load_filtered(); - ctx->number_unselected_rows = state->num_rows_load_unselected(); - if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) { - ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); - } - } LOG(WARNING) << "fragment execute failed" << ", err_msg=" << status->to_string() << ", " << ctx->brief(); // cancel body_sink, make sender known it diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 6f0c4a3e2a8..fdd683f42ce 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1676,6 +1676,12 @@ Status VTabletWriter::write(doris::vectorized::Block& input_block) { bool has_filtered_rows = false; int64_t filtered_rows = 0; _number_input_rows += rows; + // update incrementally so that FE can get the progress. + // the real 'num_rows_load_total' will be set when sink being closed. + _state->update_num_rows_load_total(rows); + _state->update_num_bytes_load_total(bytes); + DorisMetrics::instance()->load_rows->increment(rows); + DorisMetrics::instance()->load_bytes->increment(bytes); _row_distribution_watch.start(); RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( @@ -1688,12 +1694,6 @@ Status VTabletWriter::write(doris::vectorized::Block& input_block) { _generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload); _row_distribution_watch.stop(); - // update incrementally so that FE can get the progress. - // the real 'num_rows_load_total' will be set when sink being closed. - _state->update_num_rows_load_total(rows); - _state->update_num_bytes_load_total(bytes); - DorisMetrics::instance()->load_rows->increment(rows); - DorisMetrics::instance()->load_bytes->increment(bytes); // Random distribution and the block belongs to a single tablet, we could optimize to append the whole // block into node channel. bool load_block_to_single_tablet = diff --git a/regression-test/data/load_p0/stream_load/test_error_url.csv b/regression-test/data/load_p0/stream_load/test_error_url.csv new file mode 100644 index 00000000000..a1c8c042f11 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_error_url.csv @@ -0,0 +1,9 @@ +281747159,apple-store,2009-08-07,price_change,2.99,1.99,"","",2024-02-02T14:08:16+08:00 +281747159,apple-store,2009-08-13,version_change,1.1.2 (iPhone OS 3.0 Tested),1.1.1 (iPhone OS 3.0 Tested),"","",2024-02-02T14:08:16+08:00 +281790044,apple-store,2009-08-06,version_change,3.0 (iPhone OS 3.0 Tested),1.3.1 (iPhone OS 3.0 Tested),"","",2024-02-24T19:43:05+08:00 +281790044,apple-store,2009-08-17,version_change,3.0.1 (iPhone OS 3.0 Tested),3.0 (iPhone OS 3.0 Tested),"","",2024-02-24T19:43:05+08:00 +281796108,apple-store,2009-08-24,version_change,3.1.0 (iPhone OS 3.0 Tested),3.0.2 (iPhone OS 3.0 Tested),"","",2024-02-10T17:48:26+08:00 +281941097,apple-store,2009-08-15,version_change,2.6.0 (iPhone OS 3.0 Tested),2.5.0,"","",2024-02-17T11:15:40+08:00 +281941097,apple-store,2009-08-22,version_change,2.6.3 (iPhone OS 3.0 Tested),2.6.0 (iPhone OS 3.0 Tested),"","",2024-02-17T11:15:40+08:00 +282614216,apple-store,2009-08-12,version_change,1.4.0 (iPhone OS 3.0 Tested),1.3.0 (iPhone OS 3.0 Tested),"","",2024-02-21T21:57:58+08:00 +282738621,apple-store,2009-08-12,name_change,ZHI Chinese-English Dictionary,Chinese English Dictionary,"","",2024-02-17T14:20:44+08:00 \ No newline at end of file diff --git a/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy b/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy index b8978a15ca9..472176a519d 100644 --- a/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy @@ -152,7 +152,7 @@ suite("test_pipeline_load", "nonConcurrent") { def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) assertTrue(json.Message.contains("Encountered unqualified data")) - assertEquals(0, json.NumberTotalRows) + assertEquals(100, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy new file mode 100644 index 00000000000..72fc212e241 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_error_url", "p0") { + def tableName = "test_stream_load_error_url" + try { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `product_id` BIGINT NULL, + `market_code` VARCHAR(32), + `date` DATE NULL, + `event_type` VARCHAR(255) NULL, + `new_value` TEXT NULL, + `old_value` TEXT NULL, + `release_note` TEXT NULL, + `release_date` TEXT NULL, + `u_time` DATETIME NULL + ) ENGINE=OLAP + UNIQUE KEY(`product_id`, `market_code`, `date`, `event_type`) + COMMENT 'test_error_url' + PARTITION BY RANGE(`date`) + (PARTITION p_201001 VALUES [('2010-01-01'), ('2012-01-01')), + PARTITION p_201201 VALUES [('2012-01-01'), ('2014-01-01')), + PARTITION p_201401 VALUES [('2014-01-01'), ('2016-01-01')), + PARTITION p_201601 VALUES [('2016-01-01'), ('2018-01-01')), + PARTITION p_201801 VALUES [('2018-01-01'), ('2020-01-01')), + PARTITION p_202001 VALUES [('2020-01-01'), ('2022-01-01')), + PARTITION p_202201 VALUES [('2022-01-01'), ('2024-01-01')), + PARTITION p_202401 VALUES [('2024-01-01'), ('2026-01-01')), + PARTITION p_202601 VALUES [('2026-01-01'), ('2028-01-01')), + PARTITION p_202801 VALUES [('2028-01-01'), ('2028-12-01'))) + DISTRIBUTED BY HASH(`product_id`, `market_code`, `date`, `event_type`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'columns', 'k1, k2, k3' + file 'test_error_url.csv' + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows")) + def (code, out, err) = curl("GET", json.ErrorURL) + log.info("error result: " + out) + assertTrue(out.contains("actual column number in csv file is more than schema column number.actual number")) + log.info("url: " + json.ErrorURL) + } + } + } finally { + sql """ DROP TABLE IF EXISTS ${tableName} """ + } +} \ No newline at end of file diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy index 347cafd499a..820c8a5b09c 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy @@ -440,7 +440,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) + assertEquals(1, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } @@ -1035,7 +1035,7 @@ suite("test_partial_update_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) + assertEquals(1, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy index a9480b664de..3ea3c7a613e 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy @@ -444,7 +444,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) + assertEquals(1, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } @@ -1045,7 +1045,7 @@ suite("test_partial_update_row_store_schema_change", "p0") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("fail", json.Status.toLowerCase()) - assertEquals(0, json.NumberTotalRows) + assertEquals(1, json.NumberTotalRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org