Hi in my use case I would like to read messages from a stream, selectively 
store  certain structs from some messages in a dictionary for later use. 
Unfortunately I cannot get it to work without leaking memory.  The total 
number of messages in the dictionary is bounded, as new messages with the 
same key would replace the old message. (Actually valgrind doesn't think it 
is leaking, but I can see my memory usage is increasing indefinitely).

Test code

messages are stored in a vector `data`


struct ProcessStream {
    using CapnpMsg = capnp_utils::CapnpMessage<Bar>;
    std::vector<std::shared_ptr<CapnpMsg>> data;


    bool process_message(
        kj::BufferedInputStreamWrapper& buffered_stream, kj::Array<capnp::
word>& scratch
    ) {
        capnp::InputStreamMessageReader reader(buffered_stream, capnp::
ReaderOptions(), scratch.asPtr());


        const auto ts_msg = reader.getRoot<Foo>();
        auto mir_msg = ts_msg.getBar();
        auto capnp_msg = std::make_shared<CapnpMsg>(mir_msg);
        data.emplace_back(capnp_msg);
        if (data.size() > 1000000) {
            return false;
        }


        return true;
    }


    void start() {
        // 1 MB of scratch.
        kj::Array<capnp::word> scratch = kj::heapArray<capnp::word>(1024 * 
1024 / sizeof(capnp::word));
        kj::FdInputStream fd_stream(fileno(stdin));
        kj::BufferedInputStreamWrapper buffered_stream(fd_stream);


        while (buffered_stream.tryGetReadBuffer() != nullptr) {
            if (!process_message(buffered_stream, scratch)) {
                break;
            }
        }
    }
};



void foo() {
    ProcessStream stream;
    stream.start();
    std::cout << "A" << std::endl;
    usleep(1000 * 1000 * 5);
    stream.data.clear();
}


int main(int argc, char* argv[]) {


    foo();
    std::cout << "B" << std::endl;
    usleep(1000 * 1000 * 10);
}


The following are the 3 ways to use a wrapper class to store the messages

 



namespace capnp_utils {

    /// 1 & 2 takes 10x more memory.

    template<typename R>
    class CapnpMessage2 {
        // TODO: Should be private, but leave it public for debugging.
        public:
            typename R::Reader msg_reader;
            std::unique_ptr<capnp::MallocMessageBuilder> msg_builder;

        public:
            CapnpMessage2(const typename R::Reader& reader) {
                msg_builder = std::make_unique<capnp::MallocMessageBuilder
>();
                msg_builder->setRoot(reader);
                msg_reader = msg_builder->getRoot<R>().asReader();
            }


            const typename R::Reader& get() const {
                return msg_reader;
            }
    };


    template<typename R>
    class CapnpMessage1 {

        public:
            typename R::Reader struct_reader;
            std::unique_ptr<capnp::MallocMessageBuilder> msg_builder;
        public:

            CapnpMessage1(const typename R::Reader& reader) {
                capnp::MallocMessageBuilder tmp;
                tmp.setRoot(reader);
                const auto raw = capnp::messageToFlatArray(tmp);


                msg_builder = std::make_unique<capnp::MallocMessageBuilder
>();
                capnp::initMessageBuilderFromFlatArrayCopy(raw, *msg_builder
);
                struct_reader = msg_builder->getRoot<R>().asReader();
            }


            const typename R::Reader& get() const {
                return struct_reader;
            }
    };




    template<typename R>
    class CapnpMessage {

        public:
            typename R::Reader struct_reader;
            // `struct_reader` is only valid when `msg_reader` is alive.
            std::unique_ptr<capnp::FlatArrayMessageReader> msg_reader;
            kj::Array<capnp::word> raw;
        public:


            CapnpMessage(const typename R::Reader& reader) {
                capnp::MallocMessageBuilder msg_builder;
                msg_builder.setRoot(reader);
                raw = capnp::messageToFlatArray(msg_builder);
                msg_reader = std::make_unique<capnp::FlatArrayMessageReader
>(raw);
                struct_reader = msg_reader->getRoot<R>();
            }

            const typename R::Reader& get() const {
                return struct_reader;
            }
    };

}
 
`CapnpMessage2` copies the message once, while `CapnpMessage1` and 
`CapnpMessage` both copy the message twice.

In terms of memory usage:
                                   A                    B
CapnpMessage2      8152MB      8136MB
CapnpMessage1      8166MB      8151MB
CapnpMessage        640MB        625MB


It looks that `CapnpMessage1` & `CapnpMessage2` use far more memory than 
`CapnpMessage`. Is it because the builder has reserved too much space even 
though I only need it to be as big as the reader?
The important part is that in all 3 cases, memory are still used at time B 
although  `data` is cleared. (Even if I reset the `unique_ptr` inside the 
wrapper class).

Is it somehow related to memory fragmentation?

What is the best approach in this case? I would very much appreciate any 
help to reduce the steady memory increase.

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
Visit this group at https://groups.google.com/group/capnproto.

Reply via email to