As a separate example, please take a look at the source and comments in https://github.com/cliffjansen/senderciser. The overall topology is different, but the recovery concerns may be illustrative. Regarding your specific questions:
1. This appears to be the broker objecting to something. I am not familiar with the specific behaviour it is expecting from the client.

2. Handlers are meant to be flexible, so it's your choice: multiple or single connections, sessions, and senders/receivers per handler. If more than one connection shares a single handler, you have to be careful of threading issues. The senderciser example does things two different ways and handles the thread safety issues.

3. container.open_receiver() will create a new connection for the receiver. To group senders/receivers under a single connection or session, create them from those objects, possibly in a callback (e.g. on_session_open, on_connection_open) or via a work_queue.

4. Presumably you have turned the WiFi back on before an idle_timeout has killed the connection.

Note that the receiver error is not fatal to the connection, so you would have to decide the best course of action for you: force a reconnect or recover some other way.

As an aside, the use of connection.open_receiver() is not thread safe. The call should be made from a callback of that connection (or perhaps from its work_queue) and not from an arbitrary thread (or an alternate work_queue).

Cliff

On Wed, Mar 26, 2025 at 10:25 AM Roberto DeFeo <rde...@gmail.com> wrote:

> Hi,
>
> Using Qpid Proton C++ 0.39.0 on Windows, I am trying to deal with network
> disconnects/reconnects. The topology is a single server that sends on
> multiple topics. There are multiple client apps, each on a different host,
> acting as consumers/receivers - each client app is setup to only receive
> messages filtered for themselves. I can't get the reconnect logic to work
> successfully.
>
> App structure: (full source at the end)
> ==============
> class msg_handler : proton::messaging_handler
>   - on_receiver methods - just log
> class client : proton::messaging_handler
>   - on_container_start
>     - calls container.connect with just server/port for the connection url
>       and passes in auth args to connection_options, sets handler to itself,
>       reconnect(default opts + delay/max_delay)
>     - saves the connection handle returned by connect() for later use
>   - on_transport_*, on_connection_* methods - just log
>   - subscribe(topic) method
>     - calls connection.open_receiver with unique msg_handler per topic, sets
>       filters, durability
>
> main()
>   - create client
>   - create container(client), call auto_stop(false)
>   - create work_queue(container)
>   - start new thread to call container.run()
>   - work_queue.add( client.subscribe(topic1) )
>   - work_queue.add( client.subscribe(topic2) )
>   - idle loop
>
> Running:
> ========
> 13:25:42.896 : on_container_start
> 13:25:43.124 : on_transport_open
> 13:25:43.143 : on_connection_open
> 13:25:43.900 : subscribing to Topic1
> 13:25:43.900 : subscribing to Topic2
> 13:25:58.194 : on_receiver_open : Topic1
> 13:25:58.220 : on_receiver_open : Topic2
> 13:26:32.398 : on_transport_error    <-- WiFi turned off just before here
> 13:26:32.402 : on_transport_error
> 13:26:32.912 : on_transport_error
> 13:26:33.581 : on_transport_error
> 13:26:34.736 : on_transport_error
> 13:26:36.137 : on_transport_error    <-- WiFi turned back on around here
> 13:26:37.936 : on_transport_error
> 13:26:40.477 : on_transport_open
> 13:26:40.478 : on_connection_open
> 13:26:40.478 : on_receiver_open : Topic1
> 13:26:40.478 : on_receiver_open : Topic2
> 13:26:40.479 : on_receiver_error : Topic1 - AMQ219005: error creating
> consumer, AMQ119200: Maximum Consumer Limit Reached on
> Queue:(address=jms.topic.Topic1,queue=5cdf17ed-185e-4939-a27c-c460b5f6b9fe.ROBERTO)
> 13:26:40.479 : on_receiver_close : Topic1
> 13:26:40.480 : on_receiver_error : Topic2 - AMQ219005: error creating
> consumer, AMQ119200: Maximum Consumer Limit Reached on
> Queue:(address=jms.topic.Topic2,queue=5cdf17ed-185e-4939-a27c-c460b5f6b9fe.ROBERTOTopic2)
> 13:26:40.480 : on_receiver_close : Topic2
> 13:26:48.329 : CTRL-C shutting down!
> 13:26:48.339 : on_transport_error
> 13:26:48.359 : on_container_stop
>
> 1. The reconnect logic seems to work fine, in that the proton library
> automatically tries to re-open the receivers. However, we hit the Max
> Consumer Limit error and they shutdown. Why? Is my app's overall structure
> correct for dealing with multiple topics from the same server?
> 2. What is the recommended way to use handlers for multiple receivers? (My
> error occurs whether using a single handler for both topics vs a handler
> per topic.)
> 3. Do I even need to call connect() and save the handle? Seems like I can
> also just call container.open_receiver(), and the addr_url will be the same
> server:port, but a different path that reflects the second topic name. Will
> it re-use the same underlying connection?
> 4. If I turn off WiFi after the receivers are open, but turn it back on
> before I receive my first on_transport_error, no other callbacks are
> called, so I assume everything is good?
>
> Thank you for your help!
> Roberto
>
> rde...@gmail.com
> Roberto De Feo
>
> Full source of example (w/ server/user/pass removed):
> #include "stdafx.h"
>
> #include <Windows.h>
> #include <proton/connection_options.hpp>
> #include <proton/duration.hpp>
> #include <proton/receiver_options.hpp>
> #include <proton/source_options.hpp>
> #include <proton/transport.hpp>
> #include <proton/connection.hpp>
> #include <proton/messaging_handler.hpp>
> #include <proton/work_queue.hpp>
> #include <proton/ssl.hpp>
> #include <proton/container.hpp>
> #include <proton/reconnect_options.hpp>
> #include <iostream>
> #include <thread>
> #include <unordered_map>
>
> namespace {
>     proton::container* g_container;
>     proton::work_queue* g_work_queue;
>     bool running = false;
>
>     auto NOW() {
>         SYSTEMTIME st;
>         GetSystemTime(&st);
>         char buf[32] = "";
>         sprintf(buf, "%.2d:%.2d:%.2d.%.3d : ", st.wHour, st.wMinute,
>             st.wSecond, st.wMilliseconds);
>         return std::string(buf);
>     }
> }
>
> class msg_handler : public proton::messaging_handler {
> private:
>     std::string topic;
>
> public:
>     msg_handler(const std::string& t) : topic(t) {}
>
>     void on_receiver_open(proton::receiver& r) override {
>         std::cout << NOW() << "on_receiver_open : " << topic << std::endl;
>     }
>     void on_receiver_error(proton::receiver& r) override {
>         std::cout << NOW() << "on_receiver_error : " << topic << " - " <<
>             r.error().description() << std::endl;
>     }
>     void on_receiver_close(proton::receiver& r) override {
>         std::cout << NOW() << "on_receiver_close : " << topic << std::endl;
>     }
>     void on_message(proton::delivery &d, proton::message &msg) override {
>         (void)0; // do nothing
>     }
> };
>
> class client : public proton::messaging_handler {
> private:
>     proton::connection connection;
>     std::string url;
>     std::string user;
>     std::string password;
>     std::unordered_map<std::string, std::unique_ptr<msg_handler>> handlers;
>
> public:
>     client(const std::string &s, const std::string &u, const std::string &p)
>         : url(s), user(u), password(p) {}
>
>     void on_container_start(proton::container &container) override {
>         std::cout << NOW() << "on_container_start" << std::endl;
>         connection = container.connect(url,
>             proton::connection_options()
>             .idle_timeout(proton::duration::MINUTE * 2)
>             .ssl_client_options(proton::ssl_client_options("lmss:root",
>                 proton::ssl::verify_mode::VERIFY_PEER))
>             .sasl_allow_insecure_mechs(true).sasl_allowed_mechs("PLAIN")
>             .user(user).password(password)
>             .reconnect(proton::reconnect_options()
>                 .delay(proton::duration::MILLISECOND * 500)
>                 .max_delay(proton::duration::SECOND * 2))
>             .handler(*this)
>         );
>     }
>     void on_container_stop(proton::container &container) override {
>         std::cout << NOW() << "on_container_stop" << std::endl;
>         ::running = false;
>     }
>     void on_connection_open(proton::connection &c) override {
>         std::cout << NOW() << "on_connection_open" << std::endl;
>     }
>     void on_connection_error(proton::connection &c) override {
>         std::cout << NOW() << "on_connection_error" << std::endl;
>     }
>     void on_connection_close(proton::connection &c) override {
>         std::cout << NOW() << "on_connection_close" << std::endl;
>     }
>     void on_transport_open(proton::transport &c) override {
>         std::cout << NOW() << "on_transport_open" << std::endl;
>     }
>     void on_transport_error(proton::transport &c) override {
>         std::cout << NOW() << "on_transport_error" << std::endl;
>     }
>     void on_transport_close(proton::transport &c) override {
>         std::cout << NOW() << "on_transport_close" << std::endl;
>     }
>
>     void subscribe(const std::string& topic) {
>         std::cout << NOW() << "subscribing to " << topic << std::endl;
>
>         handlers.emplace(topic, std::make_unique<msg_handler>(topic));
>
>         proton::source::filter_map filterMap;
>         const std::string filterString = "(originator <> '" + user +
>             "' AND filter = 'N') OR " + user + " = 'Y'";
>         proton::value filterValue;
>         proton::codec::encoder enc(filterValue);
>         enc << proton::codec::start::described()
>             << proton::symbol("apache.org:selector-filter:string")
>             << filterString
>             << proton::codec::finish();
>         filterMap.put(proton::symbol("selector"), filterValue);
>
>         const std::string connection_name{ (topic == "Topic1") ? "" : topic };
>         connection.open_receiver("jms.topic." + topic,
>             proton::receiver_options()
>             .name(user + connection_name)
>             .source(proton::source_options()
>                 .durability_mode(proton::terminus().CONFIGURATION)
>                 .filters(filterMap))
>             .credit_window(1000)
>             .handler(*handlers.at(topic)));
>     }
> };
>
> int main() {
>     std::string address("my_server_addr:port");
>     std::string user("ROBERTO");
>     std::string password("password");
>
>     client cl(address, user, password);
>     proton::container container(cl);
>     proton::work_queue work_queue(container);
>     container.auto_stop(false);
>
>     // we can't use the lambda capture when using a simple function ptr,
>     // so we use a global ptr for our console handler
>     g_container = &container;
>     g_work_queue = &work_queue;
>     SetConsoleCtrlHandler([](DWORD signal) {
>         if (signal == CTRL_C_EVENT) {
>             std::cout << NOW() << "CTRL-C shutting down!" << std::endl;
>             g_work_queue->add([&]() { g_container->stop(); });
>         }
>         return TRUE;
>     }, TRUE);
>
>     auto workerThread = std::thread([&]() { container.run(); });
>
>     work_queue.add([&]() { cl.subscribe("Topic1"); });
>     work_queue.add([&]() { cl.subscribe("Topic2"); });
>
>     ::running = true;
>     while (::running) { // idle loop
>         std::this_thread::sleep_for(std::chrono::seconds(1));
>     }
>     return 0;
> }
>
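
To illustrate the threading aside in the reply (calls on a connection should be made from that connection's own callbacks or its own work queue), here is a small library-free sketch of the idea. It does not use Proton at all: serial_queue, fake_connection and subscribe_from_other_threads are hypothetical names invented for this example, and serial_queue is only a stand-in for the role a per-connection work queue plays. The point it shows is that application threads hand work over rather than touching the non-thread-safe object directly.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a per-connection work queue: every task posted
// here runs on one worker thread, so the object it guards needs no locking.
class serial_queue {
public:
    serial_queue() : worker_([this] { run(); }) {}
    ~serial_queue() { stop(); }

    // Safe to call from any thread; the task itself runs on the worker thread.
    void add(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

    // Lets the worker drain any queued tasks, then joins it.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(m_);
            if (done_) return;
            done_ = true;
        }
        cv_.notify_one();
        worker_.join();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stop requested and nothing left
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so add() is never blocked by a task
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool done_ = false;
    std::thread worker_;  // declared last so the other members exist first
};

// Hypothetical connection-like object that is NOT thread safe.
struct fake_connection {
    std::vector<std::string> receivers;
    void open_receiver(const std::string& address) { receivers.push_back(address); }
};

// Two application threads request subscriptions; only the queue's worker
// thread ever touches the connection object.
std::vector<std::string> subscribe_from_other_threads() {
    fake_connection conn;
    serial_queue queue;
    std::thread a([&] { queue.add([&] { conn.open_receiver("jms.topic.Topic1"); }); });
    std::thread b([&] { queue.add([&] { conn.open_receiver("jms.topic.Topic2"); }); });
    a.join();
    b.join();
    queue.stop();  // both posted tasks have run once this returns
    assert(conn.receivers.size() == 2);
    return conn.receivers;
}
```

The order in which the two addresses end up in the vector depends on thread scheduling, so only the count is checked. In the real application the equivalent move would be to post the subscribe calls to the work queue that belongs to the connection, or to make them from one of that connection's callbacks, rather than from a queue created separately from it.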