astitcher commented on code in PR #437:
URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2006440001


##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::receiver receiver; 
+    std::string url;
+    int expected;
+    int batch_size;
+    int current_batch = 0;
+    int committed = 0;
+
+    proton::session session;
+  public:
+    tx_recv(const std::string &s, int c, int b):
+        url(s), expected(c), batch_size(b) {}
+
+    void on_container_start(proton::container &c) override {
+        receiver = c.open_receiver(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] txn called " << (&s)
+                  << std::endl;
+        receiver.add_credit(batch_size);
+    }
+
+    void on_message(proton::delivery &d, proton::message &msg) override {
+        std::cout<<"# MESSAGE: " << msg.id() <<": "  << msg.body() << 
std::endl;
+        session.txn_accept(d);

Review Comment:
   session == d.session()



##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::receiver receiver; 
+    std::string url;
+    int expected;
+    int batch_size;
+    int current_batch = 0;
+    int committed = 0;
+
+    proton::session session;
+  public:
+    tx_recv(const std::string &s, int c, int b):
+        url(s), expected(c), batch_size(b) {}
+
+    void on_container_start(proton::container &c) override {
+        receiver = c.open_receiver(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;

Review Comment:
   Probably not necessary



##########
cpp/include/proton/target_options.hpp:
##########
@@ -101,6 +103,39 @@ class target_options {
     /// @endcond
 };
 
+class coordinator_options {
+  public:
+    /// Create an empty set of options.
+    PN_CPP_EXTERN coordinator_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN coordinator_options(const coordinator_options&);
+
+    PN_CPP_EXTERN ~coordinator_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&);
+
+    /// Set the address for the coordinator.  It is unset by default.  The
+    /// address is ignored if dynamic() is true.
+    PN_CPP_EXTERN coordinator_options& address(const std::string& addr);
+
+    /// **Unsettled API** Extension capabilities that are supported/requested
+    PN_CPP_EXTERN coordinator_options& capabilities(const 
std::vector<symbol>&);
+
+  private:
+    void apply(coordinator&) const;
+
+    class impl;
+    std::unique_ptr<impl> impl_;
+
+    /// @cond INTERNAL
+  friend class coordinator;
+  friend class sender_options;
+  friend class receiver_options;
+    /// @endcond
+};

Review Comment:
   As coordinator this should be in it's own header file - but again maybe it's 
not needed to be visible to the API user anyway.



##########
cpp/include/proton/container.hpp:
##########
@@ -326,6 +326,7 @@ class PN_CPP_CLASS_EXTERN container {
   friend class receiver_options;
   friend class sender_options;
   friend class work_queue;
+  friend class transaction;

Review Comment:
   Don't like this as `transaction` is not a user visible class, so it 
shouldn't be mentioned in the API at all.



##########
cpp/include/proton/session.hpp:
##########
@@ -29,13 +29,15 @@
 #include "./sender.hpp"
 
 #include <string>
+#include <iostream>
 
 /// @file
 /// @copybrief proton::session
 
 struct pn_session_t;
 
 namespace proton {
+  class transaction_impl;

Review Comment:
   Not sure you need this. I think the `friend class` declaration below is 
sufficient.



##########
cpp/src/node_options.cpp:
##########
@@ -162,6 +162,7 @@ class target_options::impl {
     option<enum target::expiry_policy> expiry_policy;
     option<std::vector<symbol> > capabilities;
     option<target::dynamic_property_map> dynamic_properties;
+    option<int> type;

Review Comment:
   possibly replace this with `option<bool> is_coordinator;`?



##########
cpp/include/proton/fwd.hpp:
##########
@@ -52,9 +52,12 @@ class sender_options;
 class session;
 class session_options;
 class source_options;
+class coordinator_options;

Review Comment:
   Not sure this is really user visible - I don't think users currently need 
access to coordinator termini.



##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::receiver receiver; 
+    std::string url;
+    int expected;
+    int batch_size;
+    int current_batch = 0;
+    int committed = 0;
+
+    proton::session session;
+  public:
+    tx_recv(const std::string &s, int c, int b):
+        url(s), expected(c), batch_size(b) {}
+
+    void on_container_start(proton::container &c) override {
+        receiver = c.open_receiver(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] txn called " << (&s)
+                  << std::endl;
+        receiver.add_credit(batch_size);
+    }
+
+    void on_message(proton::delivery &d, proton::message &msg) override {
+        std::cout<<"# MESSAGE: " << msg.id() <<": "  << msg.body() << 
std::endl;
+        session.txn_accept(d);
+        current_batch += 1;
+        if(current_batch == batch_size) {
+        }
+    }
+
+    void on_transaction_committed(proton::session s) override {
+        committed += current_batch;
+        current_batch = 0;
+        std::cout<<"    [OnTxnCommitted] Committed:"<< committed<< std::endl;
+        if(committed == expected) {
+            std::cout << "All messages committed" << std::endl;
+            s.connection().close();
+        }
+        else {
+            session.declare_transaction(*this);

Review Comment:
   session == s



##########
cpp/include/proton/target.hpp:
##########
@@ -65,6 +65,26 @@ class target : public terminus {
     /// @endcond
 };
 
+class coordinator : public terminus {
+  public:
+    /// Create an empty coordinator.
+    coordinator() = default;
+
+    /// The address of the coordinator.
+    PN_CPP_EXTERN std::string address() const;
+  private:
+    coordinator(pn_terminus_t* t);
+    coordinator(const sender&);
+    coordinator(const receiver&);
+
+
+    /// @cond INTERNAL
+  friend class proton::internal::factory<coordinator>;
+  friend class sender;
+  friend class receiver;
+    /// @endcond
+};

Review Comment:
   Shouldn't this be in a new `coordinator.hpp` header if it is used visible at 
all? I think it's likely though that this class currently doesn't need to be 
use visible anyway as it has no current use for a use of the API.



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender; 
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int confirmed = 0;
+
+    proton::session session;

Review Comment:
   As tx_recv: I don't think you need to store away the session.



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender; 
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int confirmed = 0;
+
+    proton::session session;
+
+  public:
+    tx_send(const std::string &s, int c, int b):
+        url(s), total(c), batch_size(b), sent(0) {}
+
+    void on_container_start(proton::container &c) override {
+        sender = c.open_sender(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] Session: " << (&s)
+                  << std::endl;
+        std::cout << "[on_transaction_declared] txn is_empty " << 
(s.txn_is_empty())
+                  << "\t" << std::endl;
+        send(sender);
+    }
+
+    void on_sendable(proton::sender &s) override {
+        std::cout << "    [OnSendable] session: " << &session
+                  << std::endl;
+        send(s);
+    }
+
+    void send(proton::sender &s) {
+        static int unique_id = 10000;
+        while (session.txn_is_declared() && sender.credit() &&
+               (committed + current_batch) < total) {
+            proton::message msg;
+            std::map<std::string, int> m;
+            m["sequence"] = committed + current_batch;
+
+            msg.id(unique_id++);
+            msg.body(m);
+            std::cout << "[example] transaction send msg: "  << msg
+                      << std::endl;
+            session.txn_send(sender, msg);
+            current_batch += 1;
+            if(current_batch == batch_size)
+            {
+                std::cout << " >> Txn attempt commit" << std::endl;
+                if (batch_index % 2 == 0) {
+                    session.txn_commit();
+                } else {
+                    session.txn_abort();
+                }       
+                batch_index++;
+            }
+        }
+    }
+
+    void on_tracker_accept(proton::tracker &t) override {
+        confirmed += 1;
+        std::cout << "    [example] on_tracker_accept:" << confirmed
+                  << std::endl;
+    }
+
+    void on_transaction_committed(proton::session s) override {
+        committed += current_batch;
+        current_batch = 0;
+        std::cout<<"    [OnTxnCommitted] Committed:"<< committed<< std::endl;
+        if(committed == total) {
+            std::cout << "All messages committed" << std::endl;
+            s.connection().close();
+        }
+        else {
+            std::cout << "redlcaring txn " << std::endl;
+            session.declare_transaction(*this);
+        }
+    }
+
+    void on_transaction_aborted(proton::session s) override {
+        std::cout << "Meesages Aborted ....." << std::endl;

Review Comment:
   typo



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender; 
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int confirmed = 0;
+
+    proton::session session;
+
+  public:
+    tx_send(const std::string &s, int c, int b):
+        url(s), total(c), batch_size(b), sent(0) {}
+
+    void on_container_start(proton::container &c) override {
+        sender = c.open_sender(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] Session: " << (&s)
+                  << std::endl;
+        std::cout << "[on_transaction_declared] txn is_empty " << 
(s.txn_is_empty())
+                  << "\t" << std::endl;
+        send(sender);
+    }
+
+    void on_sendable(proton::sender &s) override {
+        std::cout << "    [OnSendable] session: " << &session
+                  << std::endl;
+        send(s);
+    }
+
+    void send(proton::sender &s) {
+        static int unique_id = 10000;
+        while (session.txn_is_declared() && sender.credit() &&
+               (committed + current_batch) < total) {
+            proton::message msg;
+            std::map<std::string, int> m;
+            m["sequence"] = committed + current_batch;
+
+            msg.id(unique_id++);
+            msg.body(m);
+            std::cout << "[example] transaction send msg: "  << msg
+                      << std::endl;
+            session.txn_send(sender, msg);
+            current_batch += 1;
+            if(current_batch == batch_size)
+            {
+                std::cout << " >> Txn attempt commit" << std::endl;
+                if (batch_index % 2 == 0) {
+                    session.txn_commit();
+                } else {
+                    session.txn_abort();
+                }       
+                batch_index++;
+            }
+        }
+    }
+
+    void on_tracker_accept(proton::tracker &t) override {
+        confirmed += 1;
+        std::cout << "    [example] on_tracker_accept:" << confirmed
+                  << std::endl;
+    }
+
+    void on_transaction_committed(proton::session s) override {
+        committed += current_batch;
+        current_batch = 0;
+        std::cout<<"    [OnTxnCommitted] Committed:"<< committed<< std::endl;
+        if(committed == total) {
+            std::cout << "All messages committed" << std::endl;
+            s.connection().close();
+        }
+        else {
+            std::cout << "redlcaring txn " << std::endl;

Review Comment:
   typo



##########
cpp/include/proton/transaction.hpp:
##########
@@ -0,0 +1,63 @@
+#ifndef PROTON_TRANSACTION_HPP
+#define PROTON_TRANSACTION_HPP
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "./fwd.hpp"
+#include "./internal/export.hpp"
+#include "./sender.hpp"
+#include "./tracker.hpp"
+#include "./container.hpp"
+
+/// @file
+/// @copybrief proton::transaction
+
+namespace proton {
+
+class transaction_handler;

Review Comment:
   Don't need a declaration immediately followed by its definition



##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::receiver receiver; 
+    std::string url;
+    int expected;
+    int batch_size;
+    int current_batch = 0;
+    int committed = 0;
+
+    proton::session session;

Review Comment:
   Not sure we need to store the session anywhere - I think it can always be 
derived inside any callback



##########
cpp/include/proton/transaction.hpp:
##########
@@ -0,0 +1,63 @@
+#ifndef PROTON_TRANSACTION_HPP

Review Comment:
   I think this file should be called `transaction_handler.hpp` as that is what 
it defines



##########
cpp/include/proton/terminus.hpp:
##########
@@ -122,6 +122,7 @@ class terminus {
   friend class internal::factory<terminus>;
   friend class source;
   friend class target;
+  friend class coordinator;

Review Comment:
   I don't really like this especially if coordinator ends up being purely 
internal to the binding



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender; 
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int confirmed = 0;
+
+    proton::session session;
+
+  public:
+    tx_send(const std::string &s, int c, int b):
+        url(s), total(c), batch_size(b), sent(0) {}
+
+    void on_container_start(proton::container &c) override {
+        sender = c.open_sender(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] Session: " << (&s)
+                  << std::endl;
+        std::cout << "[on_transaction_declared] txn is_empty " << 
(s.txn_is_empty())
+                  << "\t" << std::endl;
+        send(sender);
+    }
+
+    void on_sendable(proton::sender &s) override {
+        std::cout << "    [OnSendable] session: " << &session
+                  << std::endl;
+        send(s);
+    }
+
+    void send(proton::sender &s) {
+        static int unique_id = 10000;

Review Comment:
   I think using `static` like this is bad practice in example code. It won't 
be thread safe if this code is copied into user code which uses multiple 
sending connections. Since we've got C++17 you could use `std::atomic_int` and 
`std::atomic_fetch_add`



##########
cpp/include/proton/fwd.hpp:
##########
@@ -52,9 +52,12 @@ class sender_options;
 class session;
 class session_options;
 class source_options;
+class coordinator_options;
 class ssl;
 class target_options;
 class tracker;
+class transaction;

Review Comment:
   There is no user visible transaaction class, so this can't be necessary.



##########
cpp/include/proton/target.hpp:
##########
@@ -65,6 +65,26 @@ class target : public terminus {
     /// @endcond
 };
 
+class coordinator : public terminus {
+  public:
+    /// Create an empty coordinator.
+    coordinator() = default;
+
+    /// The address of the coordinator.
+    PN_CPP_EXTERN std::string address() const;
+  private:
+    coordinator(pn_terminus_t* t);
+    coordinator(const sender&);
+    coordinator(const receiver&);
+
+
+    /// @cond INTERNAL
+  friend class proton::internal::factory<coordinator>;
+  friend class sender;
+  friend class receiver;
+    /// @endcond
+};

Review Comment:
   As I said above I'm coming to think this might actually be just an internal 
subclass or `target` or just represented as a transaction_coordinator boolean 
on a target, set by a similarly named target_option.



##########
cpp/src/proton_bits.hpp:
##########
@@ -98,11 +102,15 @@ template <> struct wrapped<sender> { typedef pn_link_t 
type; };
 template <> struct wrapped<receiver> { typedef pn_link_t type; };
 template <> struct wrapped<transfer> { typedef pn_delivery_t type; };
 template <> struct wrapped<tracker> { typedef pn_delivery_t type; };
+template <> struct wrapped<disposition> {
+    typedef pn_disposition_t type;
+};

Review Comment:
   Inconsistent formatting



##########
cpp/include/proton/sender_options.hpp:
##########
@@ -94,6 +94,9 @@ class sender_options {
     /// Options for the receiver node of the receiver.
     PN_CPP_EXTERN sender_options& target(const target_options&);
 
+    /// Options for the coordinator node of the receiver.
+    PN_CPP_EXTERN sender_options& coordinator(const coordinator_options&);

Review Comment:
   I've now come to think that I was wrong earlier when I suggested that 
`coordinator` is a peer class to `target` and `source`. Reading the spec more 
carefully, I think that a transaction coordinator is a special kind of target 
(that is how it's described in the spec) and could be modeled either as a 
subclass of` `target` or maybe just as a simple boolean `target_option` say 
`is_transaction_coordinator`.



##########
cpp/src/node_options.cpp:
##########
@@ -175,6 +176,9 @@ class target_options::impl {
             get(dynamic_properties.value, target_map);
             value(pn_terminus_properties(unwrap(t))) = target_map;
         }
+        if (type.set) {
+            pn_terminus_set_type(unwrap(t), pn_terminus_type_t(type.value));
+        }

Review Comment:
   use `is_coordinator`?



##########
cpp/src/proton_bits.hpp:
##########
@@ -111,6 +119,9 @@ template <> struct wrapper<pn_connection_t> { typedef 
connection type; };
 template <> struct wrapper<pn_session_t> { typedef session type; };
 template <> struct wrapper<pn_link_t> { typedef link type; };
 template <> struct wrapper<pn_delivery_t> { typedef transfer type; };
+template <> struct wrapper<pn_disposition_t> {
+    typedef disposition type;
+};

Review Comment:
   Inconsistent formatting



##########
cpp/src/sender_options.cpp:
##########
@@ -66,6 +66,7 @@ class sender_options::impl {
     option<bool> auto_settle;
     option<source_options> source;
     option<target_options> target;
+    option<coordinator_options> coordinator;

Review Comment:
   Not needed if coordinator is a kind of target



##########
cpp/src/sender_options.cpp:
##########
@@ -82,6 +83,10 @@ class sender_options::impl {
                 proton::target 
local_t(make_wrapper<proton::target>(pn_link_target(unwrap(s))));
                 target.value.apply(local_t);
             }
+            if (coordinator.set) {
+                proton::coordinator 
local_t(make_wrapper<proton::coordinator>(pn_link_target(unwrap(s))));
+                coordinator.value.apply(local_t);
+            }

Review Comment:
   Ditto



##########
cpp/src/transaction.cpp:
##########
@@ -0,0 +1,44 @@
+/*

Review Comment:
   Call this file transaction_handler.cpp



##########
cpp/include/proton/target_options.hpp:
##########
@@ -101,6 +103,39 @@ class target_options {
     /// @endcond
 };
 
+class coordinator_options {
+  public:
+    /// Create an empty set of options.
+    PN_CPP_EXTERN coordinator_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN coordinator_options(const coordinator_options&);
+
+    PN_CPP_EXTERN ~coordinator_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&);
+
+    /// Set the address for the coordinator.  It is unset by default.  The
+    /// address is ignored if dynamic() is true.
+    PN_CPP_EXTERN coordinator_options& address(const std::string& addr);
+
+    /// **Unsettled API** Extension capabilities that are supported/requested
+    PN_CPP_EXTERN coordinator_options& capabilities(const 
std::vector<symbol>&);
+
+  private:
+    void apply(coordinator&) const;
+
+    class impl;
+    std::unique_ptr<impl> impl_;
+
+    /// @cond INTERNAL
+  friend class coordinator;
+  friend class sender_options;
+  friend class receiver_options;
+    /// @endcond
+};

Review Comment:
   Again, probably an internal subclass of `target` or not needed at all 
because it's represented by a simple boolean in target.



##########
cpp/src/sender_options.cpp:
##########
@@ -118,6 +123,7 @@ sender_options& 
sender_options::delivery_mode(proton::delivery_mode m) {impl_->d
 sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; 
return *this; }
 sender_options& sender_options::source(const source_options &s) {impl_->source 
= s; return *this; }
 sender_options& sender_options::target(const target_options &s) {impl_->target 
= s; return *this; }
+sender_options& sender_options::coordinator(const coordinator_options &s) 
{impl_->coordinator = s; return *this; }

Review Comment:
   Ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org


Reply via email to