morningman closed pull request #445: Add log to detect empty load file
URL: https://github.com/apache/incubator-doris/pull/445
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/be/src/agent/pusher.cpp b/be/src/agent/pusher.cpp
index cbf2f70e..eab1f1b5 100644
--- a/be/src/agent/pusher.cpp
+++ b/be/src/agent/pusher.cpp
@@ -117,6 +117,7 @@ void Pusher::_get_file_name_from_path(const string&
file_path, string* file_name
AgentStatus Pusher::process(vector<TTabletInfo>* tablet_infos) {
AgentStatus status = DORIS_SUCCESS;
+
// Remote file not empty, need to download
if (_push_req.__isset.http_file_path) {
// Get file length and timeout
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 6d933378..751f1a3b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -750,36 +750,45 @@ void* TaskWorkerPool::_push_worker_thread_callback(void*
arg_this) {
LOG(INFO) << "get push task. signature: " << agent_task_req.signature
<< " user: " << user << " priority: " << priority;
+
vector<TTabletInfo> tablet_infos;
if (push_req.push_type == TPushType::LOAD || push_req.push_type ==
TPushType::LOAD_DELETE) {
+ if (!push_req.__isset.http_file_path) {
+ LOG(WARNING) << "push request does not set load file for
tablet: "
+ << agent_task_req.signature;
+ status = DORIS_FILE_DOWNLOAD_NOT_EXIST;
+ }
+
+ if (status == DORIS_SUCCESS) {
#ifndef BE_TEST
- Pusher pusher(worker_pool_this->_env->olap_engine(), push_req);
- status = pusher.init();
+ Pusher pusher(worker_pool_this->_env->olap_engine(), push_req);
+ status = pusher.init();
#else
- status = worker_pool_this->_pusher->init();
+ status = worker_pool_this->_pusher->init();
#endif
- if (status == DORIS_SUCCESS) {
- uint32_t retry_time = 0;
- while (retry_time < PUSH_MAX_RETRY) {
+ if (status == DORIS_SUCCESS) {
+ uint32_t retry_time = 0;
+ while (retry_time < PUSH_MAX_RETRY) {
#ifndef BE_TEST
- status = pusher.process(&tablet_infos);
+ status = pusher.process(&tablet_infos);
#else
- status = worker_pool_this->_pusher->process(&tablet_infos);
+ status =
worker_pool_this->_pusher->process(&tablet_infos);
#endif
- if (status == DORIS_PUSH_HAD_LOADED) {
- OLAP_LOG_WARNING("transaction exists when realtime
push, "
- "but unfinished, do not report to fe,
signature: %ld",
- agent_task_req.signature);
- break; // not retry any more
- }
- // Internal error, need retry
- if (status == DORIS_ERROR) {
- OLAP_LOG_WARNING("push internal error, need
retry.signature: %ld",
- agent_task_req.signature);
- retry_time += 1;
- } else {
- break;
+ if (status == DORIS_PUSH_HAD_LOADED) {
+ OLAP_LOG_WARNING("transaction exists when realtime
push, "
+ "but unfinished, do not report to fe,
signature: %ld",
+ agent_task_req.signature);
+ break; // not retry any more
+ }
+ // Internal error, need retry
+ if (status == DORIS_ERROR) {
+ OLAP_LOG_WARNING("push internal error, need
retry.signature: %ld",
+ agent_task_req.signature);
+ retry_time += 1;
+ } else {
+ break;
+ }
}
}
}
diff --git a/fe/src/main/java/org/apache/doris/load/LoadChecker.java
b/fe/src/main/java/org/apache/doris/load/LoadChecker.java
index 91a8ba34..97d5b8c1 100644
--- a/fe/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -448,6 +448,11 @@ private void tryCommitJob(LoadJob job, Database db) {
type = TPushType.LOAD_DELETE;
}
+ if (type == TPushType.LOAD && (filePath == null ||
fileSize < 0)) {
+ LOG.warn("get empty load file for tablet {}",
tabletId);
+ continue;
+ }
+
// add task to batchTask
Set<Long> allReplicas = new HashSet<Long>();
Set<Long> finishedReplicas = new HashSet<Long>();
diff --git a/fe/src/main/java/org/apache/doris/task/LoadEtlTask.java
b/fe/src/main/java/org/apache/doris/task/LoadEtlTask.java
index 350a388d..236775cd 100644
--- a/fe/src/main/java/org/apache/doris/task/LoadEtlTask.java
+++ b/fe/src/main/java/org/apache/doris/task/LoadEtlTask.java
@@ -242,19 +242,15 @@ protected String getPartitionIndexBucketString(String
filePath) throws LoadExcep
db.readLock();
try {
table = (OlapTable) db.getTable(tableId);
- } finally {
- db.readUnlock();
- }
- if (table == null) {
- throw new LoadException("table does not exist. id: " +
tableId);
- }
-
- TableLoadInfo tableLoadInfo = tableEntry.getValue();
- for (Entry<Long, PartitionLoadInfo> partitionEntry :
tableLoadInfo.getIdToPartitionLoadInfo().entrySet()) {
- long partitionId = partitionEntry.getKey();
- boolean needLoad = false;
- db.readLock();
- try {
+ if (table == null) {
+ throw new LoadException("table does not exist. id: " +
tableId);
+ }
+
+ TableLoadInfo tableLoadInfo = tableEntry.getValue();
+ for (Entry<Long, PartitionLoadInfo> partitionEntry :
tableLoadInfo.getIdToPartitionLoadInfo().entrySet()) {
+ long partitionId = partitionEntry.getKey();
+ boolean needLoad = false;
+
Partition partition = table.getPartition(partitionId);
if (partition == null) {
throw new LoadException("partition does not exist. id:
" + partitionId);
@@ -292,9 +288,10 @@ protected String getPartitionIndexBucketString(String
filePath) throws LoadExcep
// partition might have no load data
partitionEntry.getValue().setNeedLoad(needLoad);
- } finally {
- db.readUnlock();
+
}
+ } finally {
+ db.readUnlock();
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]