This is an automated email from the ASF dual-hosted git repository. wyf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new d79cdc8 [Bug] Filter out unavailable backends when getting tablet location (#6204) d79cdc8 is described below commit d79cdc829ffcc742a9331fb4705fa93bce1dbd85 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Tue Jul 13 11:17:49 2021 +0800 [Bug] Filter out unavailable backends when getting tablet location (#6204) * [Bug] Filter out unavailable backends when getting scan range location In the previous implementation, we will eliminate non-surviving BEs in the Coordinator phase. But for Spark or Flink Connector, there is no such logic, so when a BE node is down, it will cause the problem of querying errors through the Connector. * fix ut * fix compile --- be/src/http/action/stream_load.cpp | 10 ++++++++-- .../src/main/java/org/apache/doris/planner/OlapScanNode.java | 5 +++-- .../src/test/java/org/apache/doris/http/DorisHttpTestCase.java | 3 +++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index f1c8e2e..ac409b1 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -268,15 +268,21 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct ctx->body_bytes = 0; size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024; + bool read_json_by_line = false; + if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) { + if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) { + read_json_by_line = true; + } + } if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); // json max body size if ((ctx->format == TFileFormatType::FORMAT_JSON) && - (ctx->body_bytes > json_max_body_bytes)) { + (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) { 
std::stringstream ss; ss << "The size of this batch exceed the max size [" << json_max_body_bytes << "] of json type data " - << " data [ " << ctx->body_bytes << " ]"; + << " data [ " << ctx->body_bytes << " ]. Split the file, or use 'read_json_by_line'"; return Status::InternalError(ss.str()); } // csv max body size diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 143c175..62948c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -430,8 +430,9 @@ public class OlapScanNode extends ScanNode { boolean collectedStat = false; for (Replica replica : replicas) { Backend backend = Catalog.getCurrentSystemInfo().getBackend(replica.getBackendId()); - if (backend == null) { - LOG.debug("replica {} not exists", replica.getBackendId()); + if (backend == null || !backend.isAlive()) { + LOG.debug("backend {} not exists or is not alive for replica {}", + replica.getBackendId(), replica.getId()); continue; } String ip = backend.getHost(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index 94fa7f0..14abfbc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -263,10 +263,13 @@ abstract public class DorisHttpTestCase { private static void assignBackends() { Backend backend1 = new Backend(testBackendId1, "node-1", 9308); backend1.setBePort(9300); + backend1.setAlive(true); Backend backend2 = new Backend(testBackendId2, "node-2", 9308); backend2.setBePort(9300); + backend2.setAlive(true); Backend backend3 = new Backend(testBackendId3, "node-3", 9308); backend3.setBePort(9300); + backend3.setAlive(true); Catalog.getCurrentSystemInfo().addBackend(backend1); 
Catalog.getCurrentSystemInfo().addBackend(backend2); Catalog.getCurrentSystemInfo().addBackend(backend3); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org