[ 
https://issues.apache.org/jira/browse/PROTON-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937227#comment-17937227
 ] 

ASF GitHub Bot commented on PROTON-1442:
----------------------------------------

astitcher commented on code in PR #437:
URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2006440001


##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::receiver receiver; 
+    std::string url;
+    int expected;
+    int batch_size;
+    int current_batch = 0;
+    int committed = 0;
+
+    proton::session session;
+  public:
+    tx_recv(const std::string &s, int c, int b):
+        url(s), expected(c), batch_size(b) {}
+
+    void on_container_start(proton::container &c) override {
+        receiver = c.open_receiver(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] txn called " << (&s)
+                  << std::endl;
+        receiver.add_credit(batch_size);
+    }
+
+    void on_message(proton::delivery &d, proton::message &msg) override {
+        std::cout<<"# MESSAGE: " << msg.id() <<": "  << msg.body() << 
std::endl;
+        session.txn_accept(d);

Review Comment:
   session == d.session()



##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::receiver receiver; 
+    std::string url;
+    int expected;
+    int batch_size;
+    int current_batch = 0;
+    int committed = 0;
+
+    proton::session session;
+  public:
+    tx_recv(const std::string &s, int c, int b):
+        url(s), expected(c), batch_size(b) {}
+
+    void on_container_start(proton::container &c) override {
+        receiver = c.open_receiver(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;

Review Comment:
   Probably not necessary



##########
cpp/include/proton/target_options.hpp:
##########
@@ -101,6 +103,39 @@ class target_options {
     /// @endcond
 };
 
+class coordinator_options {
+  public:
+    /// Create an empty set of options.
+    PN_CPP_EXTERN coordinator_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN coordinator_options(const coordinator_options&);
+
+    PN_CPP_EXTERN ~coordinator_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&);
+
+    /// Set the address for the coordinator.  It is unset by default.  The
+    /// address is ignored if dynamic() is true.
+    PN_CPP_EXTERN coordinator_options& address(const std::string& addr);
+
+    /// **Unsettled API** Extension capabilities that are supported/requested
+    PN_CPP_EXTERN coordinator_options& capabilities(const 
std::vector<symbol>&);
+
+  private:
+    void apply(coordinator&) const;
+
+    class impl;
+    std::unique_ptr<impl> impl_;
+
+    /// @cond INTERNAL
+  friend class coordinator;
+  friend class sender_options;
+  friend class receiver_options;
+    /// @endcond
+};

Review Comment:
   As coordinator this should be in it's own header file - but again maybe it's 
not needed to be visible to the API user anyway.



##########
cpp/include/proton/container.hpp:
##########
@@ -326,6 +326,7 @@ class PN_CPP_CLASS_EXTERN container {
   friend class receiver_options;
   friend class sender_options;
   friend class work_queue;
+  friend class transaction;

Review Comment:
   Don't like this as `transaction` is not a user visible class, so it 
shouldn't be mentioned in the API at all.



##########
cpp/include/proton/session.hpp:
##########
@@ -29,13 +29,15 @@
 #include "./sender.hpp"
 
 #include <string>
+#include <iostream>
 
 /// @file
 /// @copybrief proton::session
 
 struct pn_session_t;
 
 namespace proton {
+  class transaction_impl;

Review Comment:
   Not sure you need this. I think the `friend class` declaration below is 
sufficient.



##########
cpp/src/node_options.cpp:
##########
@@ -162,6 +162,7 @@ class target_options::impl {
     option<enum target::expiry_policy> expiry_policy;
     option<std::vector<symbol> > capabilities;
     option<target::dynamic_property_map> dynamic_properties;
+    option<int> type;

Review Comment:
   possibly replace this with `option<bool> is_coordinator;`?



##########
cpp/include/proton/fwd.hpp:
##########
@@ -52,9 +52,12 @@ class sender_options;
 class session;
 class session_options;
 class source_options;
+class coordinator_options;

Review Comment:
   Not sure this is really user visible - I don't think users currently need 
access to coordinator termini.



##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::receiver receiver; 
+    std::string url;
+    int expected;
+    int batch_size;
+    int current_batch = 0;
+    int committed = 0;
+
+    proton::session session;
+  public:
+    tx_recv(const std::string &s, int c, int b):
+        url(s), expected(c), batch_size(b) {}
+
+    void on_container_start(proton::container &c) override {
+        receiver = c.open_receiver(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] txn called " << (&s)
+                  << std::endl;
+        receiver.add_credit(batch_size);
+    }
+
+    void on_message(proton::delivery &d, proton::message &msg) override {
+        std::cout<<"# MESSAGE: " << msg.id() <<": "  << msg.body() << 
std::endl;
+        session.txn_accept(d);
+        current_batch += 1;
+        if(current_batch == batch_size) {
+        }
+    }
+
+    void on_transaction_committed(proton::session s) override {
+        committed += current_batch;
+        current_batch = 0;
+        std::cout<<"    [OnTxnCommitted] Committed:"<< committed<< std::endl;
+        if(committed == expected) {
+            std::cout << "All messages committed" << std::endl;
+            s.connection().close();
+        }
+        else {
+            session.declare_transaction(*this);

Review Comment:
   session == s



##########
cpp/include/proton/target.hpp:
##########
@@ -65,6 +65,26 @@ class target : public terminus {
     /// @endcond
 };
 
+class coordinator : public terminus {
+  public:
+    /// Create an empty coordinator.
+    coordinator() = default;
+
+    /// The address of the coordinator.
+    PN_CPP_EXTERN std::string address() const;
+  private:
+    coordinator(pn_terminus_t* t);
+    coordinator(const sender&);
+    coordinator(const receiver&);
+
+
+    /// @cond INTERNAL
+  friend class proton::internal::factory<coordinator>;
+  friend class sender;
+  friend class receiver;
+    /// @endcond
+};

Review Comment:
   Shouldn't this be in a new `coordinator.hpp` header if it is used visible at 
all? I think it's likely though that this class currently doesn't need to be 
use visible anyway as it has no current use for a use of the API.



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender; 
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int confirmed = 0;
+
+    proton::session session;

Review Comment:
   As tx_recv: I don't think you need to store away the session.



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender; 
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int confirmed = 0;
+
+    proton::session session;
+
+  public:
+    tx_send(const std::string &s, int c, int b):
+        url(s), total(c), batch_size(b), sent(0) {}
+
+    void on_container_start(proton::container &c) override {
+        sender = c.open_sender(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] Session: " << (&s)
+                  << std::endl;
+        std::cout << "[on_transaction_declared] txn is_empty " << 
(s.txn_is_empty())
+                  << "\t" << std::endl;
+        send(sender);
+    }
+
+    void on_sendable(proton::sender &s) override {
+        std::cout << "    [OnSendable] session: " << &session
+                  << std::endl;
+        send(s);
+    }
+
+    void send(proton::sender &s) {
+        static int unique_id = 10000;
+        while (session.txn_is_declared() && sender.credit() &&
+               (committed + current_batch) < total) {
+            proton::message msg;
+            std::map<std::string, int> m;
+            m["sequence"] = committed + current_batch;
+
+            msg.id(unique_id++);
+            msg.body(m);
+            std::cout << "[example] transaction send msg: "  << msg
+                      << std::endl;
+            session.txn_send(sender, msg);
+            current_batch += 1;
+            if(current_batch == batch_size)
+            {
+                std::cout << " >> Txn attempt commit" << std::endl;
+                if (batch_index % 2 == 0) {
+                    session.txn_commit();
+                } else {
+                    session.txn_abort();
+                }       
+                batch_index++;
+            }
+        }
+    }
+
+    void on_tracker_accept(proton::tracker &t) override {
+        confirmed += 1;
+        std::cout << "    [example] on_tracker_accept:" << confirmed
+                  << std::endl;
+    }
+
+    void on_transaction_committed(proton::session s) override {
+        committed += current_batch;
+        current_batch = 0;
+        std::cout<<"    [OnTxnCommitted] Committed:"<< committed<< std::endl;
+        if(committed == total) {
+            std::cout << "All messages committed" << std::endl;
+            s.connection().close();
+        }
+        else {
+            std::cout << "redlcaring txn " << std::endl;
+            session.declare_transaction(*this);
+        }
+    }
+
+    void on_transaction_aborted(proton::session s) override {
+        std::cout << "Meesages Aborted ....." << std::endl;

Review Comment:
   typo



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender; 
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int confirmed = 0;
+
+    proton::session session;
+
+  public:
+    tx_send(const std::string &s, int c, int b):
+        url(s), total(c), batch_size(b), sent(0) {}
+
+    void on_container_start(proton::container &c) override {
+        sender = c.open_sender(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] Session: " << (&s)
+                  << std::endl;
+        std::cout << "[on_transaction_declared] txn is_empty " << 
(s.txn_is_empty())
+                  << "\t" << std::endl;
+        send(sender);
+    }
+
+    void on_sendable(proton::sender &s) override {
+        std::cout << "    [OnSendable] session: " << &session
+                  << std::endl;
+        send(s);
+    }
+
+    void send(proton::sender &s) {
+        static int unique_id = 10000;
+        while (session.txn_is_declared() && sender.credit() &&
+               (committed + current_batch) < total) {
+            proton::message msg;
+            std::map<std::string, int> m;
+            m["sequence"] = committed + current_batch;
+
+            msg.id(unique_id++);
+            msg.body(m);
+            std::cout << "[example] transaction send msg: "  << msg
+                      << std::endl;
+            session.txn_send(sender, msg);
+            current_batch += 1;
+            if(current_batch == batch_size)
+            {
+                std::cout << " >> Txn attempt commit" << std::endl;
+                if (batch_index % 2 == 0) {
+                    session.txn_commit();
+                } else {
+                    session.txn_abort();
+                }       
+                batch_index++;
+            }
+        }
+    }
+
+    void on_tracker_accept(proton::tracker &t) override {
+        confirmed += 1;
+        std::cout << "    [example] on_tracker_accept:" << confirmed
+                  << std::endl;
+    }
+
+    void on_transaction_committed(proton::session s) override {
+        committed += current_batch;
+        current_batch = 0;
+        std::cout<<"    [OnTxnCommitted] Committed:"<< committed<< std::endl;
+        if(committed == total) {
+            std::cout << "All messages committed" << std::endl;
+            s.connection().close();
+        }
+        else {
+            std::cout << "redlcaring txn " << std::endl;

Review Comment:
   typo



##########
cpp/include/proton/transaction.hpp:
##########
@@ -0,0 +1,63 @@
+#ifndef PROTON_TRANSACTION_HPP
+#define PROTON_TRANSACTION_HPP
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "./fwd.hpp"
+#include "./internal/export.hpp"
+#include "./sender.hpp"
+#include "./tracker.hpp"
+#include "./container.hpp"
+
+/// @file
+/// @copybrief proton::transaction
+
+namespace proton {
+
+class transaction_handler;

Review Comment:
   Don't need a declaration immediately followed by its definition



##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::receiver receiver; 
+    std::string url;
+    int expected;
+    int batch_size;
+    int current_batch = 0;
+    int committed = 0;
+
+    proton::session session;

Review Comment:
   Not sure we need to store the session anywhere - I think it can always be 
derived inside any callback



##########
cpp/include/proton/transaction.hpp:
##########
@@ -0,0 +1,63 @@
+#ifndef PROTON_TRANSACTION_HPP

Review Comment:
   I think this file should be called `transaction_handler.hpp` as that is what 
it defines



##########
cpp/include/proton/terminus.hpp:
##########
@@ -122,6 +122,7 @@ class terminus {
   friend class internal::factory<terminus>;
   friend class source;
   friend class target;
+  friend class coordinator;

Review Comment:
   I don't really like this especially if coordinator ends up being purely 
internal to the binding



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender; 
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int confirmed = 0;
+
+    proton::session session;
+
+  public:
+    tx_send(const std::string &s, int c, int b):
+        url(s), total(c), batch_size(b), sent(0) {}
+
+    void on_container_start(proton::container &c) override {
+        sender = c.open_sender(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        session = s;
+        std::cout << "    [on_session_open] declare_txn started..." << 
std::endl;
+        s.declare_transaction(*this);
+        std::cout << "    [on_session_open] declare_txn ended..." << std::endl;
+    }
+
+    void on_transaction_declare_failed(proton::session) {}
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction Commit Failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "[on_transaction_declared] Session: " << (&s)
+                  << std::endl;
+        std::cout << "[on_transaction_declared] txn is_empty " << 
(s.txn_is_empty())
+                  << "\t" << std::endl;
+        send(sender);
+    }
+
+    void on_sendable(proton::sender &s) override {
+        std::cout << "    [OnSendable] session: " << &session
+                  << std::endl;
+        send(s);
+    }
+
+    void send(proton::sender &s) {
+        static int unique_id = 10000;

Review Comment:
   I think using `static` like this is bad practice in example code. It won't 
be thread safe if this code is copied into user code which uses multiple 
sending connections. Since we've got C++17 you could use `std::atomic_int` and 
`std::atomic_fetch_add`



##########
cpp/include/proton/fwd.hpp:
##########
@@ -52,9 +52,12 @@ class sender_options;
 class session;
 class session_options;
 class source_options;
+class coordinator_options;
 class ssl;
 class target_options;
 class tracker;
+class transaction;

Review Comment:
   There is no user visible transaaction class, so this can't be necessary.



##########
cpp/include/proton/target.hpp:
##########
@@ -65,6 +65,26 @@ class target : public terminus {
     /// @endcond
 };
 
+class coordinator : public terminus {
+  public:
+    /// Create an empty coordinator.
+    coordinator() = default;
+
+    /// The address of the coordinator.
+    PN_CPP_EXTERN std::string address() const;
+  private:
+    coordinator(pn_terminus_t* t);
+    coordinator(const sender&);
+    coordinator(const receiver&);
+
+
+    /// @cond INTERNAL
+  friend class proton::internal::factory<coordinator>;
+  friend class sender;
+  friend class receiver;
+    /// @endcond
+};

Review Comment:
   As I said above I'm coming to think this might actually be just an internal 
subclass or `target` or just represented as a transaction_coordinator boolean 
on a target, set by a similarly named target_option.



##########
cpp/src/proton_bits.hpp:
##########
@@ -98,11 +102,15 @@ template <> struct wrapped<sender> { typedef pn_link_t 
type; };
 template <> struct wrapped<receiver> { typedef pn_link_t type; };
 template <> struct wrapped<transfer> { typedef pn_delivery_t type; };
 template <> struct wrapped<tracker> { typedef pn_delivery_t type; };
+template <> struct wrapped<disposition> {
+    typedef pn_disposition_t type;
+};

Review Comment:
   Inconsistent formatting



##########
cpp/include/proton/sender_options.hpp:
##########
@@ -94,6 +94,9 @@ class sender_options {
     /// Options for the receiver node of the receiver.
     PN_CPP_EXTERN sender_options& target(const target_options&);
 
+    /// Options for the coordinator node of the receiver.
+    PN_CPP_EXTERN sender_options& coordinator(const coordinator_options&);

Review Comment:
   I've now come to think that I was wrong earlier when I suggested that 
`coordinator` is a peer class to `target` and `source`. Reading the spec more 
carefully, I think that a transaction coordinator is a special kind of target 
(that is how it's described in the spec) and could be modeled either as a 
subclass of` `target` or maybe just as a simple boolean `target_option` say 
`is_transaction_coordinator`.



##########
cpp/src/node_options.cpp:
##########
@@ -175,6 +176,9 @@ class target_options::impl {
             get(dynamic_properties.value, target_map);
             value(pn_terminus_properties(unwrap(t))) = target_map;
         }
+        if (type.set) {
+            pn_terminus_set_type(unwrap(t), pn_terminus_type_t(type.value));
+        }

Review Comment:
   use `is_coordinator`?



##########
cpp/src/proton_bits.hpp:
##########
@@ -111,6 +119,9 @@ template <> struct wrapper<pn_connection_t> { typedef 
connection type; };
 template <> struct wrapper<pn_session_t> { typedef session type; };
 template <> struct wrapper<pn_link_t> { typedef link type; };
 template <> struct wrapper<pn_delivery_t> { typedef transfer type; };
+template <> struct wrapper<pn_disposition_t> {
+    typedef disposition type;
+};

Review Comment:
   Inconsistent formatting



##########
cpp/src/sender_options.cpp:
##########
@@ -66,6 +66,7 @@ class sender_options::impl {
     option<bool> auto_settle;
     option<source_options> source;
     option<target_options> target;
+    option<coordinator_options> coordinator;

Review Comment:
   Not needed if coordinator is a kind of target



##########
cpp/src/sender_options.cpp:
##########
@@ -82,6 +83,10 @@ class sender_options::impl {
                 proton::target 
local_t(make_wrapper<proton::target>(pn_link_target(unwrap(s))));
                 target.value.apply(local_t);
             }
+            if (coordinator.set) {
+                proton::coordinator 
local_t(make_wrapper<proton::coordinator>(pn_link_target(unwrap(s))));
+                coordinator.value.apply(local_t);
+            }

Review Comment:
   Ditto



##########
cpp/src/transaction.cpp:
##########
@@ -0,0 +1,44 @@
+/*

Review Comment:
   Call this file transaction_handler.cpp



##########
cpp/include/proton/target_options.hpp:
##########
@@ -101,6 +103,39 @@ class target_options {
     /// @endcond
 };
 
+class coordinator_options {
+  public:
+    /// Create an empty set of options.
+    PN_CPP_EXTERN coordinator_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN coordinator_options(const coordinator_options&);
+
+    PN_CPP_EXTERN ~coordinator_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&);
+
+    /// Set the address for the coordinator.  It is unset by default.  The
+    /// address is ignored if dynamic() is true.
+    PN_CPP_EXTERN coordinator_options& address(const std::string& addr);
+
+    /// **Unsettled API** Extension capabilities that are supported/requested
+    PN_CPP_EXTERN coordinator_options& capabilities(const 
std::vector<symbol>&);
+
+  private:
+    void apply(coordinator&) const;
+
+    class impl;
+    std::unique_ptr<impl> impl_;
+
+    /// @cond INTERNAL
+  friend class coordinator;
+  friend class sender_options;
+  friend class receiver_options;
+    /// @endcond
+};

Review Comment:
   Again, probably an internal subclass of `target` or not needed at all 
because it's represented by a simple boolean in target.



##########
cpp/src/sender_options.cpp:
##########
@@ -118,6 +123,7 @@ sender_options& 
sender_options::delivery_mode(proton::delivery_mode m) {impl_->d
 sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; 
return *this; }
 sender_options& sender_options::source(const source_options &s) {impl_->source 
= s; return *this; }
 sender_options& sender_options::target(const target_options &s) {impl_->target 
= s; return *this; }
+sender_options& sender_options::coordinator(const coordinator_options &s) 
{impl_->coordinator = s; return *this; }

Review Comment:
   Ditto





> [c++] Support for transactions
> ------------------------------
>
>                 Key: PROTON-1442
>                 URL: https://issues.apache.org/jira/browse/PROTON-1442
>             Project: Qpid Proton
>          Issue Type: Improvement
>          Components: cpp-binding
>            Reporter: Radim Kubis
>            Assignee: Rakhi Kumari
>            Priority: Major
>
> Support for transactions in Qpid Proton C++.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org


Reply via email to