[ 
https://issues.apache.org/jira/browse/PROTON-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17947062#comment-17947062
 ] 

ASF GitHub Bot commented on PROTON-1442:
----------------------------------------

astitcher commented on code in PR #437:
URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2058501754


##########
cpp/include/proton/session.hpp:
##########
@@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public 
internal::object<pn_session_t>, public endp
     /// Get user data from this session.
     PN_CPP_EXTERN void* user_data() const;
 
+    PN_CPP_EXTERN void declare_transaction(proton::transaction_handler 
&handler, bool settle_before_discharge = false);
+
+    PN_CPP_EXTERN bool txn_is_empty();
+    PN_CPP_EXTERN bool txn_is_declared();
+    PN_CPP_EXTERN void txn_commit();
+    PN_CPP_EXTERN void txn_abort();
+    PN_CPP_EXTERN void txn_declare();
+    PN_CPP_EXTERN void txn_handle_outcome(proton::tracker);
+    PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message 
msg);
+    PN_CPP_EXTERN void txn_accept(delivery &t);
+    PN_CPP_EXTERN proton::connection txn_connection() const;
+
     /// @cond INTERNAL
   friend class internal::factory<session>;
   friend class session_iterator;
+  friend class transaction_impl;
     /// @endcond
+
+    private:
+    // clean up txn internally
+    void txn_delete();

Review Comment:
   I'd be very surprised if this is needed here - I'd think it could only be 
called from the impl class



##########
cpp/include/proton/session.hpp:
##########
@@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public 
internal::object<pn_session_t>, public endp
     /// Get user data from this session.
     PN_CPP_EXTERN void* user_data() const;
 
+    PN_CPP_EXTERN void declare_transaction(proton::transaction_handler 
&handler, bool settle_before_discharge = false);
+
+    PN_CPP_EXTERN bool txn_is_empty();
+    PN_CPP_EXTERN bool txn_is_declared();
+    PN_CPP_EXTERN void txn_commit();
+    PN_CPP_EXTERN void txn_abort();
+    PN_CPP_EXTERN void txn_declare();
+    PN_CPP_EXTERN void txn_handle_outcome(proton::tracker);
+    PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message 
msg);
+    PN_CPP_EXTERN void txn_accept(delivery &t);
+    PN_CPP_EXTERN proton::connection txn_connection() const;

Review Comment:
   Aren't these just internal functions? They should only be needed inside 
session_impl



##########
cpp/include/proton/session.hpp:
##########
@@ -36,6 +36,7 @@
 struct pn_session_t;
 
 namespace proton {
+  class transaction_impl;

Review Comment:
   Is this actually needed here?



##########
cpp/src/messaging_adapter.cpp:
##########
@@ -274,11 +299,13 @@ void on_link_local_open(messaging_handler& handler, 
pn_event_t* event) {
 
 void on_link_remote_open(messaging_handler& handler, pn_event_t* event) {
     auto lnk = pn_event_link(event);
-    // Currently don't implement (transaction) coordinator
-    if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
-      auto error = pn_link_condition(lnk);
-      pn_condition_set_name(error, "amqp:not-implemented");
-      pn_link_close(lnk);
+    if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) {
+      auto cond = pn_link_condition(lnk);
+      if (pn_condition_is_set(cond)) {
+            pn_condition_set_name(cond, "amqp:on_link_remote_open:FAILED");
+            pn_link_close(lnk);
+            return;
+        }

Review Comment:
   I think this change might be wrong - the C++ binding still doesn't support 
an incoming coordinator target. 
   [Even though we do now use an outgoing transaction coordinator target]



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,231 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+
+
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::transaction_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+    proton::tracker send(proton::sender s, proton::message msg);
+
+    void discharge(bool failed);
+    void release_pending();
+    void accept(delivery &d);
+    void update(tracker &d, uint64_t state);
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::transaction_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+void session::declare_transaction(proton::transaction_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!t.session().txn_is_empty()) {
+                    t.session().txn_handle_outcome(t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.mark_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+void session::txn_delete() { auto &_txn_impl = 
session_context::get(pn_object())._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+void session::txn_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::txn_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+void session::txn_declare() { 
session_context::get(pn_object())._txn_impl->declare(); }
+bool session::txn_is_empty() { return 
session_context::get(pn_object())._txn_impl == NULL; }
+bool session::txn_is_declared() { return (!txn_is_empty()) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+void session::txn_accept(delivery &t) { return 
session_context::get(pn_object())._txn_impl->accept(t); }
+proton::tracker session::txn_send(proton::sender s, proton::message msg) {
+    return session_context::get(pn_object())._txn_impl->send(s, msg);
+}
+void session::txn_handle_outcome(proton::tracker t) {
+    session_context::get(pn_object())._txn_impl->handle_outcome(t);
+}
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::transaction_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+proton::tracker transaction_impl::send(proton::sender s, proton::message msg) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared transaction can send a message");
+    proton::tracker tracker = s.send(msg);
+    update(tracker, 0x34);
+    return tracker;
+}
+
+void transaction_impl::accept(delivery &t) {
+    t.settle();

Review Comment:
   Needs update(t, PN_ACCEPTED);



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,231 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+
+
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::transaction_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+    proton::tracker send(proton::sender s, proton::message msg);
+
+    void discharge(bool failed);
+    void release_pending();
+    void accept(delivery &d);
+    void update(tracker &d, uint64_t state);
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::transaction_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+void session::declare_transaction(proton::transaction_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!t.session().txn_is_empty()) {
+                    t.session().txn_handle_outcome(t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.mark_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+void session::txn_delete() { auto &_txn_impl = 
session_context::get(pn_object())._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+void session::txn_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::txn_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+void session::txn_declare() { 
session_context::get(pn_object())._txn_impl->declare(); }
+bool session::txn_is_empty() { return 
session_context::get(pn_object())._txn_impl == NULL; }
+bool session::txn_is_declared() { return (!txn_is_empty()) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+void session::txn_accept(delivery &t) { return 
session_context::get(pn_object())._txn_impl->accept(t); }
+proton::tracker session::txn_send(proton::sender s, proton::message msg) {
+    return session_context::get(pn_object())._txn_impl->send(s, msg);
+}
+void session::txn_handle_outcome(proton::tracker t) {
+    session_context::get(pn_object())._txn_impl->handle_outcome(t);
+}
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::transaction_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+proton::tracker transaction_impl::send(proton::sender s, proton::message msg) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared transaction can send a message");
+    proton::tracker tracker = s.send(msg);
+    update(tracker, 0x34);
+    return tracker;
+}
+
+void transaction_impl::accept(delivery &t) {
+    t.settle();
+}
+
+void transaction_impl::update(tracker &t, uint64_t state) {

Review Comment:
   I think this should be delivery not tracker



##########
cpp/include/proton/session.hpp:
##########
@@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public 
internal::object<pn_session_t>, public endp
     /// Get user data from this session.
     PN_CPP_EXTERN void* user_data() const;
 
+    PN_CPP_EXTERN void declare_transaction(proton::transaction_handler 
&handler, bool settle_before_discharge = false);
+
+    PN_CPP_EXTERN bool txn_is_empty();
+    PN_CPP_EXTERN bool txn_is_declared();
+    PN_CPP_EXTERN void txn_commit();
+    PN_CPP_EXTERN void txn_abort();
+    PN_CPP_EXTERN void txn_declare();
+    PN_CPP_EXTERN void txn_handle_outcome(proton::tracker);
+    PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message 
msg);
+    PN_CPP_EXTERN void txn_accept(delivery &t);
+    PN_CPP_EXTERN proton::connection txn_connection() const;
+
     /// @cond INTERNAL
   friend class internal::factory<session>;
   friend class session_iterator;
+  friend class transaction_impl;

Review Comment:
   I'd think it's more likely that transaction_impl needs to be a friend of 
session_impl



##########
cpp/include/proton/target_options.hpp:
##########
@@ -60,6 +60,10 @@ class target_options {
     /// address is ignored if dynamic() is true.
     PN_CPP_EXTERN target_options& address(const std::string& addr);
 
+    /// Set the target be of type coordinator.
+    /// This immediately override the currently assigned type.
+    PN_CPP_EXTERN target_options& mark_coordinator();

Review Comment:
   Need a better name: set_coordinator? make_coordinator? It's possible this 
doesn't need to be a user-visible option.



##########
cpp/src/proactor_container_impl.cpp:
##########
@@ -26,6 +26,7 @@
 #include "proton/listener.hpp"
 #include "proton/reconnect_options.hpp"
 #include "proton/ssl.hpp"
+#include "proton/target_options.hpp"

Review Comment:
   Change not needed



##########
cpp/src/messaging_adapter.cpp:
##########
@@ -116,7 +119,29 @@ void on_delivery(messaging_handler& handler, pn_event_t* 
event) {
     link_context& lctx = link_context::get(lnk);
     Tracing& ot = Tracing::getTracing();
 
-    if (pn_link_is_receiver(lnk)) {
+    if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) {
+        if (pn_delivery_updated(dlv)) {
+            tracker t(make_wrapper<tracker>(dlv));
+            ot.on_settled_span(t);
+            switch (pn_delivery_remote_state(dlv)) {
+            case PN_ACCEPTED:
+                handler.on_tracker_accept(t);
+                break;
+            case PN_REJECTED:
+                handler.on_tracker_reject(t);
+                break;
+            case PN_RELEASED:
+            case PN_MODIFIED:
+                handler.on_tracker_release(t);
+                break;
+            }
+            if (t.settled()) {
+                handler.on_tracker_settle(t);
+                if (lctx.auto_settle)
+                    t.settle();
+            }
+        }
+    } else if (pn_link_is_receiver(lnk)) {

Review Comment:
   I don't understand why the handling of messages on a transaction coordinator 
link should be different from any other link.



##########
cpp/src/proactor_container_impl.cpp:
##########
@@ -34,11 +35,15 @@
 #include "proton/proactor.h"
 #include "proton/transport.h"
 
+#include "proton/delivery.h"
+
 #include "contexts.hpp"
 #include "messaging_adapter.hpp"
 #include "reconnect_options_impl.hpp"
 #include "proton_bits.hpp"
 
+#include <proton/types.hpp>
+

Review Comment:
   Change not needed



##########
cpp/src/proactor_container_impl.cpp:
##########
@@ -34,11 +35,15 @@
 #include "proton/proactor.h"
 #include "proton/transport.h"
 
+#include "proton/delivery.h"
+

Review Comment:
   Change not needed



##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction_handler.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <atomic>
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+  private:
+    proton::sender sender;
+    std::string url;
+    int total;
+    int batch_size;
+    int sent;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+
+  public:
+    tx_send(const std::string &s, int c, int b):
+        url(s), total(c), batch_size(b), sent(0) {}
+
+    void on_container_start(proton::container &c) override {
+        sender = c.open_sender(url);
+    }
+
+    void on_session_open(proton::session &s) override {
+        std::cout << "New session is open, declaring transaction now..." << 
std::endl;
+        s.declare_transaction(*this);
+    }
+
+    void on_transaction_declare_failed(proton::session s) {
+        std::cout << "Transaction declarion failed" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_commit_failed(proton::session s) {
+        std::cout << "Transaction commit failed!" << std::endl;
+        s.connection().close();
+        exit(-1);
+    }
+
+    void on_transaction_declared(proton::session s) override {
+        std::cout << "Transaction is declared" << std::endl;
+        send();
+    }
+
+    void on_sendable(proton::sender&) override {
+        send();
+    }
+
+    void send() {
+        std::atomic<int> unique_id(10000);
+        proton::session session = sender.session();
+        while (session.txn_is_declared() && sender.credit() &&
+               (committed + current_batch) < total) {
+            proton::message msg;
+            std::map<std::string, int> m;
+            m["sequence"] = committed + current_batch;
+
+            msg.id(std::atomic_fetch_add(&unique_id, 1));
+            msg.body(m);
+            std::cout << "Sending: " << msg << std::endl;
+            session.txn_send(sender, msg);

Review Comment:
   I think the API we were aiming for would just use a sender object created 
from the transactional session.





> [c++] Support for transactions
> ------------------------------
>
>                 Key: PROTON-1442
>                 URL: https://issues.apache.org/jira/browse/PROTON-1442
>             Project: Qpid Proton
>          Issue Type: Improvement
>          Components: cpp-binding
>            Reporter: Radim Kubis
>            Assignee: Rakhi Kumari
>            Priority: Major
>
> Support for transactions in Qpid Proton C++.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to