This is an automated email from the ASF dual-hosted git repository.

chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cd0a727 Add IOBuf<->std::iostream adapters for zero-copy I/O (#3341)
4cd0a727 is described below

commit 4cd0a727e272fa285e1ff5f82f661a5bada83d3c
Author: Bright Chen <[email protected]>
AuthorDate: Sun Jun 14 12:51:18 2026 +0800

    Add IOBuf<->std::iostream adapters for zero-copy I/O (#3341)
    
    Introduce four classes that bridge IOBuf and the standard iostream
    hierarchy, enabling parsers and serializers that take std::istream& /
    std::ostream& (e.g. nlohmann::json) to read from and write to IOBuf
    directly without an intermediate string copy:
    
      - IOBufAsInputStreamBuf  : read-only streambuf over IOBuf blocks
      - IOBufInputStream       : std::istream view over IOBuf
      - IOBufAsOutputStreamBuf : append-only streambuf, reusing
                                 IOBufAsZeroCopyOutputStream's TLS block pool
      - IOBufOutputStream      : std::ostream view over IOBuf
---
 MODULE.bazel            |   1 +
 WORKSPACE               |  18 +++
 src/butil/iobuf.cpp     | 126 +++++++++++++++++++
 src/butil/iobuf.h       | 161 +++++++++++++++++++++++++
 test/BUILD.bazel        |   2 +
 test/iobuf_unittest.cpp | 313 +++++++++++++++++++++++++++++++++++++++++++++++-
 6 files changed, 618 insertions(+), 3 deletions(-)

diff --git a/MODULE.bazel b/MODULE.bazel
index bd5b8e38..b855cd50 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -44,6 +44,7 @@ bazel_dep(name = 'thrift', version = '0.21.0', repo_name = 'org_apache_thrift')
 
 # test only
 bazel_dep(name = "gperftools", version = "2.18.1", dev_dependency = True)
+bazel_dep(name = "nlohmann_json", version = "3.12.0", dev_dependency = True)
 bazel_dep(name = 'googletest', version = '1.14.0.bcr.1', repo_name = 'com_google_googletest', dev_dependency = True)
 bazel_dep(name = 'hedron_compile_commands', dev_dependency = True)
 git_override(
diff --git a/WORKSPACE b/WORKSPACE
index a107f0a5..78a6c283 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -279,6 +279,24 @@ http_archive(
     urls = ["https://archive.apache.org/dist/thrift/0.15.0/thrift-0.15.0.tar.gz"],
 )
 
+# Header-only JSON library used by iobuf_unittest's IOBuf<->std::iostream
+# adapter tests. Keep version in sync with MODULE.bazel.
+http_archive(
+    name = "nlohmann_json",
+    build_file_content = """
+cc_library(
+    name = "json",
+    hdrs = ["single_include/nlohmann/json.hpp", "single_include/nlohmann/json_fwd.hpp"],
+    strip_include_prefix = "single_include",
+    visibility = ["//visibility:public"],
+)
+""",
+    sha256 = "4b92eb0c06d10683f7447ce9406cb97cd4b453be18d7279320f7b2f025c10187",
+    strip_prefix = "json-3.12.0",
+    urls = ["https://github.com/nlohmann/json/archive/refs/tags/v3.12.0.tar.gz"],
+)
+
+
 #
 # Perl Dependencies
 #
diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index af77d968..01469e22 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -30,6 +30,8 @@
 #include <fcntl.h>                         // O_RDONLY
 #include <errno.h>                         // errno
 #include <limits.h>                        // CHAR_BIT
+#include <algorithm>                       // std::min
+#include <limits>                          // std::numeric_limits
 #include <stdexcept>                       // std::invalid_argument
 #include <gflags/gflags.h>                 // gflags
 #include "butil/build_config.h"             // ARCH_CPU_X86_64
@@ -2025,6 +2027,130 @@ void IOBufAsZeroCopyOutputStream::_release_block() {
     _cur_block = NULL;
 }
 
+std::streambuf::int_type IOBufAsInputStreamBuf::underflow() {
+    size_t block_num = _buf.backing_block_num();
+    StringPiece blk;
+    while (_block_index < block_num) {
+        blk = _buf.backing_block(_block_index++);
+        if (!blk.empty()) {
+            break;
+        }
+    }
+    if (blk.empty()) {
+        return traits_type::eof();
+    }
+    // const_cast is safe here: setg() takes char* by API contract, but this
+    // streambuf never writes through it (no overflow/sputc path).
+    char* p = const_cast<char*>(blk.data());
+    setg(p, p, p + blk.size());
+    return traits_type::to_int_type(*gptr());
+}
+
+std::streamsize IOBufAsInputStreamBuf::xsgetn(char* s, std::streamsize n) {
+    auto kIntMax = static_cast<std::streamsize>(std::numeric_limits<int>::max());
+    std::streamsize total = 0;
+    while (total < n) {
+        std::streamsize avail = egptr() - gptr();
+        if (avail == 0) {
+            if (underflow() == traits_type::eof()) {
+                break;
+            }
+            avail = egptr() - gptr();
+        }
+        // Cap step at INT_MAX so gbump(int) cannot overflow when a user-data
+        // block exceeds 2GB.
+        std::streamsize step =
+            std::min(std::min(avail, n - total), kIntMax);
+        iobuf::cp(s + total, gptr(), static_cast<size_t>(step));
+        gbump(static_cast<int>(step));
+        total += step;
+    }
+    return total;
+}
+
+std::streamsize IOBufAsInputStreamBuf::showmanyc() {
+    std::streamsize kMax = std::numeric_limits<std::streamsize>::max();
+    std::streamsize n = egptr() - gptr();
+    size_t block_num = _buf.backing_block_num();
+    for (size_t i = _block_index; i < block_num; ++i) {
+        const std::streamsize sz =
+            static_cast<std::streamsize>(_buf.backing_block(i).size());
+        // Saturate instead of overflowing on pathologically large IOBufs.
+        if (n > kMax - sz) {
+            return kMax;
+        }
+        n += sz;
+    }
+    return n;
+}
+
+IOBufAsOutputStreamBuf::~IOBufAsOutputStreamBuf() { shrink(); }
+
+void IOBufAsOutputStreamBuf::shrink() {
+    if (pbase() != NULL) {
+        std::streamsize unused = epptr() - pptr();
+        // _zc.BackUp takes int. A single put area never exceeds one block
+        // (Next() returns int size), so this fits in int by construction;
+        // the cap is purely defensive.
+        int kIntMax = std::numeric_limits<int>::max();
+        _zc.BackUp(unused > kIntMax ? kIntMax : static_cast<int>(unused));
+        setp(NULL, NULL);
+    }
+}
+
+std::streambuf::int_type IOBufAsOutputStreamBuf::overflow(int_type ch) {
+    if (traits_type::eq_int_type(ch, traits_type::eof())) {
+        return traits_type::not_eof(ch);
+    }
+    if (!refresh_put_area()) {
+        return traits_type::eof();
+    }
+    return sputc(traits_type::to_char_type(ch));
+}
+
+std::streamsize IOBufAsOutputStreamBuf::xsputn(
+        const char* s, std::streamsize n) {
+    auto kIntMax = static_cast<std::streamsize>(std::numeric_limits<int>::max());
+    std::streamsize total = 0;
+    while (total < n) {
+        std::streamsize avail = epptr() - pptr();
+        if (avail == 0) {
+            if (!refresh_put_area()) {
+                break;
+            }
+            avail = epptr() - pptr();
+            if (avail == 0) {
+                break;
+            }
+        }
+        // Cap step at INT_MAX so pbump(int) cannot overflow when a dedicated
+        // block exceeds 2GB.
+        std::streamsize step =
+            std::min(std::min(avail, n - total), kIntMax);
+        iobuf::cp(pptr(), s + total, static_cast<size_t>(step));
+        pbump(static_cast<int>(step));
+        total += step;
+    }
+    return total;
+}
+
+int IOBufAsOutputStreamBuf::sync() {
+    shrink();
+    return 0;
+}
+
+bool IOBufAsOutputStreamBuf::refresh_put_area() {
+    void* block = NULL;
+    int size = 0;
+    if (!_zc.Next(&block, &size)) {
+        setp(NULL, NULL);
+        return false;
+    }
+    char* p = static_cast<char*>(block);
+    setp(p, p + size);
+    return true;
+}
+
 IOBufAsSnappySink::IOBufAsSnappySink(butil::IOBuf& buf)
     : _cur_buf(NULL), _cur_len(0), _buf(&buf), _buf_stream(&buf) {
 }
diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h
index 978aa34f..b92a2e3d 100644
--- a/src/butil/iobuf.h
+++ b/src/butil/iobuf.h
@@ -25,6 +25,8 @@
 #include <sys/uio.h>                             // iovec
 #include <stdint.h>                              // uint32_t, int64_t
 #include <functional>
+#include <istream>                               // std::istream
+#include <streambuf>                             // std::streambuf
 #include <string>                                // std::string
 #include <ostream>                               // std::ostream
 #include <google/protobuf/io/zero_copy_stream.h> // ZeroCopyInputStream
@@ -609,6 +611,165 @@ private:
     int64_t _byte_count;
 };
 
+// Wrap IOBuf into a std::streambuf for std::istream-based parsers
+// (e.g. nlohmann::json::parse(std::istream&)).
+//
+// Read-only view: the streambuf never writes to the source IOBuf. Forward-only
+// (seekoff/seekpos are not overridden).
+//
+// NOTE: The source IOBuf MUST NOT be modified during the lifetime of this
+// streambuf, otherwise the StringPieces returned by backing_block() may be
+// invalidated and the stream will read garbage or crash.
+class IOBufAsInputStreamBuf : public std::streambuf {
+public:
+    // `buf' must outlive this streambuf and must not be modified while the
+    // streambuf is in use.
+    explicit IOBufAsInputStreamBuf(const IOBuf& buf) : _buf(buf) {}
+
+protected:
+    int_type underflow() override;
+    std::streamsize xsgetn(char* s, std::streamsize n) override;
+    std::streamsize showmanyc() override;
+
+private:
+    const IOBuf& _buf;
+    size_t _block_index{0};
+};
+
+// std::istream view over an IOBuf. The IOBuf must outlive this stream and
+// must not be modified while the stream is in use. Forward-only — seeking
+// is not supported.
+//
+// Typical use is feeding an IOBuf to a parser that takes std::istream&,
+// e.g. nlohmann::json:
+//
+//     butil::IOBufInputStream in(request_body);
+//     auto j = nlohmann::json::parse(in);
+//
+// Or formatted extraction:
+//
+//     butil::IOBuf buf;
+//     buf.append("42 3.14 hello");
+//     butil::IOBufInputStream in(buf);
+//     int i; double d;
+//     std::string s;
+//     in >> i >> d >> s;
+//
+// Bulk read into a buffer (goes through xsgetn, copies one block at a time):
+//
+//     std::string out(buf.length(), '\0');
+//     butil::IOBufInputStream in(buf);
+//     in.read(&out[0], out.size());
+class IOBufInputStream : public std::istream {
+public:
+    // `buf' must outlive this stream and must not be modified while the
+    // stream is in use.
+    explicit IOBufInputStream(const IOBuf& buf)
+        : std::istream(NULL), _sb(buf) {
+        rdbuf(&_sb);
+    }
+
+private:
+    IOBufAsInputStreamBuf _sb;
+};
+
+// Wrap IOBuf into a std::streambuf for std::ostream-based serializers
+// (e.g. nlohmann::json's `os << j`). Bytes are appended directly into IOBuf
+// blocks with no intermediate string copy.
+//
+// Internally backed by IOBufAsZeroCopyOutputStream:
+//  - by default, blocks are taken from the per-thread TLS pool (8KB);
+//  - pass `block_size` to allocate dedicated blocks instead, which avoids
+//    fragmenting the TLS pool when many output streams coexist.
+//
+// Append-only — seekoff/seekpos are not overridden.
+//
+// IMPORTANT: The exact length of the source IOBuf only reflects what was
+// written AFTER shrink() / sync() / destruction — `Next()` over-claims the
+// remainder of each block and the unused tail is BackUp'd later. If you need
+// the precise length mid-stream, call sync() (e.g. via `os.flush()` or
+// `_sb.shrink()`).
+class IOBufAsOutputStreamBuf : public std::streambuf {
+public:
+    // `buf' must outlive this streambuf.
+    explicit IOBufAsOutputStreamBuf(IOBuf& buf) : _zc(&buf) {}
+    IOBufAsOutputStreamBuf(IOBuf& buf, uint32_t block_size)
+        : _zc(&buf, block_size) {}
+    ~IOBufAsOutputStreamBuf() override;
+
+    // Return the unused tail of the current put area to the underlying IOBuf
+    // so that the IOBuf length matches exactly what was written so far.
+    // Automatically invoked from sync() and the destructor.
+    void shrink();
+
+protected:
+    int_type overflow(int_type ch) override;
+    std::streamsize xsputn(const char* s, std::streamsize n) override;
+    int sync() override;
+
+private:
+    bool refresh_put_area();
+
+    IOBufAsZeroCopyOutputStream _zc;
+};
+
+// std::ostream view over an IOBuf. Appends written bytes to the referenced
+// IOBuf; the IOBuf must outlive this stream. Append-only — seeking is not
+// supported.
+//
+// The IOBuf's length only reflects bytes written AFTER flush()/destruction
+// (see IOBufAsOutputStreamBuf for details).
+//
+// Serialize with a library that writes to std::ostream&, e.g. nlohmann::json
+// (zero intermediate string copy — bytes flow straight into IOBuf blocks):
+//
+//     #include <nlohmann/json.hpp>
+//     using nlohmann::json;
+//
+//     json reply = {
+//         {"status", "ok"},
+//         {"items",  {1, 2, 3}},
+//     };
+//
+//     butil::IOBuf out;                          // e.g. controller->response_attachment()
+//     {
+//         butil::IOBufOutputStream os(out);
+//         os << reply;                           // compact:  {"items":[1,2,3],"status":"ok"}
+//         // os << std::setw(2) << reply;        // pretty-print with 2-space indent
+//     } // dtor runs shrink(); `out` now has the exact serialized bytes.
+//
+// Formatted insertion (works like any std::ostream):
+//
+//     butil::IOBuf out;
+//     butil::IOBufOutputStream os(out);
+//     os << "x=" << 42 << " y=" << 3.14;
+//     os.flush();                                // commit to `out` now
+//
+// Bulk write of a known-size buffer (goes through xsputn, memcpy per block):
+//
+//     butil::IOBufOutputStream os(out);
+//     os.write(payload.data(), payload.size());
+//
+// Pass `block_size` when many output streams coexist in one thread, to keep
+// each stream's allocations off the shared TLS block pool:
+//
+//     butil::IOBufOutputStream os(out, /*block_size=*/64 * 1024);
+class IOBufOutputStream : public std::ostream {
+public:
+    // `buf' must outlive this stream.
+    explicit IOBufOutputStream(IOBuf& buf)
+        : std::ostream(NULL), _sb(buf) {
+        rdbuf(&_sb);
+    }
+    IOBufOutputStream(IOBuf& buf, uint32_t block_size)
+        : std::ostream(NULL), _sb(buf, block_size) {
+        rdbuf(&_sb);
+    }
+
+private:
+    IOBufAsOutputStreamBuf _sb;
+};
+
 // Wrap IOBuf into input of snappy compression.
 class IOBufAsSnappySource : public butil::snappy::Source {
 public:
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index 13ef5922..d442d3b7 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -179,12 +179,14 @@ cc_test(
         "test_switches.h",
     ],
     copts = COPTS,
+    defines = ["HAS_NLOHMANN_JSON=1"],
     deps = [
         ":cc_test_proto",
         ":sstream_workaround",
         ":gperftools_helper",
         "//:butil",
         "//:bthread",
+        "@nlohmann_json//:json",
         "@com_google_googletest//:gtest",
     ],
 )
diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp
index 489460e2..82112045 100644
--- a/test/iobuf_unittest.cpp
+++ b/test/iobuf_unittest.cpp
@@ -23,6 +23,9 @@
 #include <stdlib.h>
 #include <memory>
 #include <cstring>
+#if HAS_NLOHMANN_JSON
+#include <nlohmann/json.hpp>
+#endif // HAS_NLOHMANN_JSON
 #include <butil/files/temp_file.h>      // TempFile
 #include <butil/containers/flat_map.h>
 #include <butil/macros.h>
@@ -52,8 +55,8 @@ extern void release_tls_block_chain(IOBuf::Block* b);
 extern uint32_t block_cap(IOBuf::Block const* b);
 extern uint32_t block_size(IOBuf::Block const* b);
 extern IOBuf::Block* get_portal_next(IOBuf::Block const* b);
-}
-}
+} // namespace iobuf
+} // namespace butil
 
 namespace {
 
@@ -1939,11 +1942,315 @@ TEST_F(IOBufTest, single_iobuf) {
     ASSERT_EQ(null_buf, nullptr);
 
     uint32_t old_size = sbuf3.get_length();
-    void *p = sbuf3.reallocate_downward(old_size + 16, 0, old_size); 
+    void *p = sbuf3.reallocate_downward(old_size + 16, 0, old_size);
     ASSERT_TRUE(p != nullptr);
     old_size = sbuf3.get_length();
     p = sbuf3.reallocate_downward(old_size + 16, old_size, 0);
     ASSERT_TRUE(p != nullptr);
 }
 
+TEST_F(IOBufTest, as_input_stream_basic) {
+    butil::IOBuf buf;
+    buf.append("hello world");
+
+    butil::IOBufInputStream stream(buf);
+    std::string s;
+    stream >> s;
+    ASSERT_EQ("hello", s);
+    stream >> s;
+    ASSERT_EQ("world", s);
+    ASSERT_EQ(EOF, stream.peek());
+
+    // Stream construction must not mutate the source IOBuf.
+    ASSERT_EQ("hello world", buf.to_string());
+}
+
+TEST_F(IOBufTest, as_input_stream_empty) {
+    butil::IOBuf buf;
+    butil::IOBufInputStream stream(buf);
+    ASSERT_EQ(EOF, stream.peek());
+    char c;
+    ASSERT_FALSE(stream.get(c));
+    ASSERT_TRUE(stream.eof());
+}
+
+TEST_F(IOBufTest, as_input_stream_accepts_const_iobuf) {
+    butil::IOBuf buf;
+    buf.append("abc");
+    const butil::IOBuf& cbuf = buf;
+    butil::IOBufInputStream stream(cbuf);
+    char c;
+    ASSERT_TRUE(stream.get(c)); ASSERT_EQ('a', c);
+    ASSERT_TRUE(stream.get(c)); ASSERT_EQ('b', c);
+    ASSERT_TRUE(stream.get(c)); ASSERT_EQ('c', c);
+    ASSERT_EQ(EOF, stream.peek());
+}
+
+// Each call to append_user_data adds a separate BlockRef, giving us a
+// multi-block IOBuf that exercises underflow() across block boundaries.
+static void append_as_separate_blocks(butil::IOBuf* buf,
+                                      const std::string& payload,
+                                      size_t chunk) {
+    for (size_t i = 0; i < payload.size(); i += chunk) {
+        const size_t n = std::min(chunk, payload.size() - i);
+        char* p = static_cast<char*>(malloc(n));
+        memcpy(p, payload.data() + i, n);
+        ASSERT_EQ(0, buf->append_user_data(p, n, free));
+    }
+}
+
+TEST_F(IOBufTest, as_input_stream_multi_block_read) {
+    butil::IOBuf buf;
+    const std::string payload = "the quick brown fox jumps over the lazy dog";
+    append_as_separate_blocks(&buf, payload, 7);
+    ASSERT_GT(buf.backing_block_num(), 1u);
+
+    butil::IOBufInputStream stream(buf);
+    std::string got(payload.size(), '\0');
+    stream.read(&got[0], got.size());
+    ASSERT_EQ(static_cast<std::streamsize>(payload.size()), stream.gcount());
+    ASSERT_EQ(payload, got);
+    ASSERT_EQ(EOF, stream.peek());
+}
+
+TEST_F(IOBufTest, as_input_stream_large_payload) {
+    // Payload >> DEFAULT_BLOCK_SIZE (8192) forces multiple blocks even with
+    // a single append call.
+    std::string payload;
+    payload.reserve(100 * 1024);
+    for (int i = 0; i < 100 * 1024; ++i) {
+        payload.push_back(static_cast<char>('a' + (i % 26)));
+    }
+    butil::IOBuf buf;
+    buf.append(payload);
+    ASSERT_GT(buf.backing_block_num(), 1u);
+
+    butil::IOBufInputStream stream(buf);
+    std::string got(payload.size(), '\0');
+    stream.read(&got[0], got.size());
+    ASSERT_EQ(static_cast<std::streamsize>(payload.size()), stream.gcount());
+    ASSERT_EQ(payload, got);
+}
+
+TEST_F(IOBufTest, as_input_stream_get_matches_read) {
+    butil::IOBuf buf;
+    const std::string payload = "the quick brown fox jumps over the lazy dog";
+    append_as_separate_blocks(&buf, payload, 7);
+
+    // Byte-by-byte path (sbumpc).
+    butil::IOBufInputStream s1(buf);
+    std::string got1;
+    char c;
+    while (s1.get(c)) {
+        got1.push_back(c);
+    }
+    ASSERT_EQ(payload, got1);
+
+    // Bulk path (xsgetn).
+    butil::IOBufInputStream s2(buf);
+    std::string got2(payload.size(), '\0');
+    s2.read(&got2[0], got2.size());
+    ASSERT_EQ(static_cast<std::streamsize>(payload.size()), s2.gcount());
+    ASSERT_EQ(payload, got2);
+}
+
+TEST_F(IOBufTest, as_input_stream_short_read_at_eof) {
+    butil::IOBuf buf;
+    buf.append("abcd");
+    butil::IOBufInputStream stream(buf);
+
+    char got[8] = {};
+    stream.read(got, sizeof(got));
+    // istream sets failbit on short read at EOF, but gcount() reflects the
+    // actual number of bytes transferred.
+    ASSERT_EQ(4, stream.gcount());
+    ASSERT_EQ(0, memcmp(got, "abcd", 4));
+    ASSERT_TRUE(stream.eof());
+}
+
+TEST_F(IOBufTest, as_input_stream_in_avail) {
+    butil::IOBuf buf;
+    const std::string parts[] = {"aaa", "bbbb", "ccccc"};
+    size_t total = 0;
+    for (size_t i = 0; i < arraysize(parts); ++i) {
+        char* p = static_cast<char*>(malloc(parts[i].size()));
+        memcpy(p, parts[i].data(), parts[i].size());
+        ASSERT_EQ(0, buf.append_user_data(p, parts[i].size(), free));
+        total += parts[i].size();
+    }
+
+    butil::IOBufInputStream stream(buf);
+    // get area is empty, so in_avail() defers to showmanyc() which must sum
+    // all remaining backing blocks.
+    ASSERT_EQ(static_cast<std::streamsize>(total), stream.rdbuf()->in_avail());
+}
+
+TEST_F(IOBufTest, as_output_stream_basic) {
+    butil::IOBuf buf;
+    {
+        butil::IOBufOutputStream stream(buf);
+        stream << "hello " << 42 << ' ' << 3.5;
+    } // dtor calls shrink()
+    ASSERT_EQ("hello 42 3.5", buf.to_string());
+}
+
+TEST_F(IOBufTest, as_output_stream_appends_not_overwrites) {
+    butil::IOBuf buf;
+    buf.append("prefix:");
+    {
+        butil::IOBufOutputStream stream(buf);
+        stream << "payload";
+    }
+    ASSERT_EQ("prefix:payload", buf.to_string());
+}
+
+TEST_F(IOBufTest, as_output_stream_large_payload) {
+    // Cross multiple blocks (DEFAULT_BLOCK_SIZE == 8192).
+    std::string payload;
+    payload.reserve(100 * 1024);
+    for (int i = 0; i < 100 * 1024; ++i) {
+        payload.push_back(static_cast<char>('a' + (i % 26)));
+    }
+    butil::IOBuf buf;
+    {
+        butil::IOBufOutputStream stream(buf);
+        stream.write(payload.data(), payload.size());
+        ASSERT_TRUE(stream.good());
+    }
+    ASSERT_GT(buf.backing_block_num(), 1u);
+    ASSERT_EQ(payload, buf.to_string());
+}
+
+TEST_F(IOBufTest, as_output_stream_xsputn_matches_overflow) {
+    // Same payload, two write paths: bulk write() vs per-byte put().
+    const std::string payload = "the quick brown fox jumps over the lazy dog "
+                                "0123456789 alpha beta gamma";
+    butil::IOBuf bulk_buf;
+    {
+        butil::IOBufOutputStream s(bulk_buf);
+        s.write(payload.data(), payload.size());
+    }
+    butil::IOBuf byte_buf;
+    {
+        butil::IOBufOutputStream s(byte_buf);
+        for (char c : payload) {
+            s.put(c);
+        }
+    }
+    ASSERT_EQ(payload, bulk_buf.to_string());
+    ASSERT_EQ(payload, byte_buf.to_string());
+}
+
+TEST_F(IOBufTest, as_output_stream_flush_shrinks_eagerly) {
+    // Without flush(), IOBuf::length() may exceed bytes-written because Next()
+    // over-claims the rest of the current block. flush() must reconcile it.
+    butil::IOBuf buf;
+    butil::IOBufOutputStream stream(buf);
+    stream << "abc";
+    stream.flush();
+    ASSERT_EQ(3u, buf.length());
+    ASSERT_EQ("abc", buf.to_string());
+
+    stream << "defg";
+    stream.flush();
+    ASSERT_EQ(7u, buf.length());
+    ASSERT_EQ("abcdefg", buf.to_string());
+}
+
+TEST_F(IOBufTest, as_output_stream_dedicated_block_size) {
+    // Passing block_size routes through create_block instead of TLS pool.
+    // Pick a small-but-valid block to force many allocations.
+    butil::IOBuf buf;
+    const std::string payload(4096, 'z');
+    {
+        butil::IOBufOutputStream stream(buf, /*block_size=*/256);
+        stream.write(payload.data(), payload.size());
+    }
+    ASSERT_EQ(payload, buf.to_string());
+    ASSERT_GT(buf.backing_block_num(), 1u);
+}
+
+TEST_F(IOBufTest, as_output_stream_round_trip_with_input_stream) {
+    // Write through OutputStream, read back through InputStream.
+    butil::IOBuf buf;
+    {
+        butil::IOBufOutputStream out(buf);
+        for (int i = 0; i < 1000; ++i) {
+            out << i << '\n';
+        }
+    }
+    butil::IOBufInputStream in(buf);
+    for (int i = 0; i < 1000; ++i) {
+        int v = -1;
+        in >> v;
+        ASSERT_EQ(i, v);
+    }
+}
+
+#if HAS_NLOHMANN_JSON
+// End-to-end test that the IOBuf <-> std::iostream adapters work with
+// nlohmann::json — the canonical "RPC handler reads JSON from an IOBuf body
+// and writes a JSON reply back to another IOBuf" flow.
+TEST_F(IOBufTest, as_stream_nlohmann_json_round_trip) {
+    // 1. Serialize a JSON object into an IOBuf via IOBufOutputStream.
+    nlohmann::json reply = {
+        {"status", "ok"},
+        {"code",   200},
+        {"items",  {1, 2, 3, 4, 5}},
+        {"nested", {{"a", "alpha"}, {"b", "beta"}}},
+    };
+    butil::IOBuf out;
+    {
+        butil::IOBufOutputStream os(out);
+        os << reply;
+    } // dtor runs shrink(); `out` now holds exactly the serialized bytes.
+
+    ASSERT_EQ(reply.dump(), out.to_string());
+
+    // 2. Parse the IOBuf back through IOBufInputStream and verify roundtrip.
+    butil::IOBufInputStream in(out);
+    nlohmann::json parsed = nlohmann::json::parse(in);
+    ASSERT_EQ(reply, parsed);
+    ASSERT_EQ("ok", parsed["status"]);
+    ASSERT_EQ(200, parsed["code"]);
+    ASSERT_EQ(5u, parsed["items"].size());
+    ASSERT_EQ("alpha", parsed["nested"]["a"]);
+
+    // 3. Pretty-print via std::setw, then re-parse — verifies formatting flags
+    // propagate through IOBufAsOutputStreamBuf correctly.
+    butil::IOBuf pretty;
+    {
+        butil::IOBufOutputStream os(pretty);
+        os << std::setw(2) << reply;
+    }
+    ASSERT_EQ(reply.dump(2), pretty.to_string());
+    butil::IOBufInputStream pretty_in(pretty);
+    ASSERT_EQ(reply, nlohmann::json::parse(pretty_in));
+}
+
+TEST_F(IOBufTest, as_stream_nlohmann_json_large_array) {
+    // Build a payload large enough to span multiple IOBuf blocks
+    // (DEFAULT_BLOCK_SIZE == 8192) and exercise xsputn/xsgetn across
+    // block boundaries.
+    nlohmann::json arr = nlohmann::json::array();
+    for (int i = 0; i < 5000; ++i) {
+        arr.push_back({{"i", i}, {"sq", i * i}});
+    }
+
+    butil::IOBuf buf;
+    {
+        butil::IOBufOutputStream os(buf);
+        os << arr;
+    }
+    ASSERT_GT(buf.backing_block_num(), 1u) << "payload should span >1 block";
+    ASSERT_EQ(arr.dump(), buf.to_string());
+
+    butil::IOBufInputStream in(buf);
+    nlohmann::json parsed = nlohmann::json::parse(in);
+    ASSERT_EQ(arr, parsed);
+    ASSERT_EQ(5000u, parsed.size());
+    ASSERT_EQ(4999, parsed[4999]["i"]);
+    ASSERT_EQ(4999 * 4999, parsed[4999]["sq"]);
+}
+#endif // HAS_NLOHMANN_JSON
 } // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to