Hi, Using Qpid Proton C++ 0.39.0 on Windows, I am trying to deal with network disconnects/reconnects. The topology is a single server that sends on multiple topics. There are multiple client apps, each on a different host, acting as consumers/receivers - each client app is setup to only receive messages filtered for themselves. I can't get the reconnect logic to work successfully.
App structure: (full source at the end) ============== class msg_handler : proton::messaging_handler - on_receiver methods - just log class client : proton::messaging_handler - on_container_start - calls container.connect with just server/port for the connection url and passes in auth args to connection_options, sets handler to itself, reconnect(default opts + delay/max_delay) - saves the connection handle returned by connect() for later use - on_transport_*, on_connection_* methods - just log - subscribe(topic) method - calls connection.open_receiver with unique msg_handler per topic, sets filters, durability main() - create client - create container(client), call auto_stop(false) - create work_queue(container) - start new thread to call container.run() - work_queue.add( client.subscribe(topic1) ) - work_queue.add( client.subscribe(topic2) ) - idle loop Running: ======== 13:25:42.896 : on_container_start 13:25:43.124 : on_transport_open 13:25:43.143 : on_connection_open 13:25:43.900 : subscribing to Topic1 13:25:43.900 : subscribing to Topic2 13:25:58.194 : on_receiver_open : Topic1 13:25:58.220 : on_receiver_open : Topic2 13:26:32.398 : on_transport_error <-- WiFi turned off just before here 13:26:32.402 : on_transport_error 13:26:32.912 : on_transport_error 13:26:33.581 : on_transport_error 13:26:34.736 : on_transport_error 13:26:36.137 : on_transport_error <-- WiFi turned back on around here 13:26:37.936 : on_transport_error 13:26:40.477 : on_transport_open 13:26:40.478 : on_connection_open 13:26:40.478 : on_receiver_open : Topic1 13:26:40.478 : on_receiver_open : Topic2 13:26:40.479 : on_receiver_error : Topic1 - AMQ219005: error creating consumer, AMQ119200: Maximum Consumer Limit Reached on Queue:(address=jms.topic.Topic1,queue=5cdf17ed-185e-4939-a27c-c460b5f6b9fe.ROBERTO) 13:26:40.479 : on_receiver_close : Topic1 13:26:40.480 : on_receiver_error : Topic2 - AMQ219005: error creating consumer, AMQ119200: Maximum Consumer Limit Reached on Queue:(address=jms.topic.Topic2,queue=5cdf17ed-185e-4939-a27c-c460b5f6b9fe.ROBERTOTopic2) 13:26:40.480 : on_receiver_close : Topic2 13:26:48.329 : CTRL-C shutting down! 13:26:48.339 : on_transport_error 13:26:48.359 : on_container_stop 1. The reconnect logic seems to work fine, in that the proton library automatically tries to re-open the receivers. However, we hit the Max Consumer Limit error and they shutdown. Why? Is my app's overall structure correct for dealing with multiple topics from the same server? 2. What is the recommended way to use handlers for multiple receivers? (My error occurs whether using a single handler for both topics vs a handler per topic.) 3. Do I even need to call connect() and save the handle? Seems like I can also just call container.open_receiver(), and the addr_url will be the same server:port, but a different path that reflects the second topic name. Will it re-use the same underlying connection? 4. If I turn off WiFi after the receivers are open, but turn it back on before I receive my first on_transport_error, no other callbacks are called, so I assume everything is good? Thank you for your help! Roberto rde...@gmail.com Roberto De Feo Full source of example (w/ server/user/pass removed): #include "stdafx.h" #include <Windows.h> #include <proton/connection_options.hpp> #include <proton/duration.hpp> #include <proton/receiver_options.hpp> #include <proton/source_options.hpp> #include <proton/transport.hpp> #include <proton/connection.hpp> #include <proton/messaging_handler.hpp> #include <proton/work_queue.hpp> #include <proton/ssl.hpp> #include <proton/container.hpp> #include <proton/reconnect_options.hpp> #include <iostream> #include <thread> #include <unordered_map> namespace { proton::container* g_container; proton::work_queue* g_work_queue; bool running = false; auto NOW() { SYSTEMTIME st; GetSystemTime(&st); char buf[32] = ""; sprintf(buf, "%.2d:%.2d:%.2d.%.3d : ", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds); return std::string(buf); } } class msg_handler : public proton::messaging_handler { private: std::string topic; public: msg_handler(const std::string& t) : topic(t) {} void on_receiver_open(proton::receiver& r) override { std::cout << NOW() << "on_receiver_open : " << topic << std::endl; } void on_receiver_error(proton::receiver& r) override { std::cout << NOW() << "on_receiver_error : " << topic << " - " << r.error().description() << std::endl; } void on_receiver_close(proton::receiver& r) override { std::cout << NOW() << "on_receiver_close : " << topic << std::endl; } void on_message(proton::delivery &d, proton::message &msg) override { (void)0; // do nothing } }; class client : public proton::messaging_handler { private: proton::connection connection; std::string url; std::string user; std::string password; std::unordered_map<std::string, std::unique_ptr<msg_handler>> handlers; public: client(const std::string &s, const std::string &u, const std::string &p) : url(s), user(u), password(p) {} void on_container_start(proton::container &container) override { std::cout << NOW() << "on_container_start" << std::endl; connection = container.connect(url, proton::connection_options() .idle_timeout(proton::duration::MINUTE * 2) .ssl_client_options(proton::ssl_client_options("lmss:root", proton::ssl::verify_mode::VERIFY_PEER)) .sasl_allow_insecure_mechs(true).sasl_allowed_mechs("PLAIN") .user(user).password(password) .reconnect(proton::reconnect_options() .delay(proton::duration::MILLISECOND * 500) .max_delay(proton::duration::SECOND * 2)) .handler(*this) ); } void on_container_stop(proton::container &container) override { std::cout << NOW() << "on_container_stop" << std::endl; ::running = false; } void on_connection_open(proton::connection &c) override { std::cout << NOW() << "on_connection_open" << std::endl; } void on_connection_error(proton::connection &c) override { std::cout << NOW() << "on_connection_error" << std::endl; } void on_connection_close(proton::connection &c) override { std::cout << NOW() << "on_connection_close" << std::endl; } void on_transport_open(proton::transport &c) override { std::cout << NOW() << "on_transport_open" << std::endl; } void on_transport_error(proton::transport &c) override { std::cout << NOW() << "on_transport_error" << std::endl; } void on_transport_close(proton::transport &c) override { std::cout << NOW() << "on_transport_close" << std::endl; } void subscribe(const std::string& topic) { std::cout << NOW() << "subscribing to " << topic << std::endl; handlers.emplace(topic, std::make_unique<msg_handler>(topic)); proton::source::filter_map filterMap; const std::string filterString = "(originator <> '" + user + "' AND filter = 'N') OR " + user + " = 'Y'"; proton::value filterValue; proton::codec::encoder enc(filterValue); enc << proton::codec::start::described() << proton::symbol("apache.org:selector-filter:string") << filterString << proton::codec::finish(); filterMap.put(proton::symbol("selector"), filterValue); const std::string connection_name{ (topic == "Topic1") ? "" : topic }; connection.open_receiver("jms.topic." + topic, proton::receiver_options() .name(user + connection_name) .source(proton::source_options() .durability_mode(proton::terminus().CONFIGURATION) .filters(filterMap)) .credit_window(1000) .handler(*handlers.at(topic))); } }; int main() { std::string address("my_server_addr:port"); std::string user("ROBERTO"); std::string password("password"); client cl(address, user, password); proton::container container(cl); proton::work_queue work_queue(container); container.auto_stop(false); // we can't use the lambda capture when using a simple function ptr, // so we use a global ptr for our console handler g_container = &container; g_work_queue = &work_queue; SetConsoleCtrlHandler([](DWORD signal) { if (signal == CTRL_C_EVENT) { std::cout << NOW() << "CTRL-C shutting down!" << std::endl; g_work_queue->add([&]() { g_container->stop(); }); } return TRUE; }, TRUE); auto workerThread = std::thread([&]() { container.run(); }); work_queue.add([&]() { cl.subscribe("Topic1"); }); work_queue.add([&]() { cl.subscribe("Topic2"); }); ::running = true; while (::running) { // idle loop std::this_thread::sleep_for(std::chrono::seconds(1)); } return 0; }