github-actions[bot] commented on code in PR #43536: URL: https://github.com/apache/doris/pull/43536#discussion_r1839436505
########## be/src/http/action/stream_load.cpp: ########## @@ -873,4 +908,518 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, return Status::OK(); } +void StreamLoadAction::_httpstream_handle(HttpRequest* req, + std::shared_ptr<StreamLoadContext> ctx) { + // status already set to fail + if (ctx->status.ok()) { + ctx->status = _http_stream_handle(req, ctx); + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; + } + } + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; + + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + } + + if (!ctx->status.ok()) { + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + return; + } + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } + // update statistics + streaming_load_requests_total->increment(1); + streaming_load_duration_ms->increment(ctx->load_cost_millis); +} + +Status StreamLoadAction::_http_stream_handle(HttpRequest* http_req, + std::shared_ptr<StreamLoadContext> ctx) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { + LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes + << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; + return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes"); + } + RETURN_IF_ERROR(ctx->body_sink->finish()); + + // wait stream load finish + RETURN_IF_ERROR(ctx->future.get()); + + if (ctx->group_commit) { + LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); + return Status::OK(); + } + + if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); + ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; + } + return Status::OK(); +} + +int StreamLoadAction::_httpstream_on_header(HttpRequest* req, + std::shared_ptr<StreamLoadContext> ctx) { + req->set_handler_ctx(ctx); + + ctx->load_type = TLoadType::MANUL_LOAD; + ctx->load_src_type = TLoadSourceType::RAW; + ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true"; + Status st = _handle_group_commit(req, ctx); + + LOG(INFO) << "new income streaming load request." << ctx->brief() << " sql : " << ctx->sql_str + << ", group_commit=" << ctx->group_commit; + if (st.ok()) { + st = _http_stream_on_header(req, ctx); + } + if (!st.ok()) { + ctx->status = std::move(st); + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + auto str = ctx->to_json(); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } + return -1; + } + return 0; +} + +Status StreamLoadAction::_http_stream_on_header(HttpRequest* http_req, + std::shared_ptr<StreamLoadContext> ctx) { + // auth information + if (!parse_basic_auth(*http_req, &ctx->auth)) { + LOG(WARNING) << "parse basic authorization failed." << ctx->brief(); + return Status::NotAuthorized("no valid Basic authorization"); + } + + // TODO(zs) : need Need to request an FE to obtain information such as format + // check content length + ctx->body_bytes = 0; + size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + try { + ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + } catch (const std::exception& e) { + return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", + http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); + } + // csv max body size + if (ctx->body_bytes > csv_max_body_bytes) { + LOG(WARNING) << "body exceed max size." << ctx->brief(); + return Status::Error<ErrorCode::EXCEEDED_LIMIT>( + "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you " + "are sure this load is reasonable", + ctx->body_bytes, csv_max_body_bytes); + } + } + + auto pipe = std::make_shared<io::StreamLoadPipe>( + io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + ctx->body_bytes /* total_length */); + ctx->body_sink = pipe; + ctx->pipe = pipe; + + RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx)); + + // Here, transactions are set from fe's NativeInsertStmt. + // TODO(zs) : How to support two_phase_commit + + return Status::OK(); +} + +void StreamLoadAction::_httpstream_on_chunk_data(HttpRequest* req, + std::shared_ptr<StreamLoadContext> ctx) { + if (!req->header(HTTP_WAL_ID_KY).empty()) { + ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY)); + } + struct evhttp_request* ev_req = req->get_evhttp_request(); + auto evbuf = evhttp_request_get_input_buffer(ev_req); + + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + + int64_t start_read_data_time = MonotonicNanos(); + Status st = ctx->allocate_schema_buffer(); + if (!st.ok()) { + ctx->status = st; + return; + } + while (evbuffer_get_length(evbuf) > 0) { + ByteBufferPtr bb; + st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + st = ctx->body_sink->append(bb); + // schema_buffer stores 1M of data for parsing column information + // need to determine whether to cache for the first time + if (ctx->is_read_schema) { + if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); + } else { + LOG(INFO) << "use a portion of data to request fe to obtain column information"; + ctx->is_read_schema = false; + ctx->status = httpstream_process_put(req, ctx); + } + } + if (!st.ok()) { + LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; + } + // after all the data has been read and it has not reached 1M, it will execute here + if (ctx->is_read_schema) { + LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute " + << "here"; + ctx->is_read_schema = false; + ctx->status = httpstream_process_put(req, ctx); + } + ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time); +} + +Status StreamLoadAction::httpstream_process_put(HttpRequest* http_req, + std::shared_ptr<StreamLoadContext> ctx) { + TStreamLoadPutRequest request; + if (http_req != nullptr) { + request.__set_load_sql(ctx->sql_str); + if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { + bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); + request.__set_memtable_on_sink_node(value); + } + request.__set_columns(http_req->header(HTTP_COLUMNS)); + } else { + request.__set_token(ctx->auth.token); + request.__set_load_sql(ctx->sql_str); + ctx->auth.token = ""; + } + set_request_auth(&request, ctx->auth); + request.__set_loadId(ctx->id.to_thrift()); + request.__set_label(ctx->label); + if (ctx->group_commit) { + if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { + request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); + } else { + // used for wait_internal_group_commit_finish + request.__set_group_commit_mode("sync_mode"); + } + } + if (_exec_env->cluster_info()->backend_id != 0) { + request.__set_backend_id(_exec_env->cluster_info()->backend_id); + } else { + LOG(WARNING) << "_exec_env->master_info not set backend_id"; + } + + // plan this load + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; + int64_t stream_load_put_start_time = MonotonicNanos(); + RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, ctx](FrontendServiceConnection& client) { + client->streamLoadPut(ctx->put_result, request); + })); + ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; + Status plan_status(Status::create(ctx->put_result.status)); + if (!plan_status.ok()) { + LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief(); + return plan_status; + } + if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) { + return Status::NotSupported("http stream 2pc is unsupported for mow table"); + } + ctx->db = ctx->put_result.pipeline_params.db_name; + ctx->table = ctx->put_result.pipeline_params.table_name; + ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id; + ctx->label = ctx->put_result.pipeline_params.import_label; + ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id); + if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { + // FIXME find a way to avoid chunked stream load write large WALs + size_t content_length = 0; + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + try { + content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + } catch (const std::exception& e) { + return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", + http_req->header(HttpHeaders::CONTENT_LENGTH), + e.what()); + } + if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || + ctx->format == TFileFormatType::FORMAT_CSV_LZO || + ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || + ctx->format == TFileFormatType::FORMAT_CSV_LZOP || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || + ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + content_length *= 3; + } + } + ctx->put_result.pipeline_params.__set_content_length(content_length); + } + + return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); +} + +std::string escapeString(const std::string& str) { + std::stringstream ss; + for (char ch : str) { + switch (ch) { + case '\'': + ss << "\\\'"; + break; + case '\"': + ss << "\\\""; + break; + case '\\': + ss << "\\\\"; + break; + case '\a': + ss << "\\a"; + break; + case '\b': + ss << "\\b"; + break; + case '\f': + ss << "\\f"; + break; + case '\n': + ss << "\\n"; + break; + case '\r': + ss << "\\r"; + break; + case '\t': + ss << "\\t"; + break; + case '\v': + ss << "\\v"; + break; + default: + ss << ch; + } + } + return ss.str(); +} + +Status StreamLoadAction::_parse_header(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) { Review Comment: warning: function '_parse_header' has cognitive complexity of 171 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status StreamLoadAction::_parse_header(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) { ^ ``` <details> <summary>Additional context</summary> **be/src/http/action/stream_load.cpp:1246:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_ENCLOSE).empty() && !req->header(HTTP_ENCLOSE).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1246:** +1 ```cpp if (!req->header(HTTP_ENCLOSE).empty() && !req->header(HTTP_ENCLOSE).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1248:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (enclose_str.length() != 1) { ^ ``` **be/src/http/action/stream_load.cpp:1253:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_ESCAPE).empty() && !req->header(HTTP_ESCAPE).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1253:** +1 ```cpp if (!req->header(HTTP_ESCAPE).empty() && !req->header(HTTP_ESCAPE).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1255:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (escape_str.length() != 1) { ^ ``` **be/src/http/action/stream_load.cpp:1260:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_PARTITIONS).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1261:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!req->header(HTTP_TEMP_PARTITIONS).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1266:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_TEMP_PARTITIONS).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1267:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!req->header(HTTP_PARTITIONS).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1273:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1276:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp } catch (const std::invalid_argument& e) { ^ ``` **be/src/http/action/stream_load.cpp:1279:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp } catch (const std::out_of_range& e) { ^ ``` **be/src/http/action/stream_load.cpp:1288:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_MERGE_TYPE).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1291:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (iter != merge_type_map.end()) { ^ ``` **be/src/http/action/stream_load.cpp:1293:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/http/action/stream_load.cpp:1296:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (merge_type == TMergeType::MERGE && req->header(HTTP_DELETE_CONDITION).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1298:** +1, nesting level increased to 2 ```cpp } else if (merge_type != TMergeType::MERGE && !req->header(HTTP_DELETE_CONDITION).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1304:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1311:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (iter != unique_key_update_mode_map.end()) { ^ ``` **be/src/http/action/stream_load.cpp:1313:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) { ^ ``` **be/src/http/action/stream_load.cpp:1315:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (ctx->format != TFileFormatType::FORMAT_JSON) { ^ ``` **be/src/http/action/stream_load.cpp:1320:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!req->header(HTTP_FUZZY_PARSE).empty() && ^ ``` **be/src/http/action/stream_load.cpp:1320:** +1 ```cpp if (!req->header(HTTP_FUZZY_PARSE).empty() && ^ ``` **be/src/http/action/stream_load.cpp:1325:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!req->header(HTTP_COLUMNS).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1329:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!req->header(HTTP_JSONPATHS).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1333:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!req->header(HTTP_HIDDEN_COLUMNS).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1338:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1343:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!req->header(HTTP_MERGE_TYPE).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1348:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!req->header(HTTP_WHERE).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1354:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/http/action/stream_load.cpp:1362:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_FORMAT_KEY).empty()) { ^ ``` **be/src/http/action/stream_load.cpp:1365:** +1, nesting level increased to 1 ```cpp } else { ^ ``` **be/src/http/action/stream_load.cpp:1378:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_COLUMNS) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1378:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_COLUMNS) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1379:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_COLUMN_SEPARATOR) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1379:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_COLUMN_SEPARATOR) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1380:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_COMPRESS_TYPE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1380:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_COMPRESS_TYPE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1381:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_LINE_DELIMITER) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1381:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_LINE_DELIMITER) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1382:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_ENCLOSE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1382:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_ENCLOSE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1383:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_ESCAPE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1383:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_ESCAPE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1384:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_PARTITIONS) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1384:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_PARTITIONS) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1385:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_TEMP_PARTITIONS) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1385:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_TEMP_PARTITIONS) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1386:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_NEGATIVE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1386:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_NEGATIVE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1387:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_STRICT_MODE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1387:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_STRICT_MODE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1388:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_TIME_ZONE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1388:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_TIME_ZONE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1389:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_EXEC_MEM_LIMIT) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1389:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_EXEC_MEM_LIMIT) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1390:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_JSONPATHS) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1390:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_JSONPATHS) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1391:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_JSONROOT) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1391:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_JSONROOT) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1392:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_STRIP_OUTER_ARRAY) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1392:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_STRIP_OUTER_ARRAY) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1393:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_NUM_AS_STRING) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1393:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_NUM_AS_STRING) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1394:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_FUZZY_PARSE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1394:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_FUZZY_PARSE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1395:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_READ_JSON_BY_LINE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1395:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_READ_JSON_BY_LINE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1396:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1396:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1397:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_SEND_BATCH_PARALLELISM) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1397:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_SEND_BATCH_PARALLELISM) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1398:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_LOAD_TO_SINGLE_TABLET) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1398:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_LOAD_TO_SINGLE_TABLET) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1399:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_MERGE_TYPE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1399:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_MERGE_TYPE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1400:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_DELETE_CONDITION) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1400:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_DELETE_CONDITION) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1401:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_MAX_FILTER_RATIO) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1401:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_MAX_FILTER_RATIO) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1402:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_HIDDEN_COLUMNS) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1402:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_HIDDEN_COLUMNS) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1403:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_TRIM_DOUBLE_QUOTES) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1403:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_TRIM_DOUBLE_QUOTES) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1404:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_SKIP_LINES) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1404:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_SKIP_LINES) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1405:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_ENABLE_PROFILE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1405:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_ENABLE_PROFILE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1406:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_UNIQUE_KEY_UPDATE_MODE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1406:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_UNIQUE_KEY_UPDATE_MODE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1407:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_MEMTABLE_ON_SINKNODE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1407:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_MEMTABLE_ON_SINKNODE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1408:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_LOAD_STREAM_PER_NODE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1408:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_LOAD_STREAM_PER_NODE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1409:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_GROUP_COMMIT) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1409:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_GROUP_COMMIT) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1410:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_CLOUD_CLUSTER) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1410:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_CLOUD_CLUSTER) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1411:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp __ADD_IF_EXIST(HTTP_UNIQUE_KEY_UPDATE_MODE) ^ ``` **be/src/http/action/stream_load.cpp:1371:** expanded from macro '__ADD_IF_EXIST' ```cpp if (!req->header(PROPERTIY_KEY).empty()) { \ ^ ``` **be/src/http/action/stream_load.cpp:1411:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp __ADD_IF_EXIST(HTTP_UNIQUE_KEY_UPDATE_MODE) ^ ``` **be/src/http/action/stream_load.cpp:1372:** expanded from macro '__ADD_IF_EXIST' ```cpp if (pri) { \ ^ ``` **be/src/http/action/stream_load.cpp:1416:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!req->header(HTTP_WHERE).empty()) { ^ ``` </details> ########## be/src/http/action/stream_load.cpp: ########## @@ -873,4 +908,518 @@ return Status::OK(); } +void StreamLoadAction::_httpstream_handle(HttpRequest* req, + std::shared_ptr<StreamLoadContext> ctx) { + // status already set to fail + if (ctx->status.ok()) { + ctx->status = _http_stream_handle(req, ctx); + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; + } + } + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; + + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + } + + if (!ctx->status.ok()) { + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + return; + } + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } + // update statistics + streaming_load_requests_total->increment(1); + streaming_load_duration_ms->increment(ctx->load_cost_millis); +} + +Status StreamLoadAction::_http_stream_handle(HttpRequest* http_req, + std::shared_ptr<StreamLoadContext> ctx) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { + LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes + << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; + return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes"); + } + RETURN_IF_ERROR(ctx->body_sink->finish()); + + // wait stream load finish + RETURN_IF_ERROR(ctx->future.get()); + + if (ctx->group_commit) { + LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); + return Status::OK(); + } + + if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); + ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; + } + return Status::OK(); +} + +int StreamLoadAction::_httpstream_on_header(HttpRequest* req, + std::shared_ptr<StreamLoadContext> ctx) { + req->set_handler_ctx(ctx); + + ctx->load_type = TLoadType::MANUL_LOAD; + ctx->load_src_type = TLoadSourceType::RAW; + ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true"; + Status st = _handle_group_commit(req, ctx); + + LOG(INFO) << "new income streaming load request." << ctx->brief() << " sql : " << ctx->sql_str + << ", group_commit=" << ctx->group_commit; + if (st.ok()) { + st = _http_stream_on_header(req, ctx); + } + if (!st.ok()) { + ctx->status = std::move(st); + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + auto str = ctx->to_json(); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } + return -1; + } + return 0; +} + +Status StreamLoadAction::_http_stream_on_header(HttpRequest* http_req, + std::shared_ptr<StreamLoadContext> ctx) { + // auth information + if (!parse_basic_auth(*http_req, &ctx->auth)) { + LOG(WARNING) << "parse basic authorization failed." << ctx->brief(); + return Status::NotAuthorized("no valid Basic authorization"); + } + + // TODO(zs) : need Need to request an FE to obtain information such as format + // check content length + ctx->body_bytes = 0; + size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + try { + ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + } catch (const std::exception& e) { + return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", + http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); + } + // csv max body size + if (ctx->body_bytes > csv_max_body_bytes) { + LOG(WARNING) << "body exceed max size." << ctx->brief(); + return Status::Error<ErrorCode::EXCEEDED_LIMIT>( + "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you " + "are sure this load is reasonable", + ctx->body_bytes, csv_max_body_bytes); + } + } + + auto pipe = std::make_shared<io::StreamLoadPipe>( + io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + ctx->body_bytes /* total_length */); + ctx->body_sink = pipe; + ctx->pipe = pipe; + + RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx)); + + // Here, transactions are set from fe's NativeInsertStmt. + // TODO(zs) : How to support two_phase_commit + + return Status::OK(); +} + +void StreamLoadAction::_httpstream_on_chunk_data(HttpRequest* req, + std::shared_ptr<StreamLoadContext> ctx) { + if (!req->header(HTTP_WAL_ID_KY).empty()) { + ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY)); + } + struct evhttp_request* ev_req = req->get_evhttp_request(); + auto evbuf = evhttp_request_get_input_buffer(ev_req); + + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); + + int64_t start_read_data_time = MonotonicNanos(); + Status st = ctx->allocate_schema_buffer(); + if (!st.ok()) { + ctx->status = st; + return; + } + while (evbuffer_get_length(evbuf) > 0) { + ByteBufferPtr bb; + st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + st = ctx->body_sink->append(bb); + // schema_buffer stores 1M of data for parsing column information + // need to determine whether to cache for the first time + if (ctx->is_read_schema) { + if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); + } else { + LOG(INFO) << "use a portion of data to request fe to obtain column information"; + ctx->is_read_schema = false; + ctx->status = httpstream_process_put(req, ctx); + } + } + if (!st.ok()) { + LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; + } + // after all the data has been read and it has not reached 1M, it will execute here + if (ctx->is_read_schema) { + LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute " + << "here"; + ctx->is_read_schema = false; + ctx->status = httpstream_process_put(req, ctx); + } + ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time); +} + +Status StreamLoadAction::httpstream_process_put(HttpRequest* http_req, + std::shared_ptr<StreamLoadContext> ctx) { + TStreamLoadPutRequest request; + if (http_req != nullptr) { + request.__set_load_sql(ctx->sql_str); + if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { + bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); + request.__set_memtable_on_sink_node(value); + } + request.__set_columns(http_req->header(HTTP_COLUMNS)); + } else { + request.__set_token(ctx->auth.token); + request.__set_load_sql(ctx->sql_str); + ctx->auth.token = ""; + } + set_request_auth(&request, ctx->auth); + request.__set_loadId(ctx->id.to_thrift()); + request.__set_label(ctx->label); + if (ctx->group_commit) { + if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { + request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); + } else { + // used for wait_internal_group_commit_finish + request.__set_group_commit_mode("sync_mode"); + } + } + if (_exec_env->cluster_info()->backend_id != 0) { + request.__set_backend_id(_exec_env->cluster_info()->backend_id); + } else { + LOG(WARNING) << "_exec_env->master_info not set backend_id"; + } + + // plan this load + TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; + int64_t stream_load_put_start_time = MonotonicNanos(); + RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, ctx](FrontendServiceConnection& client) { + client->streamLoadPut(ctx->put_result, request); + })); + ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; + Status plan_status(Status::create(ctx->put_result.status)); + if (!plan_status.ok()) { + LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief(); + return plan_status; + } + if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) { + return Status::NotSupported("http stream 2pc is unsupported for mow table"); + } + ctx->db = ctx->put_result.pipeline_params.db_name; + ctx->table = ctx->put_result.pipeline_params.table_name; + ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id; + ctx->label = ctx->put_result.pipeline_params.import_label; + ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id); + if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { + // FIXME find a way to avoid chunked stream load write large WALs + size_t content_length = 0; + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + try { + content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + } catch (const std::exception& e) { + return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", + http_req->header(HttpHeaders::CONTENT_LENGTH), + e.what()); + } + if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || + ctx->format == TFileFormatType::FORMAT_CSV_LZO || + ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || + ctx->format == TFileFormatType::FORMAT_CSV_LZOP || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || + ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + content_length *= 3; + } + } + ctx->put_result.pipeline_params.__set_content_length(content_length); + } + + return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); +} + +std::string escapeString(const std::string& str) { + std::stringstream ss; + for (char ch : str) { + switch (ch) { + case '\'': + ss << "\\\'"; + break; + case '\"': + ss << "\\\""; + break; + case '\\': + ss << "\\\\"; + break; + case '\a': + ss << "\\a"; + break; + case '\b': + ss << "\\b"; + break; + case '\f': + ss << "\\f"; + break; + case '\n': + ss << "\\n"; + break; + case '\r': + ss << "\\r"; + break; + case '\t': + ss << "\\t"; + break; + case '\v': + ss << "\\v"; + break; + default: + ss << ch; + } + } + return ss.str(); +} + +Status StreamLoadAction::_parse_header(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) { Review Comment: warning: function '_parse_header' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status StreamLoadAction::_parse_header(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) { ^ ``` <details> <summary>Additional context</summary> **be/src/http/action/stream_load.cpp:1230:** 192 lines including whitespace and comments (threshold 80) ```cpp Status StreamLoadAction::_parse_header(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) { ^ ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org