As a separate example, please take a look at the source and comments in
https://github.com/cliffjansen/senderciser.  The overall topology is
different, but the recovery concerns may be illustrative.  Regarding your
specific questions:

1. This appears to be the broker objecting to the receivers being
re-attached after the reconnect (the "Maximum Consumer Limit Reached"
error).  I am not familiar with the specific behaviour it expects from
the client.

2. Handlers are meant to be flexible, so it's your choice: multiple or
single connections, sessions, and senders/receivers per handler.  If more
than one connection shares a single handler, you have to be careful of
threading issues.  The senderciser example does things two different ways
and handles the thread safety issues.

3. container.open_receiver() will create a new connection for the
receiver.  To group senders/receivers under a single connection or
session, create them from those objects, possibly in a callback (e.g.
on_session_open, on_connection_open) or via a work_queue.

4. Presumably you have turned the wifi back on before an idle_timeout has
killed the connection.  Note that the receiver error is not fatal to the
connection and you would have to decide the best course of action for you:
force a reconnect or recover some other way.

As an aside, connection.open_receiver() is not thread safe.  It should be
called from a callback of that connection (or perhaps from its own
work_queue), not from an arbitrary thread or a different work_queue.
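
To illustrate the threading point without depending on Proton itself, here
is a minimal stand-alone sketch of the idea behind a work_queue: any thread
may add work, but the work only ever runs on one thread.  The names
serial_queue, fake_connection and demo_subscribe_from_two_threads are
hypothetical and exist only for this illustration; they are not part of the
Proton API, and the real proton::work_queue is more involved.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a connection's work queue: tasks may be added
// from any thread, but they all run on the single thread that calls run().
class serial_queue {
public:
    // Thread safe: enqueue a task for the owning thread to execute.
    void add(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

    // Thread safe: ask run() to return once the queue is empty.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_one();
    }

    // Call from exactly one thread; drains tasks until stop() is requested.
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            ready_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
            if (tasks_.empty()) return;  // only reachable once stopping_ is set
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop();
            lock.unlock();
            task();  // run outside the lock so a task may call add()
            lock.lock();
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;
};

// Hypothetical stand-in for state that belongs to the connection's thread.
// It has no locking of its own, so it must only be touched on that thread.
struct fake_connection {
    std::thread::id owner;
    std::vector<std::string> receivers;

    void open_receiver(const std::string& address) {
        assert(std::this_thread::get_id() == owner);  // wrong thread is a bug
        receivers.push_back(address);
    }
};

// Two application threads each request 100 subscriptions.  Every
// open_receiver() call still executes on the one thread inside run().
std::size_t demo_subscribe_from_two_threads() {
    serial_queue queue;
    fake_connection conn;
    std::thread owner([&] {
        conn.owner = std::this_thread::get_id();
        queue.run();
    });

    auto requester = [&](const std::string& prefix) {
        for (int i = 0; i < 100; ++i)
            queue.add([&conn, prefix, i] {
                conn.open_receiver(prefix + std::to_string(i));
            });
    };
    std::thread a(requester, "jms.topic.A");
    std::thread b(requester, "jms.topic.B");
    a.join();
    b.join();
    queue.stop();
    owner.join();
    return conn.receivers.size();
}
```

Calling demo_subscribe_from_two_threads() returns 200, and the assert in
open_receiver() never fires, because the requesting threads only enqueue
work and never touch the connection state directly.  In your program the
rough equivalent would be to add the subscribe call to the connection's own
work_queue rather than to one created from the container.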

Cliff

On Wed, Mar 26, 2025 at 10:25 AM Roberto DeFeo <rde...@gmail.com> wrote:

> Hi,
>
> Using Qpid Proton C++ 0.39.0 on Windows, I am trying to deal with network
> disconnects/reconnects. The topology is a single server that sends on
> multiple topics. There are multiple client apps, each on a different host,
> acting as consumers/receivers - each client app is setup to only receive
> messages filtered for themselves. I can't get the reconnect logic to work
> successfully.
>
> App structure: (full source at the end)
> ==============
> class msg_handler : proton::messaging_handler
> - on_receiver methods - just log
> class client : proton::messaging_handler
> - on_container_start
>   - calls container.connect with just server/port for the connection url
> and passes in auth args to connection_options, sets handler to itself,
> reconnect(default opts + delay/max_delay)
>   - saves the connection handle returned by connect() for later use
> - on_transport_*, on_connection_* methods - just log
> - subscribe(topic) method
>   - calls connection.open_receiver with unique msg_handler per topic, sets
> filters, durability
>
> main()
> - create client
> - create container(client), call auto_stop(false)
> - create work_queue(container)
> - start new thread to call container.run()
> - work_queue.add( client.subscribe(topic1) )
> - work_queue.add( client.subscribe(topic2) )
> - idle loop
>
> Running:
> ========
> 13:25:42.896 : on_container_start
> 13:25:43.124 : on_transport_open
> 13:25:43.143 : on_connection_open
> 13:25:43.900 : subscribing to Topic1
> 13:25:43.900 : subscribing to Topic2
> 13:25:58.194 : on_receiver_open : Topic1
> 13:25:58.220 : on_receiver_open : Topic2
> 13:26:32.398 : on_transport_error              <-- WiFi turned off just before here
> 13:26:32.402 : on_transport_error
> 13:26:32.912 : on_transport_error
> 13:26:33.581 : on_transport_error
> 13:26:34.736 : on_transport_error
> 13:26:36.137 : on_transport_error              <-- WiFi turned back on around here
> 13:26:37.936 : on_transport_error
> 13:26:40.477 : on_transport_open
> 13:26:40.478 : on_connection_open
> 13:26:40.478 : on_receiver_open : Topic1
> 13:26:40.478 : on_receiver_open : Topic2
> 13:26:40.479 : on_receiver_error : Topic1 - AMQ219005: error creating consumer, AMQ119200: Maximum Consumer Limit Reached on Queue:(address=jms.topic.Topic1,queue=5cdf17ed-185e-4939-a27c-c460b5f6b9fe.ROBERTO)
> 13:26:40.479 : on_receiver_close : Topic1
> 13:26:40.480 : on_receiver_error : Topic2 - AMQ219005: error creating consumer, AMQ119200: Maximum Consumer Limit Reached on Queue:(address=jms.topic.Topic2,queue=5cdf17ed-185e-4939-a27c-c460b5f6b9fe.ROBERTOTopic2)
> 13:26:40.480 : on_receiver_close : Topic2
> 13:26:48.329 : CTRL-C shutting down!
> 13:26:48.339 : on_transport_error
> 13:26:48.359 : on_container_stop
>
> 1. The reconnect logic seems to work fine, in that the proton library
> automatically tries to re-open the receivers. However, we hit the Max
> Consumer Limit error and they shutdown. Why? Is my app's overall structure
> correct for dealing with multiple topics from the same server?
> 2. What is the recommended way to use handlers for multiple receivers? (My
> error occurs whether using a single handler for both topics vs a handler
> per topic.)
> 3. Do I even need to call connect() and save the handle? Seems like I can
> also just call container.open_receiver(), and the addr_url will be the same
> server:port, but a different path that reflects the second topic name. Will
> it re-use the same underlying connection?
> 4. If I turn off WiFi after the receivers are open, but turn it back on
> before I receive my first on_transport_error, no other callbacks are
> called, so I assume everything is good?
>
> Thank you for your help!
> Roberto
>
> rde...@gmail.com
> Roberto De Feo
>
> Full source of example (w/ server/user/pass removed):
> #include "stdafx.h"
>
> #include <Windows.h>
> #include <proton/connection_options.hpp>
> #include <proton/duration.hpp>
> #include <proton/receiver_options.hpp>
> #include <proton/source_options.hpp>
> #include <proton/transport.hpp>
> #include <proton/connection.hpp>
> #include <proton/messaging_handler.hpp>
> #include <proton/work_queue.hpp>
> #include <proton/ssl.hpp>
> #include <proton/container.hpp>
> #include <proton/reconnect_options.hpp>
> #include <iostream>
> #include <thread>
> #include <unordered_map>
>
> namespace {
>    proton::container* g_container;
>    proton::work_queue* g_work_queue;
>    bool running = false;
>
>    auto NOW() {
>       SYSTEMTIME st;
>       GetSystemTime(&st);
>       char buf[32] = "";
>       sprintf(buf, "%.2d:%.2d:%.2d.%.3d : ", st.wHour, st.wMinute,
>          st.wSecond, st.wMilliseconds);
>       return std::string(buf);
>    }
> }
>
> class msg_handler : public proton::messaging_handler {
> private:
>    std::string topic;
>
> public:
>    msg_handler(const std::string& t) : topic(t) {}
>
>    void on_receiver_open(proton::receiver& r) override {
>       std::cout << NOW() << "on_receiver_open : " << topic << std::endl;
>    }
>    void on_receiver_error(proton::receiver& r) override {
>       std::cout << NOW() << "on_receiver_error : " << topic << " - "
>          << r.error().description() << std::endl;
>    }
>    void on_receiver_close(proton::receiver& r) override {
>       std::cout << NOW() << "on_receiver_close : " << topic << std::endl;
>    }
>    void on_message(proton::delivery &d, proton::message &msg) override {
>       (void)0; // do nothing
>    }
> };
>
> class client : public proton::messaging_handler {
> private:
>    proton::connection connection;
>    std::string url;
>    std::string user;
>    std::string password;
>    std::unordered_map<std::string, std::unique_ptr<msg_handler>> handlers;
>
> public:
>    client(const std::string &s, const std::string &u, const std::string &p)
>       : url(s), user(u), password(p) {}
>
>    void on_container_start(proton::container &container) override {
>       std::cout << NOW() << "on_container_start" << std::endl;
>       connection = container.connect(url,
>          proton::connection_options()
>          .idle_timeout(proton::duration::MINUTE * 2)
>          .ssl_client_options(proton::ssl_client_options("lmss:root",
>             proton::ssl::verify_mode::VERIFY_PEER))
>          .sasl_allow_insecure_mechs(true).sasl_allowed_mechs("PLAIN")
>          .user(user).password(password)
>          .reconnect(proton::reconnect_options()
>             .delay(proton::duration::MILLISECOND * 500)
>             .max_delay(proton::duration::SECOND * 2))
>          .handler(*this)
>       );
>    }
>    void on_container_stop(proton::container &container) override {
>       std::cout << NOW() << "on_container_stop" << std::endl;
>       ::running = false;
>    }
>    void on_connection_open(proton::connection &c) override {
>       std::cout << NOW() << "on_connection_open" << std::endl;
>    }
>    void on_connection_error(proton::connection &c) override {
>       std::cout << NOW() << "on_connection_error" << std::endl;
>    }
>    void on_connection_close(proton::connection &c) override {
>       std::cout << NOW() << "on_connection_close" << std::endl;
>    }
>    void on_transport_open(proton::transport &c) override {
>       std::cout << NOW() << "on_transport_open" << std::endl;
>    }
>    void on_transport_error(proton::transport &c) override {
>       std::cout << NOW() << "on_transport_error" << std::endl;
>    }
>    void on_transport_close(proton::transport &c) override {
>       std::cout << NOW() << "on_transport_close" << std::endl;
>    }
>
>    void subscribe(const std::string& topic) {
>       std::cout << NOW() << "subscribing to " << topic << std::endl;
>
>       handlers.emplace(topic, std::make_unique<msg_handler>(topic));
>
>       proton::source::filter_map filterMap;
>       const std::string filterString = "(originator <> '" + user +
>          "' AND filter = 'N') OR " + user + " = 'Y'";
>       proton::value filterValue;
>       proton::codec::encoder enc(filterValue);
>       enc << proton::codec::start::described()
>          << proton::symbol("apache.org:selector-filter:string")
>          << filterString
>          << proton::codec::finish();
>       filterMap.put(proton::symbol("selector"), filterValue);
>
>       const std::string connection_name{ (topic == "Topic1") ? "" : topic };
>       connection.open_receiver("jms.topic." + topic,
>          proton::receiver_options()
>          .name(user + connection_name)
>          .source(proton::source_options()
>             .durability_mode(proton::terminus().CONFIGURATION)
>             .filters(filterMap))
>          .credit_window(1000)
>          .handler(*handlers.at(topic)));
>    }
> };
>
> int main() {
>    std::string address("my_server_addr:port");
>    std::string user("ROBERTO");
>    std::string password("password");
>
>    client cl(address, user, password);
>    proton::container container(cl);
>    proton::work_queue work_queue(container);
>    container.auto_stop(false);
>
>    // we can't use the lambda capture when using a simple function ptr,
>    // so we use a global ptr for our console handler
>    g_container = &container;
>    g_work_queue = &work_queue;
>    SetConsoleCtrlHandler([](DWORD signal) {
>       if (signal == CTRL_C_EVENT) {
>          std::cout << NOW() << "CTRL-C shutting down!" << std::endl;
>          g_work_queue->add([&]() { g_container->stop(); });
>       }
>       return TRUE;
>    }, TRUE);
>
>    auto workerThread = std::thread([&]() { container.run(); });
>
>    work_queue.add([&]() { cl.subscribe("Topic1"); });
>    work_queue.add([&]() { cl.subscribe("Topic2"); });
>
>    ::running = true;
>    while (::running) { // idle loop
>       std::this_thread::sleep_for(std::chrono::seconds(1));
>    }
>    return 0;
> }
>
