This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 0ef28b09 Feature: Server-end progressive reader for http protocol
(#2210)
0ef28b09 is described below
commit 0ef28b0927babaaaa04f786791f17f7a54bc4e2f
Author: Siyang Tang <[email protected]>
AuthorDate: Sun Jun 25 14:30:05 2023 +0800
Feature: Server-end progressive reader for http protocol (#2210)
* enable serverend progressive reading
* fix reviewed problems
* test and fix
* remove redundant test log
* add has_progressive_read flag
* format
---
src/brpc/controller.h | 3 +
src/brpc/details/http_message.h | 4 +
src/brpc/policy/http_rpc_protocol.cpp | 42 ++++++--
src/brpc/policy/http_rpc_protocol.h | 2 +
src/brpc/server.cpp | 8 +-
src/brpc/server.h | 13 ++-
test/brpc_http_rpc_protocol_unittest.cpp | 159 +++++++++++++++++++++++++++++++
test/echo.proto | 5 +
8 files changed, 224 insertions(+), 12 deletions(-)
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 658cc695..3d75ff50 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -291,6 +291,9 @@ public:
// Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() {
add_flag(FLAGS_READ_PROGRESSIVELY); }
+ // Make the RPC end when the HTTP request has complete headers and let
+ // user read the remaining body by using ReadProgressiveAttachmentBy().
+ void request_will_be_read_progressively() {
add_flag(FLAGS_READ_PROGRESSIVELY); }
// True if response_will_be_read_progressively() was called.
bool is_response_read_progressively() const { return
has_flag(FLAGS_READ_PROGRESSIVELY); }
diff --git a/src/brpc/details/http_message.h b/src/brpc/details/http_message.h
index e23b2010..2b9471a1 100644
--- a/src/brpc/details/http_message.h
+++ b/src/brpc/details/http_message.h
@@ -82,6 +82,10 @@ public:
bool read_body_progressively() const { return _read_body_progressively; }
+ void set_read_body_progressively(bool read_body_progressively) {
+ this->_read_body_progressively = read_body_progressively;
+ }
+
// Send new parts of the body to the reader. If the body already has some
// data, feed them to the reader immediately.
// Any error during the setting will destroy the reader.
diff --git a/src/brpc/policy/http_rpc_protocol.cpp
b/src/brpc/policy/http_rpc_protocol.cpp
index d3ae2625..53ab6290 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -21,6 +21,7 @@
#include <gflags/gflags.h>
#include <json2pb/pb_to_json.h> // ProtoMessageToJson
#include <json2pb/json_to_pb.h> // JsonToProtoMessage
+#include <string>
#include "brpc/policy/http_rpc_protocol.h"
#include "butil/unique_ptr.h" // std::unique_ptr
@@ -1082,7 +1083,7 @@ FindMethodPropertyByURI(const std::string& uri_path,
const Server* server,
}
ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket,
- bool read_eof, const void* /*arg*/) {
+ bool read_eof, const void* arg) {
HttpContext* http_imsg =
static_cast<HttpContext*>(socket->parsing_context());
if (http_imsg == NULL) {
@@ -1146,16 +1147,20 @@ ParseResult ParseHttpMessage(butil::IOBuf *source,
Socket *socket,
if (http_imsg->Completed()) {
CHECK_EQ(http_imsg, socket->release_parsing_context());
const ParseResult result = MakeMessage(http_imsg);
+ http_imsg->CheckProgressiveRead(arg, socket);
if (socket->is_read_progressive()) {
socket->OnProgressiveReadCompleted();
}
return result;
- } else if (socket->is_read_progressive() &&
- http_imsg->stage() >= HTTP_ON_HEADERS_COMPLETE) {
- // header part of a progressively-read http message is complete,
- // go on to ProcessHttpXXX w/o waiting for full body.
- http_imsg->AddOneRefForStage2(); // released when body is fully
read
- return MakeMessage(http_imsg);
+ } else if (http_imsg->stage() >= HTTP_ON_HEADERS_COMPLETE) {
+ http_imsg->CheckProgressiveRead(arg, socket);
+ if (socket->is_read_progressive()) {
+ // header part of a progressively-read http message is
complete,
+ // go on to ProcessHttpXXX w/o waiting for full body.
+ http_imsg->AddOneRefForStage2(); // released when body is
fully read
+ return MakeMessage(http_imsg);
+ }
+ return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
} else {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
@@ -1278,7 +1283,6 @@ void ProcessHttpRequest(InputMessageBase *msg) {
HttpHeader& req_header = cntl->http_request();
imsg_guard->header().Swap(req_header);
butil::IOBuf& req_body = imsg_guard->body();
-
butil::EndPoint user_addr;
if (!GetUserAddressFromHeader(req_header, &user_addr)) {
user_addr = socket->remote_side();
@@ -1547,8 +1551,12 @@ void ProcessHttpRequest(InputMessageBase *msg) {
sample->submit(start_parse_us);
}
} else {
- // A http server, just keep content as it is.
- cntl->request_attachment().swap(req_body);
+ if (imsg_guard->read_body_progressively()) {
+ accessor.set_readable_progressive_attachment(imsg_guard.get());
+ } else {
+ // A http server, just keep content as it is.
+ cntl->request_attachment().swap(req_body);
+ }
}
google::protobuf::Closure* done = new
HttpResponseSenderAsDone(&resp_sender);
@@ -1604,5 +1612,19 @@ const std::string& GetHttpMethodName(
return !path.empty() ? path : common->DEFAULT_PATH;
}
+void HttpContext::CheckProgressiveRead(const void* arg, Socket *socket) {
+ if (arg == NULL || !((Server *)arg)->has_progressive_read_method()) {
+ // arg == NULL indicates not in server-end
+ return;
+ }
+ const Server::MethodProperty *const sp = FindMethodPropertyByURI(
+ header().uri().path(), (Server *)arg,
+ const_cast<std::string *>(&header().unresolved_path()));
+ if (sp != NULL && sp->params.enable_progressive_read) {
+ this->set_read_body_progressively(true);
+ socket->read_will_be_progressive(CONNECTION_TYPE_SHORT);
+ }
+}
+
} // namespace policy
} // namespace brpc
diff --git a/src/brpc/policy/http_rpc_protocol.h
b/src/brpc/policy/http_rpc_protocol.h
index 91acae77..d7337221 100644
--- a/src/brpc/policy/http_rpc_protocol.h
+++ b/src/brpc/policy/http_rpc_protocol.h
@@ -115,6 +115,8 @@ public:
return SetBodyReader(r);
}
+ void CheckProgressiveRead(const void* arg, Socket *socket);
+
private:
bool _is_stage2;
};
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 021450fc..4953f88c 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -409,7 +409,8 @@ Server::Server(ProfilerLinker)
, _keytable_pool(NULL)
, _eps_bvar(&_nerror_bvar)
, _concurrency(0)
- , _concurrency_bvar(cast_no_barrier_int, &_concurrency) {
+ , _concurrency_bvar(cast_no_barrier_int, &_concurrency)
+ ,_has_progressive_read_method(false) {
BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0,
Server_concurrency_must_be_aligned_by_cacheline);
}
@@ -1290,6 +1291,10 @@ int
Server::AddServiceInternal(google::protobuf::Service* service,
mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
mp.params.pb_single_repeated_to_array =
svc_opt.pb_single_repeated_to_array;
+ mp.params.enable_progressive_read = svc_opt.enable_progressive_read;
+ if (mp.params.enable_progressive_read) {
+ _has_progressive_read_method = true;
+ }
mp.service = service;
mp.method = md;
mp.status = new MethodStatus;
@@ -1477,6 +1482,7 @@ ServiceOptions::ServiceOptions()
, pb_bytes_to_base64(true)
#endif
, pb_single_repeated_to_array(false)
+ , enable_progressive_read(false)
{}
int Server::AddService(google::protobuf::Service* service,
diff --git a/src/brpc/server.h b/src/brpc/server.h
index e01670be..c00f9dc8 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -319,6 +319,10 @@ struct ServiceOptions {
// decode json array to protobuf message which contains a single repeated
field.
// Default: false.
bool pb_single_repeated_to_array;
+
+ // enable server end progressive reading, mainly for http server
+ // Default: false.
+ bool enable_progressive_read;
};
// Represent ports inside [min_port, max_port]
@@ -369,6 +373,7 @@ public:
bool allow_http_body_to_pb;
bool pb_bytes_to_base64;
bool pb_single_repeated_to_array;
+ bool enable_progressive_read;
OpaqueParams();
};
OpaqueParams params;
@@ -560,10 +565,14 @@ public:
int Concurrency() const {
return butil::subtle::NoBarrier_Load(&_concurrency);
};
-
+
// Returns true if accept request, reject request otherwise.
bool AcceptRequest(Controller* cntl) const;
+ bool has_progressive_read_method() const {
+ return this->_has_progressive_read_method;
+ }
+
private:
friend class StatusService;
friend class ProtobufsService;
@@ -714,6 +723,8 @@ friend class Controller;
mutable bvar::PerSecond<bvar::Adder<int64_t> > _eps_bvar;
BAIDU_CACHELINE_ALIGNMENT mutable int32_t _concurrency;
bvar::PassiveStatus<int32_t> _concurrency_bvar;
+
+ bool _has_progressive_read_method;
};
// Get the data attached to current searching thread. The data is created by
diff --git a/test/brpc_http_rpc_protocol_unittest.cpp
b/test/brpc_http_rpc_protocol_unittest.cpp
index a9b76f37..0698651e 100644
--- a/test/brpc_http_rpc_protocol_unittest.cpp
+++ b/test/brpc_http_rpc_protocol_unittest.cpp
@@ -19,6 +19,9 @@
// Date: Sun Jul 13 15:04:18 CST 2014
+#include <cstddef>
+#include <google/protobuf/stubs/logging.h>
+#include <string>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/socket.h>
@@ -26,6 +29,10 @@
#include <gflags/gflags.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/text_format.h>
+#include <unistd.h>
+#include "brpc/http_method.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/files/scoped_file.h"
@@ -1014,6 +1021,158 @@ TEST_F(HttpTest,
broken_socket_stops_progressive_reading) {
ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
}
+static const std::string TEST_PROGRESSIVE_HEADER = "Progressive";
+static const std::string TEST_PROGRESSIVE_HEADER_VAL = "Progressive-val";
+
+class ServerProgressiveReader : public ReadBody {
+public:
+ ServerProgressiveReader(brpc::Controller* cntl, google::protobuf::Closure*
done)
+ : _cntl(cntl)
+ , _done(done) {}
+
+ // @ProgressiveReader
+ void OnEndOfMessage(const butil::Status& st) {
+ butil::intrusive_ptr<ReadBody>(this);
+ brpc::ClosureGuard done_guard(_done);
+ ASSERT_LT(_buf.size(), PA_DATA_LEN);
+ ASSERT_EQ(0, memcmp(_buf.data(), PA_DATA, _buf.size()));
+ _destroyed = true;
+ _destroying_st = st;
+ LOG(INFO) << "Destroy ReadBody=" << this << ", " << st;
+ _cntl->response_attachment().append("Sucess");
+ }
+private:
+ brpc::Controller* _cntl;
+ google::protobuf::Closure* _done;
+};
+
+class ServerAlwaysFailReader : public brpc::ProgressiveReader {
+public:
+ ServerAlwaysFailReader(brpc::Controller* cntl, google::protobuf::Closure*
done)
+ : _cntl(cntl)
+ , _done(done) {}
+
+ // @ProgressiveReader
+ butil::Status OnReadOnePart(const void* /*data*/, size_t /*length*/) {
+ return butil::Status(-1, "intended fail at %s:%d", __FILE__, __LINE__);
+ }
+
+ void OnEndOfMessage(const butil::Status& st) {
+ brpc::ClosureGuard done_guard(_done);
+ CHECK_EQ(-1, st.error_code());
+ _cntl->SetFailed("Must Failed");
+ LOG(INFO) << "Destroy " << this << ": " << st;
+ delete this;
+ }
+private:
+ brpc::Controller* _cntl;
+ google::protobuf::Closure* _done;
+};
+
+class UploadServiceImpl : public ::test::UploadService {
+public:
+ void Upload(::google::protobuf::RpcController* controller,
+ const ::test::HttpRequest* request,
+ ::test::HttpResponse* response,
+ ::google::protobuf::Closure* done) {
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ check_header(cntl);
+ cntl->request_will_be_read_progressively();
+ cntl->ReadProgressiveAttachmentBy(new ServerProgressiveReader(cntl,
done));
+ }
+
+ void UploadFailed(::google::protobuf::RpcController* controller,
+ const ::test::HttpRequest* request,
+ ::test::HttpResponse* response,
+ ::google::protobuf::Closure* done) {
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+ check_header(cntl);
+ cntl->request_will_be_read_progressively();
+ cntl->ReadProgressiveAttachmentBy(new ServerAlwaysFailReader(cntl,
done));
+ }
+
+private:
+ void check_header(brpc::Controller* cntl) {
+ const std::string* test_header =
cntl->http_request().GetHeader(TEST_PROGRESSIVE_HEADER);
+ GOOGLE_CHECK_NOTNULL(test_header);
+ CHECK_EQ(*test_header, TEST_PROGRESSIVE_HEADER_VAL);
+ }
+};
+
+TEST_F(HttpTest, server_end_read_short_body_progressively) {
+ const int port = 8923;
+ brpc::ServiceOptions opt;
+ opt.enable_progressive_read = true;
+ opt.ownership = brpc::SERVER_DOESNT_OWN_SERVICE;
+ UploadServiceImpl upsvc;
+ brpc::Server server;
+ EXPECT_EQ(0, server.AddService(&upsvc, opt));
+ EXPECT_EQ(0, server.Start(port, NULL));
+
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ options.protocol = brpc::PROTOCOL_HTTP;
+ ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port),
&options));
+ brpc::Controller cntl;
+ cntl.http_request().uri() = "/UploadService/Upload";
+ cntl.http_request().SetHeader(TEST_PROGRESSIVE_HEADER,
TEST_PROGRESSIVE_HEADER_VAL);
+ cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+
+ ASSERT_GT(PA_DATA_LEN, 8u); // long enough to hold a 64-bit decimal.
+ char buf[PA_DATA_LEN];
+ for (size_t c = 0; c < 10000;) {
+ CopyPAPrefixedWithSeqNo(buf, c);
+ if (cntl.request_attachment().append(buf, sizeof(buf)) != 0) {
+ if (errno == brpc::EOVERCROWDED) {
+ LOG(INFO) << "full msg=" <<
cntl.request_attachment().to_string();
+ } else {
+ LOG(INFO) << "Error:" << errno;
+ }
+ break;
+ }
+ ++c;
+ }
+ channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
+ ASSERT_FALSE(cntl.Failed());
+}
+
+TEST_F(HttpTest, server_end_read_failed) {
+ const int port = 8923;
+ brpc::ServiceOptions opt;
+ opt.enable_progressive_read = true;
+ opt.ownership = brpc::SERVER_DOESNT_OWN_SERVICE;
+ UploadServiceImpl upsvc;
+ brpc::Server server;
+ EXPECT_EQ(0, server.AddService(&upsvc, opt));
+ EXPECT_EQ(0, server.Start(port, NULL));
+
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ options.protocol = brpc::PROTOCOL_HTTP;
+ ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port),
&options));
+ brpc::Controller cntl;
+ cntl.http_request().uri() = "/UploadService/UploadFailed";
+ cntl.http_request().SetHeader(TEST_PROGRESSIVE_HEADER,
TEST_PROGRESSIVE_HEADER_VAL);
+ cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+
+ ASSERT_GT(PA_DATA_LEN, 8u); // long enough to hold a 64-bit decimal.
+ char buf[PA_DATA_LEN];
+ for (size_t c = 0; c < 10;) {
+ CopyPAPrefixedWithSeqNo(buf, c);
+ if (cntl.request_attachment().append(buf, sizeof(buf)) != 0) {
+ if (errno == brpc::EOVERCROWDED) {
+ LOG(INFO) << "full msg=" <<
cntl.request_attachment().to_string();
+ } else {
+ LOG(INFO) << "Error:" << errno;
+ }
+ break;
+ }
+ ++c;
+ }
+ channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
+ ASSERT_TRUE(cntl.Failed());
+}
+
TEST_F(HttpTest, http2_sanity) {
const int port = 8923;
brpc::Server server;
diff --git a/test/echo.proto b/test/echo.proto
index 2a47b234..a027516e 100644
--- a/test/echo.proto
+++ b/test/echo.proto
@@ -65,6 +65,11 @@ service DownloadService {
rpc DownloadFailed(HttpRequest) returns (HttpResponse);
}
+service UploadService {
+ rpc Upload(HttpRequest) returns (HttpResponse);
+ rpc UploadFailed(HttpRequest) returns (HttpResponse);
+}
+
service UserNamingService {
rpc ListNames(HttpRequest) returns (HttpResponse);
rpc Touch(HttpRequest) returns (HttpResponse);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]