This is an automated email from the ASF dual-hosted git repository.
hubgeter pushed a commit to branch mc-test-branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/mc-test-branch-4.1 by this
push:
new f597ada252c [test](log)add some log for debug maxcompute
f597ada252c is described below
commit f597ada252ccfbc4acdf7a7cf4f28889f5272db9
Author: daidai <[email protected]>
AuthorDate: Mon May 25 18:02:27 2026 +0800
[test](log)add some log for debug maxcompute
---
be/src/exec/pipeline/pipeline_fragment_context.cpp | 52 +++++
.../writer/maxcompute/vmc_partition_writer.cpp | 131 ++++++++++-
.../sink/writer/maxcompute/vmc_table_writer.cpp | 193 +++++++++++++++-
.../format/transformer/vjni_format_transformer.cpp | 255 ++++++++++++++++++---
.../org/apache/doris/common/jni/JniWriter.java | 45 +++-
.../doris/maxcompute/MaxComputeFeClient.java | 68 +++++-
.../doris/maxcompute/MaxComputeJniWriter.java | 213 ++++++++++++++++-
.../doris/datasource/maxcompute/MCTransaction.java | 104 +++++++++
.../plans/commands/insert/MCInsertExecutor.java | 41 ++++
.../apache/doris/planner/MaxComputeTableSink.java | 17 ++
.../main/java/org/apache/doris/qe/Coordinator.java | 15 +-
.../org/apache/doris/qe/runtime/LoadProcessor.java | 15 ++
.../apache/doris/service/FrontendServiceImpl.java | 26 +++
13 files changed, 1123 insertions(+), 52 deletions(-)
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index c8f83ad0781..9d13a8bcaf4 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -2168,15 +2168,43 @@ void
PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r
if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
params.__isset.mc_commit_datas = true;
params.mc_commit_datas.insert(params.mc_commit_datas.end(),
mcd.begin(), mcd.end());
+ LOG(INFO) << "MC_DIAG stage=BE_REPORT_MC_COMMIT_DATA_FROM_PRIMARY"
+ << ", query_id=" << print_id(req.query_id)
+ << ", fragment_instance_id=" <<
print_id(req.fragment_instance_id)
+ << ", commit_datas=" << mcd.size();
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
params.__isset.mc_commit_datas = true;
params.mc_commit_datas.insert(params.mc_commit_datas.end(),
rs_mcd.begin(),
rs_mcd.end());
+ LOG(INFO) << "MC_DIAG
stage=BE_REPORT_MC_COMMIT_DATA_FROM_RUNTIME_STATE"
+ << ", query_id=" << print_id(req.query_id)
+ << ", fragment_instance_id=" <<
print_id(req.fragment_instance_id)
+ << ", commit_datas=" << rs_mcd.size();
}
}
}
+ if (params.__isset.mc_commit_datas) {
+ int64_t mc_rows = 0;
+ int64_t mc_commit_messages = 0;
+ int64_t mc_commit_message_bytes = 0;
+ for (const auto& data : params.mc_commit_datas) {
+ if (data.__isset.row_count) {
+ mc_rows += data.row_count;
+ }
+ if (data.__isset.commit_message && !data.commit_message.empty()) {
+ mc_commit_messages++;
+ mc_commit_message_bytes += data.commit_message.size();
+ }
+ }
+ LOG(INFO) << "MC_DIAG stage=BE_REPORT_MC_COMMIT_DATA_READY"
+ << ", query_id=" << print_id(req.query_id)
+ << ", fragment_instance_id=" <<
print_id(req.fragment_instance_id)
+ << ", commit_datas=" << params.mc_commit_datas.size()
+ << ", rows=" << mc_rows << ", commit_messages=" <<
mc_commit_messages
+ << ", commit_message_bytes=" << mc_commit_message_bytes;
+ }
req.runtime_state->get_unreported_errors(&(params.error_log));
params.__isset.error_log = (!params.error_log.empty());
@@ -2197,7 +2225,19 @@ void
PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r
}
try {
try {
+ LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_BEFORE"
+ << ", query_id=" << print_id(req.query_id)
+ << ", fragment_instance_id=" <<
print_id(req.fragment_instance_id)
+ << ", has_mc_commit_datas=" <<
params.__isset.mc_commit_datas
+ << ", mc_commit_datas="
+ << (params.__isset.mc_commit_datas ?
params.mc_commit_datas.size() : 0);
(*coord)->reportExecStatus(res, params);
+ LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_AFTER"
+ << ", query_id=" << print_id(req.query_id)
+ << ", fragment_instance_id=" <<
print_id(req.fragment_instance_id)
+ << ", has_mc_commit_datas=" <<
params.__isset.mc_commit_datas
+ << ", mc_commit_datas="
+ << (params.__isset.mc_commit_datas ?
params.mc_commit_datas.size() : 0);
} catch ([[maybe_unused]]
apache::thrift::transport::TTransportException& e) {
#ifndef ADDRESS_SANITIZER
LOG(WARNING) << "Retrying ReportExecStatus. query id: " <<
print_id(req.query_id)
@@ -2210,7 +2250,19 @@ void
PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r
req.cancel_fn(rpc_status);
return;
}
+ LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_RETRY_BEFORE"
+ << ", query_id=" << print_id(req.query_id)
+ << ", fragment_instance_id=" <<
print_id(req.fragment_instance_id)
+ << ", has_mc_commit_datas=" <<
params.__isset.mc_commit_datas
+ << ", mc_commit_datas="
+ << (params.__isset.mc_commit_datas ?
params.mc_commit_datas.size() : 0);
(*coord)->reportExecStatus(res, params);
+ LOG(INFO) << "MC_DIAG stage=BE_REPORT_EXEC_STATUS_RETRY_AFTER"
+ << ", query_id=" << print_id(req.query_id)
+ << ", fragment_instance_id=" <<
print_id(req.fragment_instance_id)
+ << ", has_mc_commit_datas=" <<
params.__isset.mc_commit_datas
+ << ", mc_commit_datas="
+ << (params.__isset.mc_commit_datas ?
params.mc_commit_datas.size() : 0);
}
rpc_status = Status::create<false>(res.status);
diff --git a/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp
b/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp
index ef1e6f069da..349df0e7067 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp
+++ b/be/src/exec/sink/writer/maxcompute/vmc_partition_writer.cpp
@@ -19,9 +19,20 @@
#include "format/transformer/vjni_format_transformer.h"
#include "runtime/runtime_state.h"
+#include "util/time.h"
+#include "util/uid_util.h"
namespace doris {
+namespace {
+
+std::string mc_diag_param(const std::map<std::string, std::string>& params,
const std::string& key) {
+ auto it = params.find(key);
+ return it == params.end() ? "" : it->second;
+}
+
+} // namespace
+
VMCPartitionWriter::VMCPartitionWriter(RuntimeState* state,
const VExprContextSPtrs&
output_vexpr_ctxs,
const std::string& partition_spec,
@@ -32,30 +43,120 @@ VMCPartitionWriter::VMCPartitionWriter(RuntimeState* state,
_writer_params(std::move(writer_params)) {}
Status VMCPartitionWriter::open() {
+ int64_t start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITER_OPEN_ENTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec;
_jni_format_transformer = std::make_unique<VJniFormatTransformer>(
_state, _output_vexpr_ctxs,
"org/apache/doris/maxcompute/MaxComputeJniWriter",
_writer_params);
- return _jni_format_transformer->open();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_OPEN_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec;
+ Status status = _jni_format_transformer->open();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_OPEN_AFTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ return status;
}
Status VMCPartitionWriter::write(Block& block) {
- RETURN_IF_ERROR(_jni_format_transformer->write(block));
+ int64_t start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITE_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec << ", rows=" <<
block.rows()
+ << ", accumulated_rows=" << _row_count;
+ Status status = _jni_format_transformer->write(block);
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_WRITE_AFTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec << ", rows=" <<
block.rows()
+ << ", status=" << status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ RETURN_IF_ERROR(status);
_row_count += block.rows();
return Status::OK();
}
Status VMCPartitionWriter::close(const Status& status) {
+ int64_t start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_CLOSE_ENTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec << ", input_status="
<< status.to_string()
+ << ", accumulated_rows=" << _row_count;
Status result_status;
if (_jni_format_transformer) {
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_CLOSE_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec;
result_status = _jni_format_transformer->close();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_JNI_CLOSE_AFTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec
+ << ", status=" << result_status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
if (!result_status.ok()) {
LOG(WARNING) << "VMCPartitionWriter close failed: " <<
result_status.to_string();
}
}
if (result_status.ok() && status.ok()) {
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_BUILD_COMMIT_DATA_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec;
auto commit_data = _build_mc_commit_data();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_ADD_COMMIT_DATA_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec
+ << ", row_count=" << commit_data.row_count
+ << ", has_commit_message=" <<
commit_data.__isset.commit_message
+ << ", commit_message_length="
+ << (commit_data.__isset.commit_message ?
commit_data.commit_message.size() : 0)
+ << ", has_written_bytes=" <<
commit_data.__isset.written_bytes;
_state->add_mc_commit_datas(commit_data);
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_ADD_COMMIT_DATA_AFTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec;
}
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_CLOSE_EXIT"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec << ", status=" <<
result_status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
return result_status;
}
@@ -66,7 +167,22 @@ TMCCommitData VMCPartitionWriter::_build_mc_commit_data() {
// Get statistics from Java side via JNI getStatistics()
if (_jni_format_transformer) {
+ int64_t start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_GET_STATS_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec;
auto statistics = _jni_format_transformer->get_statistics();
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_GET_STATS_AFTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec
+ << ", stats_size=" << statistics.size()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
auto it = statistics.find("mc_commit_message");
if (it != statistics.end() && !it->second.empty()) {
commit_data.__set_commit_message(it->second);
@@ -76,6 +192,17 @@ TMCCommitData VMCPartitionWriter::_build_mc_commit_data() {
commit_data.__set_written_bytes(std::stoll(it->second));
}
}
+ LOG(INFO) << "MC_DIAG stage=BE_PARTITION_BUILD_COMMIT_DATA_AFTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << _partition_spec
+ << ", row_count=" << commit_data.row_count
+ << ", has_commit_message=" << commit_data.__isset.commit_message
+ << ", commit_message_length="
+ << (commit_data.__isset.commit_message ?
commit_data.commit_message.size() : 0)
+ << ", has_written_bytes=" << commit_data.__isset.written_bytes;
return commit_data;
}
diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
index f4a984dac05..c7805fb9bd4 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
+++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
@@ -24,11 +24,20 @@
#include "format/transformer/vjni_format_transformer.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
+#include "util/time.h"
#include "util/uid_util.h"
namespace doris {
#include "common/compile_check_begin.h"
+namespace {
+
+const char* mc_diag_bool(bool value) {
+ return value ? "true" : "false";
+}
+
+} // namespace
+
VMCTableWriter::VMCTableWriter(const TDataSink& t_sink, const
VExprContextSPtrs& output_expr_ctxs,
std::shared_ptr<Dependency> dep,
std::shared_ptr<Dependency> fin_dep)
: AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
@@ -43,11 +52,19 @@ Status VMCTableWriter::init_properties(ObjectPool* pool) {
Status VMCTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
+ int64_t start_ms = MonotonicMillis();
LOG(INFO) << "VMCTableWriter::open"
<< ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
<< ", per_fragment_instance_idx=" <<
state->per_fragment_instance_idx()
<< ", write_session_id=" << _mc_sink.write_session_id;
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_OPEN_ENTER"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", per_fragment_instance_idx=" <<
state->per_fragment_instance_idx()
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "");
_written_rows_counter = ADD_COUNTER(_operator_profile, "WrittenRows",
TUnit::UNIT);
_send_data_timer = ADD_TIMER(_operator_profile, "SendDataTime");
@@ -95,6 +112,18 @@ Status VMCTableWriter::open(RuntimeState* state,
RuntimeProfile* profile) {
}
}
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_OPEN_EXIT"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", per_fragment_instance_idx=" <<
state->per_fragment_instance_idx()
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_columns=" << _partition_column_names.size()
+ << ", has_static_partition=" <<
mc_diag_bool(_has_static_partition)
+ << ", static_partition_spec=" << _static_partition_spec
+ << ", write_exprs=" << _write_output_vexpr_ctxs.size()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
return Status::OK();
}
@@ -124,6 +153,15 @@ std::map<std::string, std::string>
VMCTableWriter::_build_base_writer_params() {
params["fe_port"] = std::to_string(master_fe_addr.port);
params["fe_rpc_timeout_ms"] =
std::to_string(config::thrift_rpc_timeout_ms);
params["fe_thrift_server_type"] = config::thrift_server_type_of_fe;
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_BUILD_WRITER_PARAMS"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", fe=" << master_fe_addr.hostname << ":" <<
master_fe_addr.port
+ << ", read_timeout=" << (_mc_sink.__isset.read_timeout ?
_mc_sink.read_timeout : 0)
+ << ", retry_count=" << (_mc_sink.__isset.retry_count ?
_mc_sink.retry_count : 0);
return params;
}
@@ -134,6 +172,13 @@ std::shared_ptr<VMCPartitionWriter>
VMCTableWriter::_create_partition_writer(
LOG(INFO) << "VMCTableWriter::_create_partition_writer"
<< ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
<< ", partition_spec=" << partition_spec;
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_CREATE_PARTITION_WRITER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << partition_spec;
return std::make_shared<VMCPartitionWriter>(_state,
_write_output_vexpr_ctxs, partition_spec,
std::move(params));
}
@@ -144,10 +189,41 @@ Status VMCTableWriter::write(RuntimeState* state, Block&
block) {
return Status::OK();
}
+ int64_t start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_WRITE_ENTER"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", input_rows=" << block.rows() << ", input_columns=" <<
block.columns()
+ << ", has_static_partition=" <<
mc_diag_bool(_has_static_partition)
+ << ", partition_columns=" << _partition_column_names.size();
Block output_block;
-
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs,
block,
-
&output_block, false));
+ int64_t expr_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_EXPR_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", input_rows=" << block.rows() << ", input_columns=" <<
block.columns();
+ Status status =
VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs, block,
+
&output_block, false);
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_EXPR_AFTER"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", status=" << status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - expr_start_ms;
+ RETURN_IF_ERROR(status);
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_MATERIALIZE_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", output_rows=" << output_block.rows()
+ << ", output_columns=" << output_block.columns();
materialize_block_inplace(output_block);
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_MATERIALIZE_AFTER"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", output_rows=" << output_block.rows()
+ << ", output_columns=" << output_block.columns();
_row_count += output_block.rows();
@@ -156,12 +232,46 @@ Status VMCTableWriter::write(RuntimeState* state, Block&
block) {
auto it = _partitions_to_writers.find(_static_partition_spec);
if (it == _partitions_to_writers.end()) {
auto writer = _create_partition_writer(_static_partition_spec);
- RETURN_IF_ERROR(writer->open());
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << _static_partition_spec;
+ status = writer->open();
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_AFTER"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << _static_partition_spec
+ << ", status=" << status.to_string();
+ RETURN_IF_ERROR(status);
_partitions_to_writers.insert({_static_partition_spec, writer});
it = _partitions_to_writers.find(_static_partition_spec);
}
output_block.erase(_non_write_columns_indices);
- return it->second->write(output_block);
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << _static_partition_spec
+ << ", rows=" << output_block.rows() << ", columns=" <<
output_block.columns();
+ status = it->second->write(output_block);
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_AFTER"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << _static_partition_spec
+ << ", rows=" << output_block.rows() << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ return status;
}
// Case 2: Dynamic partition or non-partitioned table
@@ -172,20 +282,81 @@ Status VMCTableWriter::write(RuntimeState* state, Block&
block) {
auto it = _partitions_to_writers.find(partition_key);
if (it == _partitions_to_writers.end()) {
auto writer = _create_partition_writer("");
- RETURN_IF_ERROR(writer->open());
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << partition_key
+ << ", dynamic_partition=" <<
mc_diag_bool(!_partition_column_names.empty());
+ status = writer->open();
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_OPEN_AFTER"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << partition_key << ", status=" <<
status.to_string();
+ RETURN_IF_ERROR(status);
_partitions_to_writers.insert({partition_key, writer});
it = _partitions_to_writers.find(partition_key);
}
- return it->second->write(output_block);
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << partition_key
+ << ", dynamic_partition=" <<
mc_diag_bool(!_partition_column_names.empty())
+ << ", rows=" << output_block.rows() << ", columns=" <<
output_block.columns();
+ status = it->second->write(output_block);
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_PARTITION_WRITE_AFTER"
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << partition_key << ", rows=" <<
output_block.rows()
+ << ", status=" << status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ return status;
}
Status VMCTableWriter::close(Status status) {
+ int64_t start_ms = MonotonicMillis();
Status result_status;
int64_t partitions_count = _partitions_to_writers.size();
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_ENTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", input_status=" << status.to_string()
+ << ", partitions_count=" << partitions_count
+ << ", row_count=" << _row_count;
{
SCOPED_RAW_TIMER(&_close_ns);
for (const auto& [partition_spec, writer] : _partitions_to_writers) {
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_PARTITION_BEFORE"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << partition_spec;
+ int64_t partition_close_start_ms = MonotonicMillis();
Status st = writer->close(status);
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_PARTITION_AFTER"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", partition_spec=" << partition_spec << ", status="
<< st.to_string()
+ << ", cost_ms=" << MonotonicMillis() -
partition_close_start_ms;
if (!st.ok()) {
LOG(WARNING) << "VMCPartitionWriter close failed for partition
" << partition_spec
<< ": " << st.to_string();
@@ -202,6 +373,16 @@ Status VMCTableWriter::close(Status status) {
COUNTER_SET(_close_timer, _close_ns);
COUNTER_SET(_partition_writers_count, partitions_count);
}
+ LOG(INFO) << "MC_DIAG stage=BE_TABLE_CLOSE_EXIT"
+ << ", fragment_instance_id=" <<
print_id(_state->fragment_instance_id())
+ << ", table=" << (_mc_sink.__isset.table_name ?
_mc_sink.table_name : "")
+ << ", txn_id=" << (_mc_sink.__isset.txn_id ?
std::to_string(_mc_sink.txn_id) : "")
+ << ", write_session_id="
+ << (_mc_sink.__isset.write_session_id ?
_mc_sink.write_session_id : "")
+ << ", result_status=" << result_status.to_string()
+ << ", partitions_count=" << partitions_count
+ << ", row_count=" << _row_count
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
return result_status;
}
diff --git a/be/src/format/transformer/vjni_format_transformer.cpp
b/be/src/format/transformer/vjni_format_transformer.cpp
index 782818f05aa..ac9a9bdb33f 100644
--- a/be/src/format/transformer/vjni_format_transformer.cpp
+++ b/be/src/format/transformer/vjni_format_transformer.cpp
@@ -19,9 +19,19 @@
#include "exec/connector/jni_connector.h"
#include "runtime/runtime_state.h"
+#include "util/time.h"
namespace doris {
+namespace {
+
+std::string mc_diag_param(const std::map<std::string, std::string>& params,
const std::string& key) {
+ auto it = params.find(key);
+ return it == params.end() ? "" : it->second;
+}
+
+} // namespace
+
VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state,
const VExprContextSPtrs&
output_vexpr_ctxs,
std::string writer_class,
@@ -32,45 +42,118 @@ VJniFormatTransformer::VJniFormatTransformer(RuntimeState*
state,
Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) {
// Load writer class via the same class loader as JniScanner
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_INIT_WRITER_ENTER"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << mc_diag_param(_writer_params,
"partition_spec")
+ << ", batch_size=" << batch_size;
Jni::GlobalClass jni_writer_cls;
- RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env,
_writer_class.c_str(), &jni_writer_cls));
+ int64_t start_ms = MonotonicMillis();
+ Status status = Jni::Util::get_jni_scanner_class(env,
_writer_class.c_str(), &jni_writer_cls);
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_CLASS_AFTER"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ RETURN_IF_ERROR(status);
// Get constructor: (int batchSize, Map<String,String> params)
Jni::MethodId writer_constructor;
- RETURN_IF_ERROR(
- jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V",
&writer_constructor));
+ start_ms = MonotonicMillis();
+ status = jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V",
&writer_constructor);
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_CONSTRUCTOR_AFTER"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ RETURN_IF_ERROR(status);
// Convert C++ params map to Java HashMap
Jni::LocalObject hashmap_object;
- RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _writer_params,
&hashmap_object));
+ start_ms = MonotonicMillis();
+ status = Jni::Util::convert_to_java_map(env, _writer_params,
&hashmap_object);
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_CONVERT_PARAMS_AFTER"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ RETURN_IF_ERROR(status);
// Create writer instance
- RETURN_IF_ERROR(jni_writer_cls.new_object(env, writer_constructor)
- .with_arg((jint)batch_size)
- .with_arg(hashmap_object)
- .call(&_jni_writer_obj));
+ start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_NEW_WRITER_BEFORE"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id");
+ status = jni_writer_cls.new_object(env, writer_constructor)
+ .with_arg((jint)batch_size)
+ .with_arg(hashmap_object)
+ .call(&_jni_writer_obj);
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_NEW_WRITER_AFTER"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ RETURN_IF_ERROR(status);
// Resolve method IDs
- RETURN_IF_ERROR(jni_writer_cls.get_method(env, "open", "()V",
&_jni_writer_open));
- RETURN_IF_ERROR(
- jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V",
&_jni_writer_write));
- RETURN_IF_ERROR(jni_writer_cls.get_method(env, "close", "()V",
&_jni_writer_close));
- RETURN_IF_ERROR(jni_writer_cls.get_method(env, "getStatistics",
"()Ljava/util/Map;",
- &_jni_writer_get_statistics));
+ status = jni_writer_cls.get_method(env, "open", "()V", &_jni_writer_open);
+ RETURN_IF_ERROR(status);
+ status = jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V",
&_jni_writer_write);
+ RETURN_IF_ERROR(status);
+ status = jni_writer_cls.get_method(env, "close", "()V",
&_jni_writer_close);
+ RETURN_IF_ERROR(status);
+ status = jni_writer_cls.get_method(env, "getStatistics",
"()Ljava/util/Map;",
+ &_jni_writer_get_statistics);
+ RETURN_IF_ERROR(status);
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_INIT_WRITER_EXIT"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id");
return Status::OK();
}
Status VJniFormatTransformer::open() {
+ int64_t open_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_ENTER"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", partition_spec=" << mc_diag_param(_writer_params,
"partition_spec");
JNIEnv* env = nullptr;
- RETURN_IF_ERROR(Jni::Env::Get(&env));
+ Status status = Jni::Env::Get(&env);
+ if (!status.ok()) {
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_GET_ENV_AFTER"
+ << ", status=" << status.to_string();
+ return status;
+ }
int batch_size = _state->batch_size();
- RETURN_IF_ERROR(_init_jni_writer(env, batch_size));
+ status = _init_jni_writer(env, batch_size);
+ if (!status.ok()) {
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_INIT_AFTER"
+ << ", status=" << status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - open_start_ms;
+ return status;
+ }
- RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env,
_jni_writer_open).call());
+ int64_t java_open_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_JAVA_OPEN_BEFORE"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id");
+ status = _jni_writer_obj.call_void_method(env, _jni_writer_open).call();
+ LOG(INFO) << "MC_DIAG stage=BE_JAVA_OPEN_CALL_RETURNED"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - java_open_start_ms;
+ RETURN_IF_ERROR(status);
RETURN_ERROR_IF_EXC(env);
_opened = true;
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_OPEN_EXIT"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", cost_ms=" << MonotonicMillis() - open_start_ms;
return Status::OK();
}
@@ -79,20 +162,50 @@ Status VJniFormatTransformer::write(const Block& block) {
return Status::OK();
}
+ int64_t write_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_ENTER"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", rows=" << block.rows() << ", columns=" << block.columns();
JNIEnv* env = nullptr;
- RETURN_IF_ERROR(Jni::Env::Get(&env));
+ Status status = Jni::Env::Get(&env);
+ if (!status.ok()) {
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_GET_ENV_AFTER"
+ << ", status=" << status.to_string();
+ return status;
+ }
// 1. Convert Block to Java table metadata (column addresses)
Block* mutable_block = const_cast<Block*>(&block);
std::unique_ptr<long[]> input_table;
- RETURN_IF_ERROR(JniConnector::to_java_table(mutable_block, input_table));
+ int64_t convert_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_TO_JAVA_TABLE_BEFORE"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", rows=" << block.rows() << ", columns=" << block.columns();
+ status = JniConnector::to_java_table(mutable_block, input_table);
+ LOG(INFO) << "MC_DIAG stage=BE_TO_JAVA_TABLE_AFTER"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - convert_start_ms;
+ RETURN_IF_ERROR(status);
// 2. Cache schema on first call
if (!_schema_cached) {
+ int64_t schema_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_PARSE_SCHEMA_BEFORE"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table");
auto schema = JniConnector::parse_table_schema(mutable_block);
_cached_required_fields = schema.first;
_cached_columns_types = schema.second;
_schema_cached = true;
+ LOG(INFO) << "MC_DIAG stage=BE_PARSE_SCHEMA_AFTER"
+ << ", writer_class=" << _writer_class
+ << ", required_fields_length=" <<
_cached_required_fields.size()
+ << ", columns_types_length=" << _cached_columns_types.size()
+ << ", cost_ms=" << MonotonicMillis() - schema_start_ms;
}
// 3. Build input params map for Java writer
@@ -103,28 +216,83 @@ Status VJniFormatTransformer::write(const Block& block) {
// 4. Convert to Java Map and call writer.write(inputParams)
Jni::LocalObject input_map;
- RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params,
&input_map));
+ int64_t map_start_ms = MonotonicMillis();
+ status = Jni::Util::convert_to_java_map(env, input_params, &input_map);
+ LOG(INFO) << "MC_DIAG stage=BE_CONVERT_INPUT_MAP_AFTER"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - map_start_ms;
+ RETURN_IF_ERROR(status);
- RETURN_IF_ERROR(
- _jni_writer_obj.call_void_method(env,
_jni_writer_write).with_arg(input_map).call());
+ int64_t java_write_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_JAVA_WRITE_BEFORE"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", rows=" << block.rows() << ", columns=" << block.columns();
+ status = _jni_writer_obj.call_void_method(env,
_jni_writer_write).with_arg(input_map).call();
+ LOG(INFO) << "MC_DIAG stage=BE_JAVA_WRITE_CALL_RETURNED"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - java_write_start_ms;
+ RETURN_IF_ERROR(status);
RETURN_ERROR_IF_EXC(env);
_cur_written_rows += block.rows();
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_WRITE_EXIT"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", rows=" << block.rows()
+ << ", cur_written_rows=" << _cur_written_rows
+ << ", cost_ms=" << MonotonicMillis() - write_start_ms;
return Status::OK();
}
Status VJniFormatTransformer::close() {
if (_closed || !_opened) {
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_SKIP"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", opened=" << _opened << ", closed=" << _closed;
return Status::OK();
}
_closed = true;
+ int64_t close_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_ENTER"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", cur_written_rows=" << _cur_written_rows;
JNIEnv* env = nullptr;
- RETURN_IF_ERROR(Jni::Env::Get(&env));
+ Status status = Jni::Env::Get(&env);
+ if (!status.ok()) {
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_GET_ENV_AFTER"
+ << ", status=" << status.to_string();
+ return status;
+ }
- RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env,
_jni_writer_close).call());
+ int64_t java_close_start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_JAVA_CLOSE_BEFORE"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id");
+ status = _jni_writer_obj.call_void_method(env, _jni_writer_close).call();
+ LOG(INFO) << "MC_DIAG stage=BE_JAVA_CLOSE_CALL_RETURNED"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - java_close_start_ms;
+ RETURN_IF_ERROR(status);
RETURN_ERROR_IF_EXC(env);
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_CLOSE_EXIT"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id")
+ << ", cost_ms=" << MonotonicMillis() - close_start_ms;
return Status::OK();
}
@@ -136,26 +304,57 @@ int64_t VJniFormatTransformer::written_len() {
std::map<std::string, std::string> VJniFormatTransformer::get_statistics() {
std::map<std::string, std::string> result;
if (!_opened) {
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_SKIP_NOT_OPENED"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table");
return result;
}
+ int64_t start_ms = MonotonicMillis();
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_ENTER"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", txn_id=" << mc_diag_param(_writer_params, "txn_id")
+ << ", write_session_id=" << mc_diag_param(_writer_params,
"write_session_id");
JNIEnv* env = nullptr;
- if (!Jni::Env::Get(&env).ok()) {
+ Status status = Jni::Env::Get(&env);
+ if (!status.ok()) {
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_GET_ENV_AFTER"
+ << ", status=" << status.to_string();
return result;
}
Jni::LocalObject stats_map;
- if (!_jni_writer_obj.call_object_method(env, _jni_writer_get_statistics)
- .call(&stats_map)
- .ok()) {
+ status = _jni_writer_obj.call_object_method(env,
_jni_writer_get_statistics).call(&stats_map);
+ LOG(INFO) << "MC_DIAG stage=BE_JAVA_GET_STATS_CALL_RETURNED"
+ << ", writer_class=" << _writer_class << ", status=" <<
status.to_string()
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
+ if (!status.ok()) {
return result;
}
if (stats_map.uninitialized()) {
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_EMPTY"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table");
return result;
}
// Convert Java Map<String,String> to C++ map
static_cast<void>(Jni::Util::convert_to_cpp_map(env, stats_map, &result));
+ auto commit_it = result.find("mc_commit_message");
+ auto written_rows_it = result.find("counter:WrittenRows");
+ auto written_bytes_it = result.find("bytes:WrittenBytes");
+ LOG(INFO) << "MC_DIAG stage=BE_JNI_GET_STATS_EXIT"
+ << ", writer_class=" << _writer_class
+ << ", table=" << mc_diag_param(_writer_params, "table")
+ << ", stats_size=" << result.size()
+ << ", commit_message_length="
+ << (commit_it == result.end() ? 0 : commit_it->second.size())
+ << ", written_rows="
+ << (written_rows_it == result.end() ? "" :
written_rows_it->second)
+ << ", written_bytes="
+ << (written_bytes_it == result.end() ? "" :
written_bytes_it->second)
+ << ", cost_ms=" << MonotonicMillis() - start_ms;
return result;
}
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
index 2dfefd2ac79..b94f7a0908f 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
@@ -20,6 +20,8 @@ package org.apache.doris.common.jni;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.VectorTable;
+import org.apache.log4j.Logger;
+
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
@@ -32,6 +34,8 @@ import java.util.Map;
* Lifecycle: open() -> write() [repeated] -> close()
*/
public abstract class JniWriter {
+ private static final Logger LOG = Logger.getLogger(JniWriter.class);
+
protected int batchSize;
protected Map<String, String> params;
protected ColumnType[] columnTypes;
@@ -51,10 +55,21 @@ public abstract class JniWriter {
* then delegates to writeInternal.
*/
public void write(Map<String, String> inputParams) throws IOException {
+ long writeEnterNs = System.nanoTime();
+ String requiredFields = inputParams.get("required_fields");
+ String columnsTypes = inputParams.get("columns_types");
+ LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_ENTER writer=" +
getClass().getName()
+ + ", batchSize=" + batchSize
+ + ", schemaCached=" + (columnTypes != null)
+ + ", requiredFieldsLength=" + (requiredFields == null ? 0 :
requiredFields.length())
+ + ", columnsTypesLength=" + (columnsTypes == null ? 0 :
columnsTypes.length())
+ + ", thread=" + Thread.currentThread().getName());
+
// Parse and cache schema on first call
if (columnTypes == null) {
- String requiredFields = inputParams.get("required_fields");
- String columnsTypes = inputParams.get("columns_types");
+ long schemaStartNs = System.nanoTime();
+ LOG.info("MC_DIAG stage=JNI_WRITER_SCHEMA_PARSE_BEFORE writer=" +
getClass().getName()
+ + ", thread=" + Thread.currentThread().getName());
if (requiredFields != null && !requiredFields.isEmpty()) {
fields = requiredFields.split(",");
String[] typeStrs = columnsTypes.split("#");
@@ -66,15 +81,37 @@ public abstract class JniWriter {
fields = new String[0];
columnTypes = new ColumnType[0];
}
+ LOG.info("MC_DIAG stage=JNI_WRITER_SCHEMA_PARSE_AFTER writer=" +
getClass().getName()
+ + ", fields=" + fields.length
+ + ", columnTypes=" + columnTypes.length
+ + ", costMs=" + elapsedMs(schemaStartNs)
+ + ", thread=" + Thread.currentThread().getName());
}
long startRead = System.nanoTime();
+ LOG.info("MC_DIAG stage=JNI_WRITER_CREATE_READABLE_TABLE_BEFORE
writer=" + getClass().getName()
+ + ", thread=" + Thread.currentThread().getName());
VectorTable inputTable = VectorTable.createReadableTable(inputParams);
readTableTime += System.nanoTime() - startRead;
+ LOG.info("MC_DIAG stage=JNI_WRITER_CREATE_READABLE_TABLE_AFTER
writer=" + getClass().getName()
+ + ", rows=" + inputTable.getNumRows()
+ + ", columns=" + inputTable.getNumColumns()
+ + ", costMs=" + elapsedMs(startRead)
+ + ", thread=" + Thread.currentThread().getName());
long startWrite = System.nanoTime();
+ LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_INTERNAL_BEFORE writer=" +
getClass().getName()
+ + ", rows=" + inputTable.getNumRows()
+ + ", columns=" + inputTable.getNumColumns()
+ + ", thread=" + Thread.currentThread().getName());
writeInternal(inputTable);
writeTime += System.nanoTime() - startWrite;
+ LOG.info("MC_DIAG stage=JNI_WRITER_WRITE_INTERNAL_AFTER writer=" +
getClass().getName()
+ + ", rows=" + inputTable.getNumRows()
+ + ", columns=" + inputTable.getNumColumns()
+ + ", writeCostMs=" + elapsedMs(startWrite)
+ + ", totalCostMs=" + elapsedMs(writeEnterNs)
+ + ", thread=" + Thread.currentThread().getName());
}
protected abstract void writeInternal(VectorTable inputTable) throws
IOException;
@@ -96,4 +133,8 @@ public abstract class JniWriter {
public long getReadTableTime() {
return readTableTime;
}
+
+ private static long elapsedMs(long startNs) {
+ return (System.nanoTime() - startNs) / 1_000_000L;
+ }
}
diff --git
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
index 82b58f48493..1c907b7c331 100644
---
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
+++
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
@@ -87,12 +87,22 @@ class MaxComputeFeClient implements AutoCloseable {
throw new IOException("empty MaxCompute write_session_id for
block_id allocation");
}
+ long startNs = System.nanoTime();
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_BEFORE txnId=" + txnId
+ + ", writeSessionId=" + writeSessionId
+ + ", masterFe=" + formatAddress(masterAddress)
+ + ", rpcTimeoutMs=" + rpcTimeoutMs);
TMaxComputeBlockIdRequest request = buildBlockIdRequest(txnId,
writeSessionId);
- return callWithMasterRedirect(
+ long blockId = callWithMasterRedirect(
"allocate MaxCompute block_id",
client -> client.getMaxComputeBlockIdRange(request),
(result, requestAddress, retryTimes) ->
handleBlockIdResult(result, requestAddress,
retryTimes, txnId, writeSessionId));
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_AFTER txnId=" + txnId
+ + ", writeSessionId=" + writeSessionId
+ + ", blockId=" + blockId
+ + ", costMs=" + elapsedMs(startNs));
+ return blockId;
}
@Override
@@ -109,16 +119,30 @@ class MaxComputeFeClient implements AutoCloseable {
for (int retryTimes = 0; retryTimes < FETCH_BLOCK_ID_MAX_RETRY_TIMES;
retryTimes++) {
TNetworkAddress requestAddress = copyAddress(masterAddress);
T result;
+ long rpcStartNs = System.nanoTime();
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_RPC_BEFORE operation=" +
operation
+ + ", retry=" + retryTimes
+ + ", fe=" + formatAddress(requestAddress)
+ + ", rpcTimeoutMs=" + rpcTimeoutMs
+ + ", framedTransport=" + useFramedTransport());
try {
result = rpcExecutor.call(requestAddress, rpcTimeoutMs,
useFramedTransport(), call);
} catch (Exception e) {
lastException = e;
rpcExecutor.close();
+ LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_RPC_ERROR operation=" +
operation
+ + ", retry=" + retryTimes
+ + ", fe=" + formatAddress(requestAddress)
+ + ", costMs=" + elapsedMs(rpcStartNs), e);
LOG.warn("Failed to " + operation + ", rpc failure,
retry_time="
+ retryTimes + ", fe=" +
formatAddress(requestAddress), e);
sleepBeforeRetry();
continue;
}
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_RPC_AFTER operation=" +
operation
+ + ", retry=" + retryTimes
+ + ", fe=" + formatAddress(requestAddress)
+ + ", costMs=" + elapsedMs(rpcStartNs));
try {
return handler.handle(result, requestAddress, retryTimes);
@@ -155,6 +179,11 @@ class MaxComputeFeClient implements AutoCloseable {
+ formatAddress(requestAddress) + ", switch to FE@" +
formatAddress(result.getMasterAddress())
+ ", retry_time=" + retryTimes + ", txn_id=" + txnId
+ ", write_session_id=" + writeSessionId);
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_NOT_MASTER txnId="
+ txnId
+ + ", writeSessionId=" + writeSessionId
+ + ", requestFe=" + formatAddress(requestAddress)
+ + ", masterFe=" + formatAddress(result.getMasterAddress())
+ + ", retry=" + retryTimes);
throw new NotMasterException(result.getMasterAddress());
}
@@ -173,11 +202,11 @@ class MaxComputeFeClient implements AutoCloseable {
+ result.getLength() + ", txn_id=" + txnId + ",
write_session_id=" + writeSessionId);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Allocated MaxCompute block_id from FE@" +
formatAddress(requestAddress)
- + ", txn_id=" + txnId + ", write_session_id=" +
writeSessionId
- + ", block_id=" + result.getStart());
- }
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_BLOCK_ID_RESULT txnId=" + txnId
+ + ", writeSessionId=" + writeSessionId
+ + ", fe=" + formatAddress(requestAddress)
+ + ", retry=" + retryTimes
+ + ", blockId=" + result.getStart());
return result.getStart();
}
@@ -248,6 +277,10 @@ class MaxComputeFeClient implements AutoCloseable {
return address.getHostname() + ":" + address.getPort();
}
+ private static long elapsedMs(long startNs) {
+ return (System.nanoTime() - startNs) / 1_000_000L;
+ }
+
interface RpcExecutor {
<T> T call(TNetworkAddress address, int timeoutMs, boolean
useFramedTransport,
FeCall<T> call) throws Exception;
@@ -273,10 +306,22 @@ class MaxComputeFeClient implements AutoCloseable {
@Override
public synchronized <T> T call(TNetworkAddress address, int timeoutMs,
boolean useFramedTransport,
FeCall<T> call) throws Exception {
+ long callStartNs = System.nanoTime();
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_EXECUTOR_CALL_ENTER fe=" +
formatAddress(address)
+ + ", timeoutMs=" + timeoutMs
+ + ", framedTransport=" + useFramedTransport);
ensureConnected(address, timeoutMs, useFramedTransport);
try {
- return call.call(client);
+ long thriftCallStartNs = System.nanoTime();
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_THRIFT_CALL_BEFORE fe="
+ formatAddress(address));
+ T result = call.call(client);
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_THRIFT_CALL_AFTER fe="
+ formatAddress(address)
+ + ", thriftCallCostMs=" + elapsedMs(thriftCallStartNs)
+ + ", totalCostMs=" + elapsedMs(callStartNs));
+ return result;
} catch (Exception e) {
+ LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_EXECUTOR_CALL_ERROR
fe=" + formatAddress(address)
+ + ", costMs=" + elapsedMs(callStartNs), e);
close();
throw e;
}
@@ -297,10 +342,15 @@ class MaxComputeFeClient implements AutoCloseable {
if (client != null && transport != null && transport.isOpen()
&& connectedFramedTransport == useFramedTransport
&& sameAddress(connectedAddress, address)) {
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_REUSE_CONNECTION fe=" +
formatAddress(address));
return;
}
close();
+ long connectStartNs = System.nanoTime();
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_BEFORE fe=" +
formatAddress(address)
+ + ", timeoutMs=" + timeoutMs
+ + ", framedTransport=" + useFramedTransport);
TTransport newTransport = createTransport(address, timeoutMs,
useFramedTransport);
try {
newTransport.open();
@@ -308,7 +358,11 @@ class MaxComputeFeClient implements AutoCloseable {
client = new FrontendService.Client(new
TBinaryProtocol(transport));
connectedAddress = copyAddress(address);
connectedFramedTransport = useFramedTransport;
+ LOG.info("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_AFTER fe=" +
formatAddress(address)
+ + ", costMs=" + elapsedMs(connectStartNs));
} catch (Exception e) {
+ LOG.warn("MC_DIAG stage=JAVA_FE_CLIENT_CONNECT_ERROR fe=" +
formatAddress(address)
+ + ", costMs=" + elapsedMs(connectStartNs), e);
newTransport.close();
throw e;
}
diff --git
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
index 9788184057e..334f7124f59 100644
---
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
+++
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
@@ -156,18 +156,37 @@ public class MaxComputeJniWriter extends JniWriter {
params.getOrDefault(MCProperties.WRITE_MAX_BLOCK_BYTES,
MCProperties.DEFAULT_WRITE_MAX_BLOCK_BYTES));
this.feClient = MaxComputeFeClient.create(params);
+ logDiag("JAVA_WRITER_CONSTRUCTED",
+ "connectTimeout=" + connectTimeout
+ + ", readTimeout=" + readTimeout
+ + ", retryCount=" + retryCount
+ + ", maxBlockBytes=" + maxBlockBytes
+ + ", preallocatedBlockId=" + preallocatedBlockId);
}
@Override
public void open() throws IOException {
+ logDiag("JAVA_OPEN_ENTER",
+ "connectTimeout=" + connectTimeout
+ + ", readTimeout=" + readTimeout
+ + ", retryCount=" + retryCount
+ + ", maxBlockBytes=" + maxBlockBytes);
try {
+ long stageStartNs = System.nanoTime();
+ logDiag("JAVA_CREATE_MC_CLIENT_BEFORE", "");
Odps odps = MCUtils.createMcClient(params);
odps.setDefaultProject(project);
odps.setEndpoint(endpoint);
+ logDiag("JAVA_CREATE_MC_CLIENT_AFTER", "costMs=" +
elapsedMs(stageStartNs));
+ stageStartNs = System.nanoTime();
+ logDiag("JAVA_BUILD_CREDENTIALS_BEFORE", "");
Credentials credentials =
Credentials.newBuilder().withAccount(odps.getAccount())
.withAppAccount(odps.getAppAccount()).build();
+ logDiag("JAVA_BUILD_CREDENTIALS_AFTER", "costMs=" +
elapsedMs(stageStartNs));
+ stageStartNs = System.nanoTime();
+ logDiag("JAVA_BUILD_SETTINGS_BEFORE", "");
RestOptions restOptions = RestOptions.newBuilder()
.withConnectTimeout(connectTimeout)
.withReadTimeout(readTimeout)
@@ -179,16 +198,22 @@ public class MaxComputeJniWriter extends JniWriter {
.withQuotaName(Strings.isNullOrEmpty(quota) ? null : quota)
.withRestOptions(restOptions)
.build();
+ logDiag("JAVA_BUILD_SETTINGS_AFTER", "costMs=" +
elapsedMs(stageStartNs));
// Restore the write session created by FE
+ stageStartNs = System.nanoTime();
+ logDiag("JAVA_RESTORE_SESSION_BEFORE", "");
writeSession = new TableWriteSessionBuilder()
.identifier(com.aliyun.odps.table.TableIdentifier.of(project, tableName))
.withSessionId(writeSessionId)
.withSettings(settings)
.buildBatchWriteSession();
+ logDiag("JAVA_RESTORE_SESSION_AFTER", "costMs=" +
elapsedMs(stageStartNs));
// SDK skips ArrowOptions when restoring session via withSessionId,
// set it via reflection to avoid NPE in ArrowWriterImpl
+ stageStartNs = System.nanoTime();
+ logDiag("JAVA_SET_ARROW_OPTIONS_BEFORE", "");
ArrowOptions arrowOptions = ArrowOptions.newBuilder()
.withDatetimeUnit(TimestampUnit.MILLI)
.withTimestampUnit(TimestampUnit.MILLI)
@@ -197,8 +222,11 @@ public class MaxComputeJniWriter extends JniWriter {
.getSuperclass().getDeclaredField("arrowOptions");
arrowField.setAccessible(true);
arrowField.set(writeSession, arrowOptions);
+ logDiag("JAVA_SET_ARROW_OPTIONS_AFTER", "costMs=" +
elapsedMs(stageStartNs));
// Get schema info for type mapping
+ stageStartNs = System.nanoTime();
+ logDiag("JAVA_REQUIRED_SCHEMA_BEFORE", "");
com.aliyun.odps.table.DataSchema dataSchema =
writeSession.requiredSchema();
columnTypeInfos = new java.util.ArrayList<>();
columnNames = new java.util.ArrayList<>();
@@ -206,20 +234,30 @@ public class MaxComputeJniWriter extends JniWriter {
columnTypeInfos.add(col.getTypeInfo());
columnNames.add(col.getName());
}
+ logDiag("JAVA_REQUIRED_SCHEMA_AFTER",
+ "columns=" + columnNames.size() + ", costMs=" +
elapsedMs(stageStartNs));
+ stageStartNs = System.nanoTime();
+ logDiag("JAVA_ALLOCATOR_CREATE_BEFORE", "");
allocator = new RootAllocator(Long.MAX_VALUE);
+ logDiag("JAVA_ALLOCATOR_CREATE_AFTER", "costMs=" +
elapsedMs(stageStartNs));
+ stageStartNs = System.nanoTime();
+ logDiag("JAVA_WRITER_OPTIONS_BEFORE", "");
writerOptions = WriterOptions.newBuilder()
.withSettings(settings)
.withCompressionCodec(CompressionCodec.ZSTD)
.build();
+ logDiag("JAVA_WRITER_OPTIONS_AFTER", "costMs=" +
elapsedMs(stageStartNs));
openBatchWriter(resolveInitialBlockId());
LOG.info("MaxComputeJniWriter opened: project=" + project + ",
table=" + tableName
+ ", writeSessionId=" + writeSessionId + ",
partitionSpec=" + partitionSpec
+ ", blockId=" + currentBlockId);
+ logDiag("JAVA_OPEN_EXIT", "");
} catch (Exception e) {
String errorMsg = "Failed to open MaxCompute write session for
table " + project + "." + tableName;
+ logDiag("JAVA_OPEN_ERROR", "error=" + e.getClass().getName() + ":
" + e.getMessage());
LOG.error(errorMsg, e);
throw new IOException(errorMsg, e);
}
@@ -233,58 +271,112 @@ public class MaxComputeJniWriter extends JniWriter {
return;
}
+ long startNs = System.nanoTime();
+ logDiag("JAVA_WRITE_INTERNAL_ENTER", "rows=" + numRows + ", columns="
+ numCols);
try {
writeRowsWithRowChecks(inputTable, numRows, numCols);
+ logDiag("JAVA_WRITE_INTERNAL_EXIT",
+ "rows=" + numRows + ", columns=" + numCols + ", costMs=" +
elapsedMs(startNs));
} catch (Exception e) {
String errorMsg = "Failed to write data to MaxCompute table " +
project + "." + tableName;
+ logDiag("JAVA_WRITE_INTERNAL_ERROR",
+ "rows=" + numRows + ", columns=" + numCols
+ + ", costMs=" + elapsedMs(startNs)
+ + ", error=" + e.getClass().getName() + ": " +
e.getMessage());
LOG.error(errorMsg, e);
throw new IOException(errorMsg, e);
}
}
private long resolveInitialBlockId() throws IOException {
- return preallocatedBlockId != null ? preallocatedBlockId :
requestBlockId();
+ if (preallocatedBlockId != null) {
+ logDiag("JAVA_RESOLVE_INITIAL_BLOCK_PREALLOCATED", "blockId=" +
preallocatedBlockId);
+ return preallocatedBlockId;
+ }
+ return requestBlockId();
}
private long requestBlockId() throws IOException {
- return feClient.requestBlockId(txnId, writeSessionId);
+ long startNs = System.nanoTime();
+ logDiag("JAVA_REQUEST_BLOCK_ID_BEFORE", "");
+ long blockId = feClient.requestBlockId(txnId, writeSessionId);
+ logDiag("JAVA_REQUEST_BLOCK_ID_AFTER",
+ "newBlockId=" + blockId + ", costMs=" + elapsedMs(startNs));
+ return blockId;
}
private void openBatchWriter(long blockId) throws IOException {
+ long startNs = System.nanoTime();
+ logDiag("JAVA_OPEN_BATCH_WRITER_BEFORE", "targetBlockId=" + blockId);
currentBlockId = blockId;
currentBlockWrittenBytes = 0L;
batchWriter = writeSession.createArrowWriter(blockId,
WriterAttemptId.of(0), writerOptions);
+ logDiag("JAVA_OPEN_BATCH_WRITER_AFTER",
+ "targetBlockId=" + blockId + ", costMs=" + elapsedMs(startNs));
}
private void closeCurrentBatchWriterAndCollectCommit() throws IOException {
if (batchWriter == null) {
+ logDiag("JAVA_COMMIT_SKIP_NO_BATCH_WRITER", "");
return;
}
+ long startNs = System.nanoTime();
+ logDiag("JAVA_COMMIT_BEFORE",
+ "currentBlockWrittenBytes=" + currentBlockWrittenBytes
+ + ", commitMessageCount=" + commitMessages.size());
WriterCommitMessage commitMessage = batchWriter.commit();
if (commitMessage != null) {
commitMessages.add(commitMessage);
}
batchWriter = null;
+ logDiag("JAVA_COMMIT_AFTER",
+ "commitMessageAdded=" + (commitMessage != null)
+ + ", commitMessageCount=" + commitMessages.size()
+ + ", costMs=" + elapsedMs(startNs));
}
private void rotateCurrentBatchWriter() throws IOException {
+ long startNs = System.nanoTime();
+ logDiag("JAVA_ROTATE_BEFORE",
+ "currentBlockWrittenBytes=" + currentBlockWrittenBytes
+ + ", commitMessageCount=" + commitMessages.size());
closeCurrentBatchWriterAndCollectCommit();
openBatchWriter(requestBlockId());
+ logDiag("JAVA_ROTATE_AFTER", "costMs=" + elapsedMs(startNs));
}
private void writeRowsWithRowChecks(VectorTable inputTable, int numRows,
int numCols) throws IOException {
+ logDiag("JAVA_WRITE_ROWS_ENTER", "rows=" + numRows + ", columns=" +
numCols);
int rowStart = 0;
while (rowStart < numRows) {
int rowEnd = rowStart;
long batchEstimatedBytes = 0L;
boolean rotateAfterWrite = false;
+ int estimatedRows = 0;
+ long rangeStartNs = System.nanoTime();
+ logDiag("JAVA_RANGE_SELECT_BEFORE",
+ "rowStart=" + rowStart
+ + ", rows=" + numRows
+ + ", currentBlockWrittenBytes=" +
currentBlockWrittenBytes);
while (rowEnd < numRows) {
+ if (estimatedRows == 0 || estimatedRows % 1024 == 0) {
+ logDiag("JAVA_RANGE_SELECT_PROGRESS",
+ "rowStart=" + rowStart
+ + ", probingRow=" + rowEnd
+ + ", estimatedRows=" + estimatedRows
+ + ", batchEstimatedBytes=" +
batchEstimatedBytes);
+ }
long rowEstimatedBytes =
estimateSingleRowPayloadBytes(inputTable, numCols, rowEnd);
+ estimatedRows++;
boolean exceedsHardLimit = currentBlockWrittenBytes +
batchEstimatedBytes
+ rowEstimatedBytes > maxBlockBytes;
if (exceedsHardLimit) {
if (rowEnd == rowStart) {
if (currentBlockWrittenBytes > 0) {
+
logDiag("JAVA_RANGE_SELECT_ROTATE_FOR_OVERSIZE_ROW",
+ "rowStart=" + rowStart
+ + ", rowEstimatedBytes=" +
rowEstimatedBytes
+ + ", currentBlockWrittenBytes=" +
currentBlockWrittenBytes);
rotateCurrentBatchWriter();
continue;
}
@@ -301,28 +393,75 @@ public class MaxComputeJniWriter extends JniWriter {
break;
}
}
+ logDiag("JAVA_RANGE_SELECT_AFTER",
+ "rowStart=" + rowStart
+ + ", rowEnd=" + rowEnd
+ + ", selectedRows=" + (rowEnd - rowStart)
+ + ", estimatedRows=" + estimatedRows
+ + ", batchEstimatedBytes=" + batchEstimatedBytes
+ + ", rotateAfterWrite=" + rotateAfterWrite
+ + ", costMs=" + elapsedMs(rangeStartNs));
if (rowEnd == rowStart) {
+ long fallbackStartNs = System.nanoTime();
+ logDiag("JAVA_RANGE_SELECT_FALLBACK_BEFORE", "rowStart=" +
rowStart);
long rowEstimatedBytes =
estimateSingleRowPayloadBytes(inputTable, numCols, rowStart);
batchEstimatedBytes = rowEstimatedBytes;
rowEnd = rowStart + 1;
rotateAfterWrite = true;
+ logDiag("JAVA_RANGE_SELECT_FALLBACK_AFTER",
+ "rowStart=" + rowStart
+ + ", rowEstimatedBytes=" + rowEstimatedBytes
+ + ", costMs=" + elapsedMs(fallbackStartNs));
}
- try (VectorSchemaRoot root = buildRowRangeRoot(inputTable,
numCols, rowStart, rowEnd)) {
+ long buildStartNs = System.nanoTime();
+ logDiag("JAVA_BUILD_ROOT_BEFORE",
+ "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+ + ", selectedRows=" + (rowEnd - rowStart));
+ try (VectorSchemaRoot root = buildRowRangeRoot(inputTable,
numCols, rowStart, rowEnd, true)) {
+ logDiag("JAVA_BUILD_ROOT_AFTER",
+ "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+ + ", selectedRows=" + (rowEnd - rowStart)
+ + ", costMs=" + elapsedMs(buildStartNs));
+ long writeStartNs = System.nanoTime();
+ logDiag("JAVA_BATCH_WRITE_BEFORE",
+ "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+ + ", selectedRows=" + (rowEnd - rowStart)
+ + ", batchEstimatedBytes=" +
batchEstimatedBytes);
batchWriter.write(root);
+ logDiag("JAVA_BATCH_WRITE_AFTER",
+ "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+ + ", selectedRows=" + (rowEnd - rowStart)
+ + ", costMs=" + elapsedMs(writeStartNs));
}
+ long flushStartNs = System.nanoTime();
+ logDiag("JAVA_FLUSH_BEFORE",
+ "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+ + ", selectedRows=" + (rowEnd - rowStart));
batchWriter.flush();
+ logDiag("JAVA_FLUSH_AFTER",
+ "rowStart=" + rowStart + ", rowEnd=" + rowEnd
+ + ", selectedRows=" + (rowEnd - rowStart)
+ + ", costMs=" + elapsedMs(flushStartNs));
int rowsWrittenNow = rowEnd - rowStart;
writtenRows += rowsWrittenNow;
currentBlockWrittenBytes += batchEstimatedBytes;
writtenBytes += batchEstimatedBytes;
+ logDiag("JAVA_BATCH_DONE",
+ "rowsWrittenNow=" + rowsWrittenNow
+ + ", nextRowStart=" + rowEnd
+ + ", currentBlockWrittenBytes=" +
currentBlockWrittenBytes
+ + ", writtenRows=" + writtenRows
+ + ", writtenBytes=" + writtenBytes
+ + ", rotateAfterWrite=" + rotateAfterWrite);
rowStart = rowEnd;
if (rotateAfterWrite && rowStart < numRows) {
rotateCurrentBatchWriter();
}
}
+ logDiag("JAVA_WRITE_ROWS_EXIT", "rows=" + numRows + ", columns=" +
numCols);
}
private static class CountingDiscardOutputStream extends OutputStream {
@@ -339,7 +478,7 @@ public class MaxComputeJniWriter extends JniWriter {
private long estimateSingleRowPayloadBytes(VectorTable inputTable, int
numCols, int rowIndex)
throws IOException {
- try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols,
rowIndex, rowIndex + 1);
+ try (VectorSchemaRoot root = buildRowRangeRoot(inputTable, numCols,
rowIndex, rowIndex + 1, false);
ArrowWriter estimator =
ArrowWriterFactory.getRecordBatchWriter(
new CountingDiscardOutputStream(), writerOptions)) {
estimator.writeBatch(root);
@@ -347,13 +486,32 @@ public class MaxComputeJniWriter extends JniWriter {
}
}
- private VectorSchemaRoot buildRowRangeRoot(VectorTable inputTable, int
numCols, int rowStart, int rowEnd) {
+ private VectorSchemaRoot buildRowRangeRoot(VectorTable inputTable, int
numCols, int rowStart, int rowEnd,
+ boolean logColumns) {
int rowCount = rowEnd - rowStart;
VectorSchemaRoot root = batchWriter.newElement();
root.setRowCount(rowCount);
for (int col = 0; col < numCols && col < columnTypeInfos.size();
col++) {
+ long columnStartNs = System.nanoTime();
+ if (logColumns) {
+ logDiag("JAVA_BUILD_COLUMN_BEFORE",
+ "columnIndex=" + col
+ + ", columnName=" + columnNames.get(col)
+ + ", odpsType=" +
columnTypeInfos.get(col).getOdpsType()
+ + ", rowStart=" + rowStart
+ + ", rowCount=" + rowCount);
+ }
fillArrowVectorStreaming(root, col,
columnTypeInfos.get(col).getOdpsType(),
inputTable.getColumn(col), rowStart, rowCount);
+ if (logColumns) {
+ logDiag("JAVA_BUILD_COLUMN_AFTER",
+ "columnIndex=" + col
+ + ", columnName=" + columnNames.get(col)
+ + ", odpsType=" +
columnTypeInfos.get(col).getOdpsType()
+ + ", rowStart=" + rowStart
+ + ", rowCount=" + rowCount
+ + ", costMs=" + elapsedMs(columnStartNs));
+ }
}
return root;
}
@@ -904,28 +1062,42 @@ public class MaxComputeJniWriter extends JniWriter {
@Override
public void close() throws IOException {
+ long closeStartNs = System.nanoTime();
+ logDiag("JAVA_CLOSE_ENTER", "");
try {
closeCurrentBatchWriterAndCollectCommit();
if (allocator != null) {
+ long allocatorStartNs = System.nanoTime();
+ logDiag("JAVA_ALLOCATOR_CLOSE_BEFORE", "");
allocator.close();
allocator = null;
+ logDiag("JAVA_ALLOCATOR_CLOSE_AFTER", "costMs=" +
elapsedMs(allocatorStartNs));
}
LOG.info("MaxComputeJniWriter closed: writeSessionId=" +
writeSessionId
+ ", partitionSpec=" + partitionSpec
+ ", writtenRows=" + writtenRows
+ ", lastBlockId=" + currentBlockId
+ ", commitMessageCount=" + commitMessages.size());
+ logDiag("JAVA_CLOSE_EXIT", "costMs=" + elapsedMs(closeStartNs));
} catch (Exception e) {
String errorMsg = "Failed to close MaxCompute arrow writer";
+ logDiag("JAVA_CLOSE_ERROR",
+ "costMs=" + elapsedMs(closeStartNs)
+ + ", error=" + e.getClass().getName() + ": " +
e.getMessage());
LOG.error(errorMsg, e);
throw new IOException(errorMsg, e);
} finally {
+ long feClientCloseStartNs = System.nanoTime();
+ logDiag("JAVA_FE_CLIENT_CLOSE_BEFORE", "");
feClient.close();
+ logDiag("JAVA_FE_CLIENT_CLOSE_AFTER", "costMs=" +
elapsedMs(feClientCloseStartNs));
}
}
@Override
public Map<String, String> getStatistics() {
+ long startNs = System.nanoTime();
+ logDiag("JAVA_GET_STATISTICS_ENTER", "commitMessageCount=" +
commitMessages.size());
Map<String, String> stats = new HashMap<>();
stats.put("mc_partition_spec", partitionSpec != null ? partitionSpec :
"");
@@ -936,8 +1108,14 @@ public class MaxComputeJniWriter extends JniWriter {
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(commitMessages);
oos.close();
- stats.put("mc_commit_message",
Base64.getEncoder().encodeToString(baos.toByteArray()));
+ String encoded =
Base64.getEncoder().encodeToString(baos.toByteArray());
+ stats.put("mc_commit_message", encoded);
+ logDiag("JAVA_GET_STATISTICS_COMMIT_MESSAGE_SERIALIZED",
+ "commitMessageCount=" + commitMessages.size()
+ + ", encodedLength=" + encoded.length());
} catch (IOException e) {
+ logDiag("JAVA_GET_STATISTICS_ERROR",
+ "error=" + e.getClass().getName() + ": " +
e.getMessage());
LOG.error("Failed to serialize WriterCommitMessages", e);
}
}
@@ -946,6 +1124,29 @@ public class MaxComputeJniWriter extends JniWriter {
stats.put("bytes:WrittenBytes", String.valueOf(writtenBytes));
stats.put("timer:WriteTime", String.valueOf(writeTime));
stats.put("timer:ReadTableTime", String.valueOf(readTableTime));
+ logDiag("JAVA_GET_STATISTICS_EXIT",
+ "statsSize=" + stats.size()
+ + ", costMs=" + elapsedMs(startNs));
return stats;
}
+
+ private void logDiag(String stage, String detail) {
+ String message = "MC_DIAG stage=" + stage
+ + ", table=" + tableName
+ + ", txnId=" + txnId
+ + ", writeSessionId=" + writeSessionId
+ + ", partitionSpec=" + partitionSpec
+ + ", blockId=" + currentBlockId
+ + ", writtenRows=" + writtenRows
+ + ", writtenBytes=" + writtenBytes
+ + ", thread=" + Thread.currentThread().getName();
+ if (detail != null && !detail.isEmpty()) {
+ message += ", " + detail;
+ }
+ LOG.info(message);
+ }
+
+ private static long elapsedMs(long startNs) {
+ return (System.nanoTime() - startNs) / 1_000_000L;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
index 9f1c61ddf24..fa9fcefab81 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
@@ -64,8 +64,24 @@ public class MCTransaction implements Transaction {
}
public void updateMCCommitData(List<TMCCommitData> commitDataList) {
+ long incomingRows = 0;
+ int incomingCommitMessages = 0;
+ long incomingCommitMessageBytes = 0;
+ for (TMCCommitData data : commitDataList) {
+ incomingRows += data.getRowCount();
+ if (data.isSetCommitMessage() &&
!data.getCommitMessage().isEmpty()) {
+ incomingCommitMessages++;
+ incomingCommitMessageBytes += data.getCommitMessage().length();
+ }
+ }
synchronized (this) {
+ LOG.info("MC_DIAG stage=FE_UPDATE_COMMIT_DATA table={}
sessionId={} incomingDatas={} incomingRows={}"
+ + " incomingCommitMessages={}
incomingCommitMessageBytes={} existingDatas={}",
+ table == null ? "null" : table.getName(), writeSessionId,
commitDataList.size(), incomingRows,
+ incomingCommitMessages, incomingCommitMessageBytes,
this.commitDataList.size());
this.commitDataList.addAll(commitDataList);
+ LOG.info("MC_DIAG stage=FE_UPDATE_COMMIT_DATA_DONE table={}
sessionId={} totalDatas={}",
+ table == null ? "null" : table.getName(), writeSessionId,
this.commitDataList.size());
}
}
@@ -73,7 +89,15 @@ public class MCTransaction implements Transaction {
this.table = (MaxComputeExternalTable) dorisTable;
try {
+ long beginStartMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_START db={} table={}",
+ table.getDbName(), table.getName());
+ long tableIdStartMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_GET_ODPS_TABLE_ID_BEFORE db={}
table={}",
+ table.getDbName(), table.getName());
TableIdentifier tableId =
catalog.getOdpsTableIdentifier(table.getDbName(), table.getName());
+ LOG.info("MC_DIAG stage=FE_GET_ODPS_TABLE_ID_AFTER db={} table={}
costMs={}",
+ table.getDbName(), table.getName(),
System.currentTimeMillis() - tableIdStartMs);
boolean isDynamicPartition =
!table.getPartitionColumns().isEmpty();
boolean isStaticPartition = false;
@@ -94,6 +118,10 @@ public class MCTransaction implements Transaction {
}
isOverwrite = mcCtx.isOverwrite();
}
+ LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_ENTER table={}.{}
dynamicPartition={} staticPartition={}"
+ + " staticPartitionSpec={} overwrite={}
partitionColumns={}",
+ catalog.getDefaultProject(), table.getName(),
isDynamicPartition, isStaticPartition,
+ staticPartitionSpecStr, isOverwrite,
table.getPartitionColumns().size());
TableWriteSessionBuilder builder = new TableWriteSessionBuilder()
.identifier(tableId)
@@ -114,13 +142,25 @@ public class MCTransaction implements Transaction {
builder.overwrite(true);
}
+ long buildSessionStartMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_BUILD_WRITE_SESSION_BEFORE table={}.{}
dynamicPartition={}"
+ + " staticPartition={} overwrite={}",
+ catalog.getDefaultProject(), table.getName(),
isDynamicPartition, isStaticPartition, isOverwrite);
TableBatchWriteSession writeSession =
builder.buildBatchWriteSession();
writeSessionId = writeSession.getId();
nextBlockId.set(0);
+ LOG.info("MC_DIAG stage=FE_BUILD_WRITE_SESSION_AFTER table={}.{}
sessionId={} costMs={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId,
+ System.currentTimeMillis() - buildSessionStartMs);
LOG.info("Created MC Storage API write session: {} for table
{}.{}",
writeSessionId, catalog.getDefaultProject(),
table.getName());
+ LOG.info("MC_DIAG stage=FE_BEGIN_INSERT_EXIT table={}.{}
sessionId={} costMs={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId,
+ System.currentTimeMillis() - beginStartMs);
} catch (Exception e) {
+ LOG.warn("MC_DIAG stage=FE_BEGIN_INSERT_ERROR table={} error={}:
{}",
+ dorisTable.getName(), e.getClass().getName(),
e.getMessage(), e);
throw new UserException("Failed to begin insert for MaxCompute
table "
+ dorisTable.getName() + ": " + e.getMessage(), e);
}
@@ -131,6 +171,11 @@ public class MCTransaction implements Transaction {
}
public long allocateBlockIdRange(String requestWriteSessionId, long
length) throws UserException {
+ long startMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_ALLOCATE_BLOCK_ID_ENTER table={}
sessionId={} requestSessionId={} length={}"
+ + " nextBlockId={}",
+ table == null ? "null" : table.getName(), writeSessionId,
requestWriteSessionId, length,
+ nextBlockId.get());
if (length <= 0) {
throw new UserException("MaxCompute block_id allocation length
must be positive: " + length);
}
@@ -156,29 +201,48 @@ public class MCTransaction implements Transaction {
LOG.info("Allocated MaxCompute block_id range: sessionId={}, start={},
length={}",
writeSessionId, start, length);
+ LOG.info("MC_DIAG stage=FE_ALLOCATE_BLOCK_ID_EXIT table={}
sessionId={} start={} length={} nextBlockId={}"
+ + " costMs={}",
+ table == null ? "null" : table.getName(), writeSessionId,
start, length, nextBlockId.get(),
+ System.currentTimeMillis() - startMs);
return start;
}
private void appendCommitMessages(List<WriterCommitMessage> allMessages,
String encodedCommitMessage)
throws Exception {
+ long startMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_BEFORE table={}
sessionId={} encodedLength={}",
+ table == null ? "null" : table.getName(), writeSessionId,
encodedCommitMessage.length());
byte[] bytes = Base64.getDecoder().decode(encodedCommitMessage);
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais);
Object payload = ois.readObject();
ois.close();
+ LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_DECODED table={}
sessionId={} payloadType={}"
+ + " decodedBytes={} costMs={}",
+ table == null ? "null" : table.getName(), writeSessionId,
+ payload == null ? "null" : payload.getClass().getName(),
bytes.length,
+ System.currentTimeMillis() - startMs);
if (payload instanceof WriterCommitMessage) {
allMessages.add((WriterCommitMessage) payload);
+ LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_AFTER table={}
sessionId={} added=1 totalMessages={}",
+ table == null ? "null" : table.getName(), writeSessionId,
allMessages.size());
return;
}
if (payload instanceof List<?>) {
+ int added = 0;
for (Object item : (List<?>) payload) {
if (!(item instanceof WriterCommitMessage)) {
throw new UserException("Unexpected MaxCompute commit
payload item type: "
+ (item == null ? "null" :
item.getClass().getName()));
}
allMessages.add((WriterCommitMessage) item);
+ added++;
}
+ LOG.info("MC_DIAG stage=FE_APPEND_COMMIT_MESSAGE_AFTER table={}
sessionId={} added={}"
+ + " totalMessages={}",
+ table == null ? "null" : table.getName(), writeSessionId,
added, allMessages.size());
return;
}
throw new UserException("Unexpected MaxCompute commit payload type: "
@@ -188,33 +252,69 @@ public class MCTransaction implements Transaction {
public void finishInsert() throws UserException {
try {
long t0 = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_FINISH_INSERT_ENTER table={}.{}
sessionId={} commitDataCount={}"
+ + " updateRows={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId, commitDataList.size(),
+ getUpdateCnt());
// Collect all WriterCommitMessages from BEs
List<WriterCommitMessage> allMessages = new ArrayList<>();
synchronized (this) {
+ LOG.info("MC_DIAG stage=FE_DESERIALIZE_COMMIT_MESSAGES_BEFORE
table={}.{} sessionId={}"
+ + " commitDataCount={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId, commitDataList.size());
for (TMCCommitData data : commitDataList) {
if (data.isSetCommitMessage() &&
!data.getCommitMessage().isEmpty()) {
+ LOG.info("MC_DIAG
stage=FE_DESERIALIZE_COMMIT_MESSAGE_ITEM table={}.{} sessionId={}"
+ + " partitionSpec={} rowCount={}
commitMessageLength={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId,
+ data.isSetPartitionSpec() ?
data.getPartitionSpec() : "",
+ data.getRowCount(),
data.getCommitMessage().length());
appendCommitMessages(allMessages,
data.getCommitMessage());
}
}
}
long t1 = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_DESERIALIZE_COMMIT_MESSAGES_AFTER
table={}.{} sessionId={}"
+ + " writerCommitMessages={} costMs={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId, allMessages.size(), t1 - t0);
// Restore session and commit all messages
+ long tableIdStartMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_COMMIT_GET_ODPS_TABLE_ID_BEFORE
table={}.{} sessionId={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId);
TableIdentifier tableId =
catalog.getOdpsTableIdentifier(table.getDbName(), table.getName());
+ LOG.info("MC_DIAG stage=FE_COMMIT_GET_ODPS_TABLE_ID_AFTER
table={}.{} sessionId={} costMs={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId,
+ System.currentTimeMillis() - tableIdStartMs);
+ LOG.info("MC_DIAG stage=FE_RESTORE_COMMIT_SESSION_BEFORE
table={}.{} sessionId={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId);
TableBatchWriteSession commitSession = new
TableWriteSessionBuilder()
.identifier(tableId)
.withSessionId(writeSessionId)
.withSettings(catalog.getSettings())
.buildBatchWriteSession();
long t2 = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_RESTORE_COMMIT_SESSION_AFTER
table={}.{} sessionId={} costMs={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId, t2 - t1);
+ LOG.info("MC_DIAG stage=FE_COMMIT_SESSION_BEFORE table={}.{}
sessionId={} writerCommitMessages={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId, allMessages.size());
commitSession.commit(allMessages.toArray(new
WriterCommitMessage[0]));
long t3 = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_COMMIT_SESSION_AFTER table={}.{}
sessionId={} writerCommitMessages={}"
+ + " costMs={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId, allMessages.size(), t3 - t2);
LOG.info("Committed MC write session {} with {} messages for table
{}.{}"
+ " Breakdown: deserialize={}ms,
restoreSession={}ms, commit={}ms, total={}ms",
writeSessionId, allMessages.size(),
catalog.getDefaultProject(), table.getName(),
t1 - t0, t2 - t1, t3 - t2, t3 - t0);
+ LOG.info("MC_DIAG stage=FE_FINISH_INSERT_EXIT table={}.{}
sessionId={} writerCommitMessages={}"
+ + " totalCostMs={}",
+ catalog.getDefaultProject(), table.getName(),
writeSessionId, allMessages.size(), t3 - t0);
} catch (Exception e) {
+ LOG.warn("MC_DIAG stage=FE_FINISH_INSERT_ERROR table={}.{}
sessionId={} error={}: {}",
+ catalog.getDefaultProject(), table == null ? "null" :
table.getName(), writeSessionId,
+ e.getClass().getName(), e.getMessage(), e);
throw new UserException("Failed to commit MaxCompute write
session: " + e.getMessage(), e);
}
}
@@ -222,11 +322,15 @@ public class MCTransaction implements Transaction {
@Override
public void commit() throws UserException {
// commit is handled in finishInsert()
+ LOG.info("MC_DIAG stage=FE_TRANSACTION_COMMIT_NOOP table={}
sessionId={}",
+ table == null ? "null" : table.getName(), writeSessionId);
}
@Override
public void rollback() {
// MC sessions auto-expire if not committed; no explicit rollback
needed
+ LOG.info("MC_DIAG stage=FE_TRANSACTION_ROLLBACK table={} sessionId={}
commitDataCount={}",
+ table == null ? "null" : table.getName(), writeSessionId,
commitDataList.size());
LOG.info("MCTransaction rollback called; uncommitted sessions will
auto-expire.");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java
index 47df06485e7..7d6cb608d31 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/MCInsertExecutor.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.maxcompute.MCTransaction;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.nereids.NereidsPlanner;
@@ -52,29 +53,69 @@ public class MCInsertExecutor extends
BaseExternalTableInsertExecutor {
@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink,
PhysicalSink physicalSink) {
+ LOG.info("MC_DIAG stage=FE_MC_FINALIZE_SINK_ENTER queryId={} label={}
table={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName());
// Let parent call bindDataSink() to build the Thrift sink
super.finalizeSink(fragment, sink, physicalSink);
// Save reference so beforeExec() can inject writeSessionId later
mcTableSink = (MaxComputeTableSink) sink;
+ LOG.info("MC_DIAG stage=FE_MC_FINALIZE_SINK_EXIT queryId={} label={}
table={} hasSink={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName(),
mcTableSink != null);
}
@Override
protected void beforeExec() throws UserException {
+ long startMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_MC_BEFORE_EXEC_ENTER queryId={} label={}
table={} txnId={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName(),
txnId);
// 1. Create Storage API write session as part of the transaction
MCTransaction transaction = (MCTransaction)
transactionManager.getTransaction(txnId);
+ long beginInsertStartMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_MC_BEGIN_INSERT_BEFORE queryId={} label={}
table={} txnId={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName(),
txnId);
transaction.beginInsert((MaxComputeExternalTable) table, insertCtx);
+ LOG.info("MC_DIAG stage=FE_MC_BEGIN_INSERT_AFTER queryId={} label={}
table={} txnId={} sessionId={}"
+ + " costMs={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName(),
txnId,
+ transaction.getWriteSessionId(), System.currentTimeMillis() -
beginInsertStartMs);
// 2. Inject write context into the Thrift sink before fragments are
sent to BE
if (mcTableSink != null) {
+ LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_BEFORE queryId={}
label={} table={} txnId={}"
+ + " sessionId={}",
+ DebugUtil.printId(ctx.queryId()), labelName,
table.getName(), txnId,
+ transaction.getWriteSessionId());
mcTableSink.setWriteContext(txnId,
transaction.getWriteSessionId());
+ LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_AFTER queryId={}
label={} table={} txnId={}"
+ + " sessionId={}",
+ DebugUtil.printId(ctx.queryId()), labelName,
table.getName(), txnId,
+ transaction.getWriteSessionId());
+ } else {
+ LOG.info("MC_DIAG stage=FE_MC_SET_WRITE_CONTEXT_SKIP queryId={}
label={} table={} txnId={}",
+ DebugUtil.printId(ctx.queryId()), labelName,
table.getName(), txnId);
}
+ LOG.info("MC_DIAG stage=FE_MC_BEFORE_EXEC_EXIT queryId={} label={}
table={} txnId={} sessionId={}"
+ + " costMs={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName(),
txnId,
+ transaction.getWriteSessionId(), System.currentTimeMillis() -
startMs);
}
@Override
protected void doBeforeCommit() throws UserException {
+ long startMs = System.currentTimeMillis();
+ LOG.info("MC_DIAG stage=FE_MC_DO_BEFORE_COMMIT_ENTER queryId={}
label={} table={} txnId={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName(),
txnId);
MCTransaction transaction = (MCTransaction)
transactionManager.getTransaction(txnId);
loadedRows = transaction.getUpdateCnt();
+ LOG.info("MC_DIAG stage=FE_MC_FINISH_INSERT_BEFORE queryId={} label={}
table={} txnId={} loadedRows={}"
+ + " sessionId={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName(),
txnId, loadedRows,
+ transaction.getWriteSessionId());
transaction.finishInsert();
+ LOG.info("MC_DIAG stage=FE_MC_FINISH_INSERT_AFTER queryId={} label={}
table={} txnId={} loadedRows={}"
+ + " sessionId={} costMs={}",
+ DebugUtil.printId(ctx.queryId()), labelName, table.getName(),
txnId, loadedRows,
+ transaction.getWriteSessionId(), System.currentTimeMillis() -
startMs);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
index 98537fa0307..93be5a914fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java
@@ -28,6 +28,9 @@ import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TMaxComputeTableSink;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -37,6 +40,8 @@ import java.util.stream.Collectors;
public class MaxComputeTableSink extends BaseExternalTableDataSink {
+ private static final Logger LOG =
LogManager.getLogger(MaxComputeTableSink.class);
+
private final MaxComputeExternalTable targetTable;
public MaxComputeTableSink(MaxComputeExternalTable targetTable) {
@@ -97,6 +102,11 @@ public class MaxComputeTableSink extends
BaseExternalTableDataSink {
tDataSink = new TDataSink(TDataSinkType.MAXCOMPUTE_TABLE_SINK);
tDataSink.setMaxComputeTableSink(tSink);
+ LOG.info("MC_DIAG stage=FE_MC_BIND_SINK table={} project={}
endpoint={} partitionColumns={}"
+ + " hasStaticPartition={} connectTimeout={}
readTimeout={} retryCount={}",
+ targetTable.getName(), catalog.getDefaultProject(),
catalog.getEndpoint(),
+ partitionColumnNames.size(), tSink.isSetStaticPartitionSpec(),
catalog.getConnectTimeout(),
+ catalog.getReadTimeout(), catalog.getRetryTimes());
}
/**
@@ -106,8 +116,15 @@ public class MaxComputeTableSink extends
BaseExternalTableDataSink {
*/
public void setWriteContext(long txnId, String writeSessionId) {
if (tDataSink != null && tDataSink.isSetMaxComputeTableSink()) {
+ LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_BEFORE
table={} txnId={} sessionId={}",
+ targetTable.getName(), txnId, writeSessionId);
tDataSink.getMaxComputeTableSink().setTxnId(txnId);
tDataSink.getMaxComputeTableSink().setWriteSessionId(writeSessionId);
+ LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_AFTER
table={} txnId={} sessionId={}",
+ targetTable.getName(), txnId, writeSessionId);
+ } else {
+ LOG.info("MC_DIAG stage=FE_MC_SET_SINK_WRITE_CONTEXT_SKIP table={}
txnId={} sessionId={}",
+ targetTable.getName(), txnId, writeSessionId);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dac24f2f798..321a5aab912 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2535,6 +2535,20 @@ public class Coordinator implements CoordInterface {
.updateIcebergCommitData(params.getIcebergCommitDatas());
}
if (params.isSetMcCommitDatas()) {
+ long mcRows = 0;
+ int mcCommitMessages = 0;
+ long mcCommitMessageBytes = 0;
+ for (org.apache.doris.thrift.TMCCommitData data :
params.getMcCommitDatas()) {
+ mcRows += data.getRowCount();
+ if (data.isSetCommitMessage() &&
!data.getCommitMessage().isEmpty()) {
+ mcCommitMessages++;
+ mcCommitMessageBytes += data.getCommitMessage().length();
+ }
+ }
+ LOG.info("MC_DIAG stage=FE_COORDINATOR_MC_COMMIT_DATA queryId={}
txnId={} fragmentId={} backendId={}"
+ + " datas={} rows={} commitMessages={}
commitMessageBytes={}",
+ DebugUtil.printId(queryId), txnId, params.getFragmentId(),
params.getBackendId(),
+ params.getMcCommitDatasSize(), mcRows, mcCommitMessages,
mcCommitMessageBytes);
((MCTransaction)
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
.updateMCCommitData(params.getMcCommitDatas());
}
@@ -3554,4 +3568,3 @@ public class Coordinator implements CoordInterface {
this.queryOptions.setEnableProfile(isSafe &&
queryOptions.isEnableProfile());
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
index 45995a7aad7..48c08700494 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
@@ -231,6 +231,21 @@ public class LoadProcessor extends AbstractJobProcessor {
.updateIcebergCommitData(params.getIcebergCommitDatas());
}
if (params.isSetMcCommitDatas()) {
+ long mcRows = 0;
+ int mcCommitMessages = 0;
+ long mcCommitMessageBytes = 0;
+ for (org.apache.doris.thrift.TMCCommitData data :
params.getMcCommitDatas()) {
+ mcRows += data.getRowCount();
+ if (data.isSetCommitMessage() &&
!data.getCommitMessage().isEmpty()) {
+ mcCommitMessages++;
+ mcCommitMessageBytes += data.getCommitMessage().length();
+ }
+ }
+ LOG.info("MC_DIAG stage=FE_LOAD_PROCESSOR_MC_COMMIT_DATA
queryId={} txnId={} fragmentId={}"
+ + " backendId={} datas={} rows={}
commitMessages={} commitMessageBytes={}",
+ DebugUtil.printId(coordinatorContext.queryId), txnId,
params.getFragmentId(),
+ params.getBackendId(), params.getMcCommitDatasSize(),
mcRows, mcCommitMessages,
+ mcCommitMessageBytes);
((MCTransaction)
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
.updateMCCommitData(params.getMcCommitDatas());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 99dd56cbe42..602d89f29bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2915,8 +2915,11 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
@Override
public TMaxComputeBlockIdResult
getMaxComputeBlockIdRange(TMaxComputeBlockIdRequest request) {
+ long startMs = System.currentTimeMillis();
String clientAddr = getClientAddrAsString();
LOG.info("receive getMaxComputeBlockIdRange request: {}, backend: {}",
request, clientAddr);
+ LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_ENTER txnId={} sessionId={}
length={} backend={}",
+ request.getTxnId(), request.getWriteSessionId(),
request.getLength(), clientAddr);
TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult();
TStatus status = checkMaster();
@@ -2924,10 +2927,16 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
if (status.getStatusCode() != TStatusCode.OK) {
result.setMasterAddress(getMasterAddress());
+ LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_NOT_MASTER txnId={}
sessionId={} backend={} status={}"
+ + " master={} costMs={}",
+ request.getTxnId(), request.getWriteSessionId(),
clientAddr, status.getStatusCode(),
+ result.getMasterAddress(), System.currentTimeMillis() -
startMs);
return result;
}
try {
+ LOG.info("MC_DIAG stage=FE_BLOCK_ID_GET_TXN_BEFORE txnId={}
sessionId={} backend={}",
+ request.getTxnId(), request.getWriteSessionId(),
clientAddr);
Transaction transaction =
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr()
.getTxnById(request.getTxnId());
if (!(transaction instanceof MCTransaction)) {
@@ -2935,25 +2944,42 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
+ " is not a MaxCompute transaction");
}
+ LOG.info("MC_DIAG stage=FE_BLOCK_ID_ALLOCATE_BEFORE txnId={}
sessionId={} length={} backend={}",
+ request.getTxnId(), request.getWriteSessionId(),
request.getLength(), clientAddr);
long start = ((MCTransaction) transaction).allocateBlockIdRange(
request.getWriteSessionId(), request.getLength());
result.setStart(start);
result.setLength(request.getLength());
+ LOG.info("MC_DIAG stage=FE_BLOCK_ID_ALLOCATE_AFTER txnId={}
sessionId={} start={} length={}"
+ + " backend={} costMs={}",
+ request.getTxnId(), request.getWriteSessionId(), start,
request.getLength(), clientAddr,
+ System.currentTimeMillis() - startMs);
} catch (UserException e) {
+ LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_USER_ERROR txnId={}
sessionId={} backend={} error={}",
+ request.getTxnId(), request.getWriteSessionId(),
clientAddr, e.getMessage(), e);
LOG.warn("failed to allocate MaxCompute block_id, txnId={},
sessionId={}, errmsg={}",
request.getTxnId(), request.getWriteSessionId(),
e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (RuntimeException e) {
+ LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_RUNTIME_ERROR txnId={}
sessionId={} backend={} error={}",
+ request.getTxnId(), request.getWriteSessionId(),
clientAddr, e.getMessage(), e);
LOG.warn("failed to allocate MaxCompute block_id, txnId={},
sessionId={}, errmsg={}",
request.getTxnId(), request.getWriteSessionId(),
e.getMessage(), e);
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
} catch (Throwable e) {
+ LOG.warn("MC_DIAG stage=FE_BLOCK_ID_RPC_UNKNOWN_ERROR txnId={}
sessionId={} backend={} error={}",
+ request.getTxnId(), request.getWriteSessionId(),
clientAddr, e.getMessage(), e);
LOG.warn("catch unknown result when allocating MaxCompute
block_id.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " +
Strings.nullToEmpty(e.getMessage()));
}
+ LOG.info("MC_DIAG stage=FE_BLOCK_ID_RPC_EXIT txnId={} sessionId={}
status={} startSet={} start={} lengthSet={}"
+ + " length={} backend={} costMs={}",
+ request.getTxnId(), request.getWriteSessionId(),
status.getStatusCode(), result.isSetStart(),
+ result.isSetStart() ? result.getStart() : -1,
result.isSetLength(),
+ result.isSetLength() ? result.getLength() : -1, clientAddr,
System.currentTimeMillis() - startMs);
return result;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]