Hi,

Using Qpid Proton C++ 0.39.0 on Windows, I am trying to deal with network
disconnects/reconnects. The topology is a single server that sends on
multiple topics. There are multiple client apps, each on a different host,
acting as consumers/receivers - each client app is set up to receive only
the messages filtered for it. I can't get the reconnect logic to work
successfully.

App structure: (full source at the end)
==============
class msg_handler : proton::messaging_handler
- on_receiver methods - just log
class client : proton::messaging_handler
- on_container_start
  - calls container.connect with just server/port for the connection url,
    passes auth args in connection_options, sets the handler to itself, and
    sets reconnect(default opts + delay/max_delay)
  - saves the connection handle returned by connect() for later use
- on_transport_*, on_connection_* methods - just log
- subscribe(topic) method
  - calls connection.open_receiver with a unique msg_handler per topic, and
    sets filters and durability

main()
- create client
- create container(client), call auto_stop(false)
- create work_queue(container)
- start new thread to call container.run()
- work_queue.add( client.subscribe(topic1) )
- work_queue.add( client.subscribe(topic2) )
- idle loop
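
To make the reconnect settings above concrete, here is a rough sketch of the
nominal retry delays they would produce if the backoff is a plain exponential
one. The multiplier of 2.0 and the absence of jitter are assumptions on my
part, and nominal_backoff_ms is a made-up helper, not a Proton API. The real
gaps between attempts will also include however long each connection attempt
takes.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical helper (not part of Proton): nominal delay before each retry,
// ASSUMING plain exponential backoff capped at max_delay_ms and no jitter.
std::vector<std::int64_t> nominal_backoff_ms(std::int64_t delay_ms,
                                             std::int64_t max_delay_ms,
                                             double multiplier,
                                             int attempts) {
    std::vector<std::int64_t> out;
    double next = static_cast<double>(delay_ms);
    for (int i = 0; i < attempts; ++i) {
        // Record the delay for this attempt, never exceeding the cap.
        out.push_back(std::min<std::int64_t>(static_cast<std::int64_t>(next),
                                             max_delay_ms));
        // Grow the delay for the next attempt, again clamped to the cap.
        next = std::min(next * multiplier, static_cast<double>(max_delay_ms));
    }
    return out;
}
```

With delay = 500 ms and max_delay = 2 s, nominal_backoff_ms(500, 2000, 2.0, 5)
gives 500, 1000, 2000, 2000, 2000.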

Running:
========
13:25:42.896 : on_container_start
13:25:43.124 : on_transport_open
13:25:43.143 : on_connection_open
13:25:43.900 : subscribing to Topic1
13:25:43.900 : subscribing to Topic2
13:25:58.194 : on_receiver_open : Topic1
13:25:58.220 : on_receiver_open : Topic2
13:26:32.398 : on_transport_error              <-- WiFi turned off just before here
13:26:32.402 : on_transport_error
13:26:32.912 : on_transport_error
13:26:33.581 : on_transport_error
13:26:34.736 : on_transport_error
13:26:36.137 : on_transport_error              <-- WiFi turned back on around here
13:26:37.936 : on_transport_error
13:26:40.477 : on_transport_open
13:26:40.478 : on_connection_open
13:26:40.478 : on_receiver_open : Topic1
13:26:40.478 : on_receiver_open : Topic2
13:26:40.479 : on_receiver_error : Topic1 - AMQ219005: error creating consumer, AMQ119200: Maximum Consumer Limit Reached on Queue:(address=jms.topic.Topic1,queue=5cdf17ed-185e-4939-a27c-c460b5f6b9fe.ROBERTO)
13:26:40.479 : on_receiver_close : Topic1
13:26:40.480 : on_receiver_error : Topic2 - AMQ219005: error creating consumer, AMQ119200: Maximum Consumer Limit Reached on Queue:(address=jms.topic.Topic2,queue=5cdf17ed-185e-4939-a27c-c460b5f6b9fe.ROBERTOTopic2)
13:26:40.480 : on_receiver_close : Topic2
13:26:48.329 : CTRL-C shutting down!
13:26:48.339 : on_transport_error
13:26:48.359 : on_container_stop
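
To help match the queue names in the errors above to the code: the receiver
(link) names and the selector string are derived from the user and topic as
sketched below. This is a standalone extract of the string-building done in
subscribe() in the full source, with no Proton calls; build_selector and
link_name are names made up for this snippet only.

```cpp
#include <string>

// Selector string, built the same way as in subscribe() in the full source.
std::string build_selector(const std::string& user) {
    return "(originator <> '" + user + "' AND filter = 'N') OR " + user + " = 'Y'";
}

// Receiver (link) name: the user alone for Topic1, user + topic otherwise.
std::string link_name(const std::string& user, const std::string& topic) {
    return user + (topic == "Topic1" ? std::string() : topic);
}
```

For user ROBERTO this gives link names ROBERTO and ROBERTOTopic2, which are
the suffixes of the queue names shown in the two broker errors.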

1. The reconnect logic seems to work fine, in that the proton library
automatically tries to re-open the receivers. However, we hit the Max
Consumer Limit error and they shut down. Why? Is my app's overall structure
correct for dealing with multiple topics from the same server?
2. What is the recommended way to use handlers for multiple receivers? (My
error occurs whether using a single handler for both topics vs a handler
per topic.)
3. Do I even need to call connect() and save the handle? Seems like I can
also just call container.open_receiver(), and the addr_url will be the same
server:port, but a different path that reflects the second topic name. Will
it re-use the same underlying connection?
4. If I turn off WiFi after the receivers are open, but turn it back on
before I receive my first on_transport_error, no other callbacks are
called, so I assume everything is good?

Thank you for your help!
Roberto

rde...@gmail.com
Roberto De Feo

Full source of example (w/ server/user/pass removed):
#include "stdafx.h"

#include <Windows.h>
#include <proton/connection_options.hpp>
#include <proton/duration.hpp>
#include <proton/receiver_options.hpp>
#include <proton/source_options.hpp>
#include <proton/transport.hpp>
#include <proton/connection.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/work_queue.hpp>
#include <proton/ssl.hpp>
#include <proton/container.hpp>
#include <proton/reconnect_options.hpp>
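#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>

namespace {
   proton::container* g_container;
   proton::work_queue* g_work_queue;
   // written by the container thread, read by main(), so it must be atomic
   std::atomic<bool> running{false};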

   auto NOW() {
      SYSTEMTIME st;
      GetSystemTime(&st);
      char buf[32] = "";
      sprintf(buf, "%.2d:%.2d:%.2d.%.3d : ",
         st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
      return std::string(buf);
   }
}

class msg_handler : public proton::messaging_handler {
private:
   std::string topic;

public:
   msg_handler(const std::string& t) : topic(t) {}

   void on_receiver_open(proton::receiver& r) override {
      std::cout << NOW() << "on_receiver_open : " << topic << std::endl;
   }
   void on_receiver_error(proton::receiver& r) override {
      std::cout << NOW() << "on_receiver_error : " << topic << " - "
         << r.error().description() << std::endl;
   }
   void on_receiver_close(proton::receiver& r) override {
      std::cout << NOW() << "on_receiver_close : " << topic << std::endl;
   }
   void on_message(proton::delivery &d, proton::message &msg) override {
      (void)0; // do nothing
   }
};

class client : public proton::messaging_handler {
private:
   proton::connection connection;
   std::string url;
   std::string user;
   std::string password;
   std::unordered_map<std::string, std::unique_ptr<msg_handler>> handlers;

public:
   client(const std::string &s, const std::string &u, const std::string &p) :
      url(s), user(u), password(p) {}

   void on_container_start(proton::container &container) override {
      std::cout << NOW() << "on_container_start" << std::endl;
      connection = container.connect(url,
         proton::connection_options()
         .idle_timeout(proton::duration::MINUTE * 2)
         .ssl_client_options(proton::ssl_client_options(
            "lmss:root", proton::ssl::verify_mode::VERIFY_PEER))
         .sasl_allow_insecure_mechs(true).sasl_allowed_mechs("PLAIN")
         .user(user).password(password)
         .reconnect(proton::reconnect_options()
            .delay(proton::duration::MILLISECOND * 500)
            .max_delay(proton::duration::SECOND * 2))
         .handler(*this)
      );
   }
   void on_container_stop(proton::container &container) override {
      std::cout << NOW() << "on_container_stop" << std::endl;
      ::running = false;
   }
   void on_connection_open(proton::connection &c) override {
      std::cout << NOW() << "on_connection_open" << std::endl;
   }
   void on_connection_error(proton::connection &c) override {
      std::cout << NOW() << "on_connection_error" << std::endl;
   }
   void on_connection_close(proton::connection &c) override {
      std::cout << NOW() << "on_connection_close" << std::endl;
   }
   void on_transport_open(proton::transport &c) override {
      std::cout << NOW() << "on_transport_open" << std::endl;
   }
   void on_transport_error(proton::transport &c) override {
      std::cout << NOW() << "on_transport_error" << std::endl;
   }
   void on_transport_close(proton::transport &c) override {
      std::cout << NOW() << "on_transport_close" << std::endl;
   }

   void subscribe(const std::string& topic) {
      std::cout << NOW() << "subscribing to " << topic << std::endl;

      handlers.emplace(topic, std::make_unique<msg_handler>(topic));

      proton::source::filter_map filterMap;
      // keep the string literal on one line; a literal split across lines does not compile
      const std::string filterString =
         "(originator <> '" + user + "' AND filter = 'N') OR " + user + " = 'Y'";
      proton::value filterValue;
      proton::codec::encoder enc(filterValue);
      enc << proton::codec::start::described()
         << proton::symbol("apache.org:selector-filter:string")
         << filterString
         << proton::codec::finish();
      filterMap.put(proton::symbol("selector"), filterValue);

      const std::string connection_name{ (topic == "Topic1") ? "" : topic };
      connection.open_receiver("jms.topic." + topic,
         proton::receiver_options()
         .name(user + connection_name)
         .source(proton::source_options()
            .durability_mode(proton::terminus::CONFIGURATION)
            .filters(filterMap))
         .credit_window(1000)
         .handler(*handlers.at(topic)));
   }
};

int main() {
   std::string address("my_server_addr:port");
   std::string user("ROBERTO");
   std::string password("password");

   client cl(address, user, password);
   proton::container container(cl);
   proton::work_queue work_queue(container);
   container.auto_stop(false);

   // we can't use the lambda capture when using a simple function ptr,
   // so we use a global ptr for our console handler
   g_container = &container;
   g_work_queue = &work_queue;
   SetConsoleCtrlHandler([](DWORD signal) {
      if (signal == CTRL_C_EVENT) {
         std::cout << NOW() << "CTRL-C shutting down!" << std::endl;
         g_work_queue->add([&]() { g_container->stop(); });
      }
      return TRUE;
   }, TRUE);

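   ::running = true; // set before the container thread gets a chance to clear it
   auto workerThread = std::thread([&]() { container.run(); });

   work_queue.add([&]() { cl.subscribe("Topic1"); });
   work_queue.add([&]() { cl.subscribe("Topic2"); });

   while (::running) { // idle loop
      std::this_thread::sleep_for(std::chrono::seconds(1));
   }
   workerThread.join(); // a joinable std::thread must be joined before it is destroyed
   return 0;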
}
