This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new d420ff0  display current load bytes to show load progress (#7134)
d420ff0 is described below

commit d420ff0afdc1b85e4856d855e19d39133fc5f094
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Wed Nov 24 10:08:32 2021 +0800

    display current load bytes to show load progress (#7134)

    This value may be greater than the file size when loading a Parquet or
    ORC file, and may be less than the file size when loading a CSV file.
---
 be/src/exec/tablet_sink.cpp                        | 12 ++++----
 be/src/runtime/fragment_mgr.cpp                    | 18 ++++------
 .../apache/doris/load/loadv2/BrokerLoadJob.java    | 10 +++++++
 .../java/org/apache/doris/load/loadv2/LoadJob.java | 33 +++++++++++++++++++---
 .../org/apache/doris/load/loadv2/LoadManager.java  |  4 +--
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |  3 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  4 +--
 .../apache/doris/load/loadv2/dpp/DppResult.java    |  4 +++
 gensrc/thrift/FrontendService.thrift               |  2 ++
 9 files changed, 68 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 3e2db46..7ee4e86 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -707,13 +707,15 @@ Status OlapTableSink::open(RuntimeState* state) {
 
 Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
     SCOPED_TIMER(_profile->total_time_counter());
-    _number_input_rows += input_batch->num_rows();
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
-    state->update_num_rows_load_total(input_batch->num_rows());
-    state->update_num_bytes_load_total(input_batch->total_byte_size());
-    DorisMetrics::instance()->load_rows->increment(input_batch->num_rows());
-    DorisMetrics::instance()->load_bytes->increment(input_batch->total_byte_size());
+    int64_t num_rows = input_batch->num_rows();
+    int64_t num_bytes = input_batch->total_byte_size();
+    _number_input_rows += num_rows;
+    state->update_num_rows_load_total(num_rows);
+    state->update_num_bytes_load_total(num_bytes);
+    DorisMetrics::instance()->load_rows->increment(num_rows);
+    DorisMetrics::instance()->load_bytes->increment(num_bytes);
     RowBatch* batch = input_batch;
     if (!_output_expr_ctxs.empty()) {
         SCOPED_RAW_TIMER(&_convert_batch_ns);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9719e6c..dab2046 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -307,9 +307,11 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
     if (runtime_state->query_options().query_type == TQueryType::LOAD && !done && status.ok()) {
         // this is a load plan, and load is not finished, just make a brief report
         params.__set_loaded_rows(runtime_state->num_rows_load_total());
+        params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
     } else {
         if (runtime_state->query_options().query_type == TQueryType::LOAD) {
             params.__set_loaded_rows(runtime_state->num_rows_load_total());
+            params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
         }
         if (profile == nullptr) {
             params.__isset.profile = false;
@@ -373,11 +375,9 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
         try {
             coord->reportExecStatus(res, params);
         } catch (TTransportException& e) {
-            LOG(WARNING) << "Retrying ReportExecStatus. query id: "
-                         << print_id(_query_id) << ", instance id: "
-                         << print_id(_fragment_instance_id)
-                         << " to " << _coord_addr
-                         << ", err: " << e.what();
+            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id)
+                         << ", instance id: " << print_id(_fragment_instance_id) << " to "
+                         << _coord_addr << ", err: " << e.what();
 
             rpc_status = coord.reopen();
             if (!rpc_status.ok()) {
@@ -452,9 +452,11 @@ FragmentMgr::~FragmentMgr() {
 static void empty_function(PlanFragmentExecutor* exec) {}
 
 void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) {
-    TAG(LOG(INFO)).log("PlanFragmentExecutor::_exec_actual")
-            .query_id(exec_state->query_id()).instance_id(exec_state->fragment_instance_id())
-            .tag("pthread_id", std::to_string((uintptr_t) pthread_self()));
+    TAG(LOG(INFO))
+            .log("PlanFragmentExecutor::_exec_actual")
+            .query_id(exec_state->query_id())
+            .instance_id(exec_state->fragment_instance_id())
+            .tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
 
     exec_state->execute();
     std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
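The BE-side changes above follow a simple pattern: each incoming batch bumps
running row/byte totals, and the periodic report to the FE snapshots those
totals, so progress is visible before the sink closes and the final counts are
fixed. A minimal Java sketch of the same pattern (the class and method names
here are illustrative, not Doris APIs; the real code above is C++):

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical helper, not a Doris class: per-batch counters are bumped
    // incrementally so a poller can read partial progress before the sink
    // closes and the final totals are known.
    class LoadProgressCounters {
        private final AtomicLong rowsLoadTotal = new AtomicLong();
        private final AtomicLong bytesLoadTotal = new AtomicLong();

        // called once per incoming batch, mirroring OlapTableSink::send above
        void onBatch(long numRows, long numBytes) {
            rowsLoadTotal.addAndGet(numRows);
            bytesLoadTotal.addAndGet(numBytes);
        }

        // read by the periodic report, mirroring coordinator_callback above
        long rows()  { return rowsLoadTotal.get(); }
        long bytes() { return bytesLoadTotal.get(); }
    }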
query id: " - << print_id(_query_id) << ", instance id: " - << print_id(_fragment_instance_id) - << " to " << _coord_addr - << ", err: " << e.what(); + LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id) + << ", instance id: " << print_id(_fragment_instance_id) << " to " + << _coord_addr << ", err: " << e.what(); rpc_status = coord.reopen(); if (!rpc_status.ok()) { @@ -452,9 +452,11 @@ FragmentMgr::~FragmentMgr() { static void empty_function(PlanFragmentExecutor* exec) {} void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) { - TAG(LOG(INFO)).log("PlanFragmentExecutor::_exec_actual") - .query_id(exec_state->query_id()).instance_id(exec_state->fragment_instance_id()) - .tag("pthread_id", std::to_string((uintptr_t) pthread_self())); + TAG(LOG(INFO)) + .log("PlanFragmentExecutor::_exec_actual") + .query_id(exec_state->query_id()) + .instance_id(exec_state->fragment_instance_id()) + .tag("pthread_id", std::to_string((uintptr_t)pthread_self())); exec_state->execute(); std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index b57a2de..96cf48d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -346,6 +346,16 @@ public class BrokerLoadJob extends BulkLoadJob { } } + @Override + public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, + long scannedBytes, boolean isDone) { + super.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone); + progress = (int) ((double) loadStatistic.getLoadBytes() / loadStatistic.totalFileSizeB * 100); + if (progress >= 100) { + progress = 99; + } + } + private String increaseCounter(String key, String deltaValue) { long value = 0; if (loadingStatus.getCounters().containsKey(key)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index c5c97b3..1d86239 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -136,6 +136,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements // load task id -> fragment id -> rows count private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create(); + // load task id -> fragment id -> load bytes + private Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create(); + // load task id -> unfinished backend id list private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap(); // load task id -> all backend id list @@ -151,6 +154,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements for (TUniqueId fragId : fragmentIds) { counterTbl.put(loadId, fragId, 0L); } + loadBytes.rowMap().remove(loadId); + for (TUniqueId fragId : fragmentIds) { + loadBytes.put(loadId, fragId, 0L); + } allBackendIds.put(loadId, relatedBackendIds); // need to get a copy of relatedBackendIds, so that when we modify the "relatedBackendIds" in // allBackendIds, the list in unfinishedBackendIds will not be changed. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c5c97b3..1d86239 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -136,6 +136,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         // load task id -> fragment id -> rows count
         private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
 
+        // load task id -> fragment id -> load bytes
+        private Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create();
+
         // load task id -> unfinished backend id list
         private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
         // load task id -> all backend id list
@@ -151,6 +154,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             for (TUniqueId fragId : fragmentIds) {
                 counterTbl.put(loadId, fragId, 0L);
             }
+            loadBytes.rowMap().remove(loadId);
+            for (TUniqueId fragId : fragmentIds) {
+                loadBytes.put(loadId, fragId, 0L);
+            }
             allBackendIds.put(loadId, relatedBackendIds);
             // need to get a copy of relatedBackendIds, so that when we modify the "relatedBackendIds" in
             // allBackendIds, the list in unfinishedBackendIds will not be changed.
@@ -159,15 +166,20 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
 
         public synchronized void removeLoad(TUniqueId loadId) {
             counterTbl.rowMap().remove(loadId);
+            loadBytes.rowMap().remove(loadId);
             unfinishedBackendIds.remove(loadId);
             allBackendIds.remove(loadId);
         }
 
         public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
-                long rows, boolean isDone) {
+                long rows, long bytes, boolean isDone) {
             if (counterTbl.contains(loadId, fragmentId)) {
                 counterTbl.put(loadId, fragmentId, rows);
             }
+
+            if (loadBytes.contains(loadId, fragmentId)) {
+                loadBytes.put(loadId, fragmentId, bytes);
+            }
             if (isDone && unfinishedBackendIds.containsKey(loadId)) {
                 unfinishedBackendIds.get(loadId).remove(backendId);
             }
@@ -181,18 +193,30 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             return total;
         }
 
+        public synchronized long getLoadBytes() {
+            long total = 0;
+            for (long bytes : loadBytes.values()) {
+                total += bytes;
+            }
+            return total;
+        }
+
         public synchronized String toJson() {
             long total = 0;
             for (long rows : counterTbl.values()) {
                 total += rows;
             }
+            long totalBytes = 0;
+            for (long bytes : loadBytes.values()) {
+                totalBytes += bytes;
+            }
 
             Map<String, Object> details = Maps.newHashMap();
             details.put("ScannedRows", total);
+            details.put("LoadBytes", totalBytes);
             details.put("FileNumber", fileNum);
             details.put("FileSize", totalFileSizeB);
             details.put("TaskNumber", counterTbl.rowMap().size());
-            details.put("TaskNumber", counterTbl.rowMap().size());
             details.put("Unfinished backends", getPrintableMap(unfinishedBackendIds));
             details.put("All backends", getPrintableMap(allBackendIds));
             Gson gson = new Gson();
@@ -284,8 +308,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         loadStatistic.initLoad(loadId, fragmentIds, relatedBackendIds);
     }
 
-    public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, boolean isDone) {
-        loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, isDone);
+    public void updateProgress(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+                               long scannedBytes, boolean isDone) {
+        loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
     }
 
     public void setLoadFileInfo(int fileNum, long fileSize) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 0c71559..42aea02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -675,10 +675,10 @@ public class LoadManager implements Writable{
     }
 
     public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId,
-            long scannedRows, boolean isDone) {
+            long scannedRows, long scannedBytes, boolean isDone) {
         LoadJob job = idToLoadJob.get(jobId);
         if (job != null) {
-            job.updateProgress(beId, loadId, fragmentId, scannedRows, isDone);
+            job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
         }
     }
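The new loadBytes bookkeeping above mirrors the existing counterTbl: a Guava
Table keyed by (load task id, fragment id) holds the latest reported byte
count per fragment, and getLoadBytes() sums the cells. A self-contained
sketch of that structure, assuming String keys standing in for TUniqueId
(class and method names here are illustrative, not the Doris code):

    import com.google.common.collect.HashBasedTable;
    import com.google.common.collect.Table;

    // Hypothetical sketch of the (load task id, fragment id) -> bytes table.
    public class LoadBytesSketch {
        private final Table<String, String, Long> loadBytes = HashBasedTable.create();

        public synchronized void initLoad(String loadId, Iterable<String> fragmentIds) {
            loadBytes.rowMap().remove(loadId);        // reset the row on (re)init
            for (String fragId : fragmentIds) {
                loadBytes.put(loadId, fragId, 0L);
            }
        }

        public synchronized void update(String loadId, String fragId, long bytes) {
            if (loadBytes.contains(loadId, fragId)) { // ignore unknown fragments
                loadBytes.put(loadId, fragId, bytes);
            }
        }

        public synchronized long getLoadBytes() {
            long total = 0;
            for (long bytes : loadBytes.values()) {
                total += bytes;
            }
            return total;
        }
    }

Note that update() overwrites the cell rather than accumulating, since each
BE report carries a running total for that fragment, not a delta.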
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index cb38cbf..9185437 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -331,7 +331,8 @@ public class SparkLoadJob extends BulkLoadJob {
                 TUniqueId dummyId = new TUniqueId(0, 0);
                 long dummyBackendId = -1L;
                 loadStatistic.initLoad(dummyId, Sets.newHashSet(dummyId), Lists.newArrayList(dummyBackendId));
-                loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows, true);
+                loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows,
+                        dppResult.scannedBytes, true);
 
                 Map<String, String> counters = loadingStatus.getCounters();
                 counters.put(DPP_NORMAL_ALL, String.valueOf(dppResult.normalRows));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ad0a9d9..f1adb35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1538,8 +1538,8 @@ public class Coordinator {
 
         if (params.isSetLoadedRows()) {
             Catalog.getCurrentCatalog().getLoadManager().updateJobProgress(
-                    jobId, params.backend_id, params.query_id, params.fragment_instance_id, params.loaded_rows,
-                    params.done);
+                    jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
+                    params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
         }
     }
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
index fa813ca..037cc9b 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
@@ -32,6 +32,7 @@ public class DppResult implements Serializable {
         abnormalRows = 0;
         unselectRows = 0;
         partialAbnormalRows = "";
+        scannedBytes = 0;
     }
 
     @SerializedName("is_success")
@@ -61,4 +62,7 @@ public class DppResult implements Serializable {
     // only part of abnormal rows will be returned
     @SerializedName("partial_abnormal_rows")
     public String partialAbnormalRows;
+
+    @SerializedName("scanned_bytes")
+    public long scannedBytes;
 }
\ No newline at end of file
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 374e282..013ac0d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -398,6 +398,8 @@ struct TReportExecStatusParams {
 
     15: optional i64 loaded_rows
     16: optional i64 backend_id
+
+    17: optional i64 loaded_bytes
 }
 
 struct TFeResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org