This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ef28b09 Feature: Server-end progressive reader for http protocol 
(#2210)
0ef28b09 is described below

commit 0ef28b0927babaaaa04f786791f17f7a54bc4e2f
Author: Siyang Tang <[email protected]>
AuthorDate: Sun Jun 25 14:30:05 2023 +0800

    Feature: Server-end progressive reader for http protocol (#2210)
    
    * enable serverend progressive reading
    
    * fix reviewed problems
    
    * test and fix
    
    * remove redundant test log
    
    * add has_progressive_read flag
    
    * format
---
 src/brpc/controller.h                    |   3 +
 src/brpc/details/http_message.h          |   4 +
 src/brpc/policy/http_rpc_protocol.cpp    |  42 ++++++--
 src/brpc/policy/http_rpc_protocol.h      |   2 +
 src/brpc/server.cpp                      |   8 +-
 src/brpc/server.h                        |  13 ++-
 test/brpc_http_rpc_protocol_unittest.cpp | 159 +++++++++++++++++++++++++++++++
 test/echo.proto                          |   5 +
 8 files changed, 224 insertions(+), 12 deletions(-)

diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 658cc695..3d75ff50 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -291,6 +291,9 @@ public:
     // Make the RPC end when the HTTP response has complete headers and let
     // user read the remaining body by using ReadProgressiveAttachmentBy().
     void response_will_be_read_progressively() { 
add_flag(FLAGS_READ_PROGRESSIVELY); }
+    // Declare that the body of the HTTP request will be read progressively
+    // by the reader passed to ReadProgressiveAttachmentBy().
+    // Sets the same FLAGS_READ_PROGRESSIVELY flag as
+    // response_will_be_read_progressively().
+    void request_will_be_read_progressively() { 
add_flag(FLAGS_READ_PROGRESSIVELY); }
     // True if response_will_be_read_progressively() was called.
     bool is_response_read_progressively() const { return 
has_flag(FLAGS_READ_PROGRESSIVELY); }
 
diff --git a/src/brpc/details/http_message.h b/src/brpc/details/http_message.h
index e23b2010..2b9471a1 100644
--- a/src/brpc/details/http_message.h
+++ b/src/brpc/details/http_message.h
@@ -82,6 +82,10 @@ public:
 
     bool read_body_progressively() const { return _read_body_progressively; }
 
+    // Records whether the body of this message is to be consumed
+    // progressively; read_body_progressively() reports the stored value.
+    void set_read_body_progressively(bool read_body_progressively) {
+        _read_body_progressively = read_body_progressively;
+    }
+
     // Send new parts of the body to the reader. If the body already has some
     // data, feed them to the reader immediately.
     // Any error during the setting will destroy the reader.
diff --git a/src/brpc/policy/http_rpc_protocol.cpp 
b/src/brpc/policy/http_rpc_protocol.cpp
index d3ae2625..53ab6290 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -21,6 +21,7 @@
 #include <gflags/gflags.h>
 #include <json2pb/pb_to_json.h>                    // ProtoMessageToJson
 #include <json2pb/json_to_pb.h>                    // JsonToProtoMessage
+#include <string>
 
 #include "brpc/policy/http_rpc_protocol.h"
 #include "butil/unique_ptr.h"                       // std::unique_ptr
@@ -1082,7 +1083,7 @@ FindMethodPropertyByURI(const std::string& uri_path, 
const Server* server,
 }
 
 ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket,
-                             bool read_eof, const void* /*arg*/) {
+                             bool read_eof, const void* arg) {
     HttpContext* http_imsg = 
         static_cast<HttpContext*>(socket->parsing_context());
     if (http_imsg == NULL) {
@@ -1146,16 +1147,20 @@ ParseResult ParseHttpMessage(butil::IOBuf *source, 
Socket *socket,
         if (http_imsg->Completed()) {
             CHECK_EQ(http_imsg, socket->release_parsing_context());
             const ParseResult result = MakeMessage(http_imsg);
+            http_imsg->CheckProgressiveRead(arg, socket);
             if (socket->is_read_progressive()) {
                 socket->OnProgressiveReadCompleted();
             }
             return result;
-        } else if (socket->is_read_progressive() &&
-                   http_imsg->stage() >= HTTP_ON_HEADERS_COMPLETE) {
-            // header part of a progressively-read http message is complete,
-            // go on to ProcessHttpXXX w/o waiting for full body.
-            http_imsg->AddOneRefForStage2(); // released when body is fully 
read
-            return MakeMessage(http_imsg);
+        } else if (http_imsg->stage() >= HTTP_ON_HEADERS_COMPLETE) {
+            http_imsg->CheckProgressiveRead(arg, socket);
+            if (socket->is_read_progressive()) {
+                // header part of a progressively-read http message is 
complete,
+                // go on to ProcessHttpXXX w/o waiting for full body.
+                http_imsg->AddOneRefForStage2(); // released when body is 
fully read
+                return MakeMessage(http_imsg);
+            }
+            return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
         } else {
             return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
         }
@@ -1278,7 +1283,6 @@ void ProcessHttpRequest(InputMessageBase *msg) {
     HttpHeader& req_header = cntl->http_request();
     imsg_guard->header().Swap(req_header);
     butil::IOBuf& req_body = imsg_guard->body();
-
     butil::EndPoint user_addr;
     if (!GetUserAddressFromHeader(req_header, &user_addr)) {
         user_addr = socket->remote_side();
@@ -1547,8 +1551,12 @@ void ProcessHttpRequest(InputMessageBase *msg) {
             sample->submit(start_parse_us);
         }
     } else {
-        // A http server, just keep content as it is.
-        cntl->request_attachment().swap(req_body);
+        if (imsg_guard->read_body_progressively()) {
+            accessor.set_readable_progressive_attachment(imsg_guard.get());
+        } else {
+            // A http server, just keep content as it is.
+            cntl->request_attachment().swap(req_body);
+        }
     }
 
     google::protobuf::Closure* done = new 
HttpResponseSenderAsDone(&resp_sender);
@@ -1604,5 +1612,19 @@ const std::string& GetHttpMethodName(
     return !path.empty() ? path : common->DEFAULT_PATH;
 }
 
+// Decides whether the body of this HTTP message should be read progressively
+// and, if so, switches both this message and `socket' to progressive reading.
+//   arg:    opaque pointer to the Server handling the message, or NULL when
+//           the message is not being parsed on the server end.
+//   socket: the connection this message is being parsed from.
+// Does nothing unless the server reports has_progressive_read_method() and
+// the request path resolves to a method whose params.enable_progressive_read
+// is set.
+void HttpContext::CheckProgressiveRead(const void* arg, Socket *socket) {
+    // arg == NULL indicates not in server-end
+    const Server* const server = static_cast<const Server*>(arg);
+    if (server == NULL || !server->has_progressive_read_method()) {
+        return;
+    }
+    // The lookup is handed a non-const std::string* for the unresolved path
+    // (presumably an out-parameter -- confirm), hence the const_cast on the
+    // header's own field.
+    const Server::MethodProperty* const sp = FindMethodPropertyByURI(
+        header().uri().path(), server,
+        const_cast<std::string*>(&header().unresolved_path()));
+    if (sp != NULL && sp->params.enable_progressive_read) {
+        set_read_body_progressively(true);
+        socket->read_will_be_progressive(CONNECTION_TYPE_SHORT);
+    }
+}
+
 }  // namespace policy
 } // namespace brpc
diff --git a/src/brpc/policy/http_rpc_protocol.h 
b/src/brpc/policy/http_rpc_protocol.h
index 91acae77..d7337221 100644
--- a/src/brpc/policy/http_rpc_protocol.h
+++ b/src/brpc/policy/http_rpc_protocol.h
@@ -115,6 +115,8 @@ public:
         return SetBodyReader(r);
     }
 
+    void CheckProgressiveRead(const void* arg, Socket *socket);
+
 private:
     bool _is_stage2;
 };
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 021450fc..4953f88c 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -409,7 +409,8 @@ Server::Server(ProfilerLinker)
     , _keytable_pool(NULL)
     , _eps_bvar(&_nerror_bvar)
     , _concurrency(0)
-    , _concurrency_bvar(cast_no_barrier_int, &_concurrency) {
+    , _concurrency_bvar(cast_no_barrier_int, &_concurrency)
+    ,_has_progressive_read_method(false) {
     BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0,
                   Server_concurrency_must_be_aligned_by_cacheline);
 }
@@ -1290,6 +1291,10 @@ int 
Server::AddServiceInternal(google::protobuf::Service* service,
         mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
         mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
         mp.params.pb_single_repeated_to_array = 
svc_opt.pb_single_repeated_to_array;
+        mp.params.enable_progressive_read = svc_opt.enable_progressive_read;
+        if (mp.params.enable_progressive_read) {
+            _has_progressive_read_method = true;
+        }
         mp.service = service;
         mp.method = md;
         mp.status = new MethodStatus;
@@ -1477,6 +1482,7 @@ ServiceOptions::ServiceOptions()
     , pb_bytes_to_base64(true)
 #endif
     , pb_single_repeated_to_array(false)
+    , enable_progressive_read(false)
     {}
 
 int Server::AddService(google::protobuf::Service* service,
diff --git a/src/brpc/server.h b/src/brpc/server.h
index e01670be..c00f9dc8 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -319,6 +319,10 @@ struct ServiceOptions {
     // decode json array to protobuf message which contains a single repeated 
field.
     // Default: false.
     bool pb_single_repeated_to_array;
+
+    // Enable progressive reading of request bodies on the server side for
+    // the methods of this service (mainly for HTTP servers).
+    // Default: false.
+    bool enable_progressive_read;
 };
 
 // Represent ports inside [min_port, max_port]
@@ -369,6 +373,7 @@ public:
             bool allow_http_body_to_pb;
             bool pb_bytes_to_base64;
             bool pb_single_repeated_to_array;
+            bool enable_progressive_read;
             OpaqueParams();
         };
         OpaqueParams params;
@@ -560,10 +565,14 @@ public:
     int Concurrency() const {
         return butil::subtle::NoBarrier_Load(&_concurrency);
     };
-
+  
     // Returns true if accept request, reject request otherwise.
     bool AcceptRequest(Controller* cntl) const;
 
+    // True if at least one method was registered with
+    // ServiceOptions::enable_progressive_read set.
+    bool has_progressive_read_method() const {
+        return _has_progressive_read_method;
+    }
+
 private:
 friend class StatusService;
 friend class ProtobufsService;
@@ -714,6 +723,8 @@ friend class Controller;
     mutable bvar::PerSecond<bvar::Adder<int64_t> > _eps_bvar;
     BAIDU_CACHELINE_ALIGNMENT mutable int32_t _concurrency;
     bvar::PassiveStatus<int32_t> _concurrency_bvar;
+
+    bool _has_progressive_read_method;
 };
 
 // Get the data attached to current searching thread. The data is created by
diff --git a/test/brpc_http_rpc_protocol_unittest.cpp 
b/test/brpc_http_rpc_protocol_unittest.cpp
index a9b76f37..0698651e 100644
--- a/test/brpc_http_rpc_protocol_unittest.cpp
+++ b/test/brpc_http_rpc_protocol_unittest.cpp
@@ -19,6 +19,9 @@
 
 // Date: Sun Jul 13 15:04:18 CST 2014
 
+#include <cstddef>
+#include <google/protobuf/stubs/logging.h>
+#include <string>
 #include <sys/ioctl.h>
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -26,6 +29,10 @@
 #include <gflags/gflags.h>
 #include <google/protobuf/descriptor.h>
 #include <google/protobuf/text_format.h>
+#include <unistd.h>
+#include "brpc/http_method.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"
 #include "butil/time.h"
 #include "butil/macros.h"
 #include "butil/files/scoped_file.h"
@@ -1014,6 +1021,158 @@ TEST_F(HttpTest, 
broken_socket_stops_progressive_reading) {
     ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
 }
 
+static const std::string TEST_PROGRESSIVE_HEADER = "Progressive";
+static const std::string TEST_PROGRESSIVE_HEADER_VAL = "Progressive-val";
+
+// Server-side reader used by UploadServiceImpl::Upload. Everything except
+// OnEndOfMessage() is inherited from ReadBody. When the request body ends it
+// verifies the buffered data, writes the response body and runs `done'.
+class ServerProgressiveReader : public ReadBody {
+public:
+    // cntl: controller of the RPC whose request body is being read.
+    // done: closure run from OnEndOfMessage() to finish the RPC.
+    ServerProgressiveReader(brpc::Controller* cntl,
+                            google::protobuf::Closure* done)
+        : _cntl(cntl)
+        , _done(done) {}
+
+    // @ProgressiveReader
+    void OnEndOfMessage(const butil::Status& st) {
+        // NOTE(review): this temporary is destroyed at the end of the
+        // statement, so any reference it takes is dropped again right away.
+        // Confirm against ReadBody's ref-counting that this is intended.
+        butil::intrusive_ptr<ReadBody>(this);
+        // Runs _done whenever this function returns, including the early
+        // return taken by a failing ASSERT_* below.
+        brpc::ClosureGuard done_guard(_done);
+        ASSERT_LT(_buf.size(), PA_DATA_LEN);
+        ASSERT_EQ(0, memcmp(_buf.data(), PA_DATA, _buf.size()));
+        _destroyed = true;
+        _destroying_st = st;
+        LOG(INFO) << "Destroy ReadBody=" << this << ", " << st;
+        _cntl->response_attachment().append("Success");
+    }
+private:
+    brpc::Controller* _cntl;
+    google::protobuf::Closure* _done;
+};
+
+// Reader whose OnReadOnePart() always fails; used by
+// UploadServiceImpl::UploadFailed.
+class ServerAlwaysFailReader : public brpc::ProgressiveReader {
+public:
+    // cntl: controller of the RPC, marked failed in OnEndOfMessage().
+    // done: closure run when OnEndOfMessage() returns.
+    ServerAlwaysFailReader(brpc::Controller* cntl, google::protobuf::Closure* 
done) 
+        : _cntl(cntl)
+        , _done(done) {}
+
+    // @ProgressiveReader
+    // Rejects every part with error code -1.
+    butil::Status OnReadOnePart(const void* /*data*/, size_t /*length*/) {
+        return butil::Status(-1, "intended fail at %s:%d", __FILE__, __LINE__);
+    }    
+
+    // Checks that `st' carries the -1 produced by OnReadOnePart(), marks the
+    // RPC as failed and frees this reader. done_guard is a local holding its
+    // own copy of the closure pointer, so it runs `done' on return.
+    void OnEndOfMessage(const butil::Status& st) {
+        brpc::ClosureGuard done_guard(_done);
+        CHECK_EQ(-1, st.error_code());
+        _cntl->SetFailed("Must Failed");
+        LOG(INFO) << "Destroy " << this << ": " << st;
+        delete this;
+    }
+private:
+    brpc::Controller* _cntl;
+    google::protobuf::Closure* _done;
+};
+
+// Test service whose methods hand the HTTP request body to a progressive
+// reader instead of reading it from the request attachment.
+class UploadServiceImpl : public ::test::UploadService {
+public:
+    // Reads the body with a ServerProgressiveReader, which runs `done'.
+    void Upload(::google::protobuf::RpcController* controller,
+                const ::test::HttpRequest* request,
+                ::test::HttpResponse* response,
+                ::google::protobuf::Closure* done) {
+        brpc::Controller* const cntl = begin_progressive_read(controller);
+        cntl->ReadProgressiveAttachmentBy(
+            new ServerProgressiveReader(cntl, done));
+    }
+
+    // Reads the body with a ServerAlwaysFailReader, which runs `done'.
+    void UploadFailed(::google::protobuf::RpcController* controller,
+                      const ::test::HttpRequest* request,
+                      ::test::HttpResponse* response,
+                      ::google::protobuf::Closure* done) {
+        brpc::Controller* const cntl = begin_progressive_read(controller);
+        cntl->ReadProgressiveAttachmentBy(
+            new ServerAlwaysFailReader(cntl, done));
+    }
+
+private:
+    // Shared preamble of both methods: downcast the controller, check the
+    // test header and flag the request as progressively read.
+    brpc::Controller* begin_progressive_read(
+            ::google::protobuf::RpcController* controller) {
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+        check_header(cntl);
+        cntl->request_will_be_read_progressively();
+        return cntl;
+    }
+
+    // The request must carry the header set by the client side of the tests.
+    void check_header(brpc::Controller* cntl) {
+        const std::string* test_header =
+            cntl->http_request().GetHeader(TEST_PROGRESSIVE_HEADER);
+        GOOGLE_CHECK_NOTNULL(test_header);
+        CHECK_EQ(*test_header, TEST_PROGRESSIVE_HEADER_VAL);
+    }
+};
+
+// POSTs a large body to UploadService/Upload on a server whose service was
+// added with enable_progressive_read, and expects the RPC to succeed.
+TEST_F(HttpTest, server_end_read_short_body_progressively) {
+    const int port = 8923;
+    brpc::ServiceOptions opt;
+    opt.enable_progressive_read = true;
+    opt.ownership = brpc::SERVER_DOESNT_OWN_SERVICE;
+    // Declared before the server so that the service outlives it.
+    UploadServiceImpl upsvc;
+    brpc::Server server;
+    EXPECT_EQ(0, server.AddService(&upsvc, opt));
+    EXPECT_EQ(0, server.Start(port, NULL));
+
+    brpc::Channel channel;
+    brpc::ChannelOptions options;
+    options.protocol = brpc::PROTOCOL_HTTP;
+    ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port),
+                              &options));
+    brpc::Controller cntl;
+    cntl.http_request().uri() = "/UploadService/Upload";
+    cntl.http_request().SetHeader(TEST_PROGRESSIVE_HEADER,
+                                  TEST_PROGRESSIVE_HEADER_VAL);
+    cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+
+    ASSERT_GT(PA_DATA_LEN, 8u);  // long enough to hold a 64-bit decimal.
+    char buf[PA_DATA_LEN];
+    // Fill the request body with up to 10000 sequence-numbered chunks; stop
+    // at the first append that fails.
+    for (size_t seq = 0; seq < 10000; ++seq) {
+        CopyPAPrefixedWithSeqNo(buf, seq);
+        if (cntl.request_attachment().append(buf, sizeof(buf)) != 0) {
+            if (errno == brpc::EOVERCROWDED) {
+                LOG(INFO) << "full msg="
+                          << cntl.request_attachment().to_string();
+            } else {
+                LOG(INFO) << "Error:" << errno;
+            }
+            break;
+        }
+    }
+    channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
+    ASSERT_FALSE(cntl.Failed());
+}
+
+// POSTs a body to UploadService/UploadFailed, whose reader rejects every
+// part, and expects the RPC to be reported as failed to the client.
+TEST_F(HttpTest, server_end_read_failed) {
+    const int port = 8923;
+    brpc::ServiceOptions opt;
+    opt.enable_progressive_read = true;
+    opt.ownership = brpc::SERVER_DOESNT_OWN_SERVICE;
+    // Declared before the server so that the service outlives it.
+    UploadServiceImpl upsvc;
+    brpc::Server server;
+    EXPECT_EQ(0, server.AddService(&upsvc, opt));
+    EXPECT_EQ(0, server.Start(port, NULL));
+
+    brpc::Channel channel;
+    brpc::ChannelOptions options;
+    options.protocol = brpc::PROTOCOL_HTTP;
+    ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port),
+                              &options));
+    brpc::Controller cntl;
+    cntl.http_request().uri() = "/UploadService/UploadFailed";
+    cntl.http_request().SetHeader(TEST_PROGRESSIVE_HEADER,
+                                  TEST_PROGRESSIVE_HEADER_VAL);
+    cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+
+    ASSERT_GT(PA_DATA_LEN, 8u);  // long enough to hold a 64-bit decimal.
+    char buf[PA_DATA_LEN];
+    // Fill the request body with up to 10 sequence-numbered chunks; stop at
+    // the first append that fails.
+    for (size_t seq = 0; seq < 10; ++seq) {
+        CopyPAPrefixedWithSeqNo(buf, seq);
+        if (cntl.request_attachment().append(buf, sizeof(buf)) != 0) {
+            if (errno == brpc::EOVERCROWDED) {
+                LOG(INFO) << "full msg="
+                          << cntl.request_attachment().to_string();
+            } else {
+                LOG(INFO) << "Error:" << errno;
+            }
+            break;
+        }
+    }
+    channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
+    ASSERT_TRUE(cntl.Failed());
+}
+
 TEST_F(HttpTest, http2_sanity) {
     const int port = 8923;
     brpc::Server server;
diff --git a/test/echo.proto b/test/echo.proto
index 2a47b234..a027516e 100644
--- a/test/echo.proto
+++ b/test/echo.proto
@@ -65,6 +65,11 @@ service DownloadService {
     rpc DownloadFailed(HttpRequest) returns (HttpResponse);
 }
 
+// Implemented by UploadServiceImpl in brpc_http_rpc_protocol_unittest.cpp
+// for the server-end progressive-read tests.
+service UploadService {
+    rpc Upload(HttpRequest) returns (HttpResponse);
+    rpc UploadFailed(HttpRequest) returns (HttpResponse);
+}
+
 service UserNamingService {
     rpc ListNames(HttpRequest) returns (HttpResponse);
     rpc Touch(HttpRequest) returns (HttpResponse);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to