[ https://issues.apache.org/jira/browse/PROTON-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17947062#comment-17947062 ]
ASF GitHub Bot commented on PROTON-1442: ---------------------------------------- astitcher commented on code in PR #437: URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2058501754 ########## cpp/include/proton/session.hpp: ########## @@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// Get user data from this session. PN_CPP_EXTERN void* user_data() const; + PN_CPP_EXTERN void declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false); + + PN_CPP_EXTERN bool txn_is_empty(); + PN_CPP_EXTERN bool txn_is_declared(); + PN_CPP_EXTERN void txn_commit(); + PN_CPP_EXTERN void txn_abort(); + PN_CPP_EXTERN void txn_declare(); + PN_CPP_EXTERN void txn_handle_outcome(proton::tracker); + PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message msg); + PN_CPP_EXTERN void txn_accept(delivery &t); + PN_CPP_EXTERN proton::connection txn_connection() const; + /// @cond INTERNAL friend class internal::factory<session>; friend class session_iterator; + friend class transaction_impl; /// @endcond + + private: + // clean up txn internally + void txn_delete(); Review Comment: I'd be very surprised if this is needed here - I'd think it could only be called from the impl class ########## cpp/include/proton/session.hpp: ########## @@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// Get user data from this session. 
PN_CPP_EXTERN void* user_data() const; + PN_CPP_EXTERN void declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false); + + PN_CPP_EXTERN bool txn_is_empty(); + PN_CPP_EXTERN bool txn_is_declared(); + PN_CPP_EXTERN void txn_commit(); + PN_CPP_EXTERN void txn_abort(); + PN_CPP_EXTERN void txn_declare(); + PN_CPP_EXTERN void txn_handle_outcome(proton::tracker); + PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message msg); + PN_CPP_EXTERN void txn_accept(delivery &t); + PN_CPP_EXTERN proton::connection txn_connection() const; Review Comment: Aren't these just internal functions? They should only be needed inside session_impl ########## cpp/include/proton/session.hpp: ########## @@ -36,6 +36,7 @@ struct pn_session_t; namespace proton { + class transaction_impl; Review Comment: Is this actually needed here? ########## cpp/src/messaging_adapter.cpp: ########## @@ -274,11 +299,13 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) { void on_link_remote_open(messaging_handler& handler, pn_event_t* event) { auto lnk = pn_event_link(event); - // Currently don't implement (transaction) coordinator - if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) { - auto error = pn_link_condition(lnk); - pn_condition_set_name(error, "amqp:not-implemented"); - pn_link_close(lnk); + if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) { + auto cond = pn_link_condition(lnk); + if (pn_condition_is_set(cond)) { + pn_condition_set_name(cond, "amqp:on_link_remote_open:FAILED"); + pn_link_close(lnk); + return; + } Review Comment: I think this change might be wrong - the C++ binding still doesn't support an incoming coordinator target. 
[Even though we do now use an outgoing transaction coordinator target] ########## cpp/src/session.cpp: ########## @@ -148,4 +158,231 @@ void* session::user_data() const { return sctx.user_data_; } + + +class transaction_impl { + public: + proton::sender txn_ctrl; + proton::transaction_handler *handler = nullptr; + proton::binary transaction_id; + bool failed = false; + enum State { + FREE, + DECLARING, + DECLARED, + DISCHARGING, + }; + enum State state = State::FREE; + std::vector<proton::tracker> pending; + + void commit(); + void abort(); + void declare(); + proton::tracker send(proton::sender s, proton::message msg); + + void discharge(bool failed); + void release_pending(); + void accept(delivery &d); + void update(tracker &d, uint64_t state); + + proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); + void handle_outcome(proton::tracker t); + transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge); + ~transaction_impl(); +}; + +void session::declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge) { + auto &txn_impl = session_context::get(pn_object())._txn_impl; + if (txn_impl == nullptr) { + // Create _txn_impl + proton::connection conn = this->connection(); + class InternalTransactionHandler : public proton::messaging_handler { + + void on_tracker_settle(proton::tracker &t) override { + if (!t.session().txn_is_empty()) { + t.session().txn_handle_outcome(t); + } + } + }; + + proton::target_options opts; + std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")}; + opts.capabilities(cap); + opts.mark_coordinator(); + + proton::sender_options so; + so.name("txn-ctrl"); + so.target(opts); + + static InternalTransactionHandler internal_handler; // internal_handler going out of scope. 
Fix it + so.handler(internal_handler); + + static proton::sender s = conn.open_sender("does not matter", so); + + settle_before_discharge = false; + + txn_impl = new transaction_impl(s, handler, settle_before_discharge); + } + // Declare txn + txn_impl->declare(); +} + +void session::txn_delete() { auto &_txn_impl = session_context::get(pn_object())._txn_impl; delete _txn_impl; _txn_impl = nullptr;} +void session::txn_commit() { session_context::get(pn_object())._txn_impl->commit(); } +void session::txn_abort() { session_context::get(pn_object())._txn_impl->abort(); } +void session::txn_declare() { session_context::get(pn_object())._txn_impl->declare(); } +bool session::txn_is_empty() { return session_context::get(pn_object())._txn_impl == NULL; } +bool session::txn_is_declared() { return (!txn_is_empty()) && session_context::get(pn_object())._txn_impl->state == transaction_impl::State::DECLARED; } +void session::txn_accept(delivery &t) { return session_context::get(pn_object())._txn_impl->accept(t); } +proton::tracker session::txn_send(proton::sender s, proton::message msg) { + return session_context::get(pn_object())._txn_impl->send(s, msg); +} +void session::txn_handle_outcome(proton::tracker t) { + session_context::get(pn_object())._txn_impl->handle_outcome(t); +} + +transaction_impl::transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge) + : txn_ctrl(_txn_ctrl), handler(&_handler) { +} +transaction_impl::~transaction_impl() {} + +void transaction_impl::commit() { + discharge(false); +} + +void transaction_impl::abort() { + discharge(true); +} + +void transaction_impl::declare() { + if (state != transaction_impl::State::FREE) + throw proton::error("This session has some associcated transaction already"); + state = State::DECLARING; + + proton::symbol descriptor("amqp:declare:list"); + std::list<proton::value> vd; + proton::value i_am_null; + vd.push_back(i_am_null); + proton::value _value = vd; + 
send_ctrl(descriptor, _value); +} + +void transaction_impl::discharge(bool _failed) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared txn can be discharged."); + state = State::DISCHARGING; + + failed = _failed; + proton::symbol descriptor("amqp:discharge:list"); + std::list<proton::value> vd; + vd.push_back(transaction_id); + vd.push_back(failed); + proton::value _value = vd; + send_ctrl(descriptor, _value); +} + +proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::value _value) { + proton::value msg_value; + proton::codec::encoder enc(msg_value); + enc << proton::codec::start::described() + << descriptor + << _value + << proton::codec::finish(); + + + proton::message msg = msg_value; + proton::tracker delivery = txn_ctrl.send(msg); + return delivery; +} + +proton::tracker transaction_impl::send(proton::sender s, proton::message msg) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared transaction can send a message"); + proton::tracker tracker = s.send(msg); + update(tracker, 0x34); + return tracker; +} + +void transaction_impl::accept(delivery &t) { + t.settle(); Review Comment: Needs update(t, PN_ACCEPT); ########## cpp/src/session.cpp: ########## @@ -148,4 +158,231 @@ void* session::user_data() const { return sctx.user_data_; } + + +class transaction_impl { + public: + proton::sender txn_ctrl; + proton::transaction_handler *handler = nullptr; + proton::binary transaction_id; + bool failed = false; + enum State { + FREE, + DECLARING, + DECLARED, + DISCHARGING, + }; + enum State state = State::FREE; + std::vector<proton::tracker> pending; + + void commit(); + void abort(); + void declare(); + proton::tracker send(proton::sender s, proton::message msg); + + void discharge(bool failed); + void release_pending(); + void accept(delivery &d); + void update(tracker &d, uint64_t state); + + proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); 
+ void handle_outcome(proton::tracker t); + transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge); + ~transaction_impl(); +}; + +void session::declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge) { + auto &txn_impl = session_context::get(pn_object())._txn_impl; + if (txn_impl == nullptr) { + // Create _txn_impl + proton::connection conn = this->connection(); + class InternalTransactionHandler : public proton::messaging_handler { + + void on_tracker_settle(proton::tracker &t) override { + if (!t.session().txn_is_empty()) { + t.session().txn_handle_outcome(t); + } + } + }; + + proton::target_options opts; + std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")}; + opts.capabilities(cap); + opts.mark_coordinator(); + + proton::sender_options so; + so.name("txn-ctrl"); + so.target(opts); + + static InternalTransactionHandler internal_handler; // internal_handler going out of scope. 
Fix it + so.handler(internal_handler); + + static proton::sender s = conn.open_sender("does not matter", so); + + settle_before_discharge = false; + + txn_impl = new transaction_impl(s, handler, settle_before_discharge); + } + // Declare txn + txn_impl->declare(); +} + +void session::txn_delete() { auto &_txn_impl = session_context::get(pn_object())._txn_impl; delete _txn_impl; _txn_impl = nullptr;} +void session::txn_commit() { session_context::get(pn_object())._txn_impl->commit(); } +void session::txn_abort() { session_context::get(pn_object())._txn_impl->abort(); } +void session::txn_declare() { session_context::get(pn_object())._txn_impl->declare(); } +bool session::txn_is_empty() { return session_context::get(pn_object())._txn_impl == NULL; } +bool session::txn_is_declared() { return (!txn_is_empty()) && session_context::get(pn_object())._txn_impl->state == transaction_impl::State::DECLARED; } +void session::txn_accept(delivery &t) { return session_context::get(pn_object())._txn_impl->accept(t); } +proton::tracker session::txn_send(proton::sender s, proton::message msg) { + return session_context::get(pn_object())._txn_impl->send(s, msg); +} +void session::txn_handle_outcome(proton::tracker t) { + session_context::get(pn_object())._txn_impl->handle_outcome(t); +} + +transaction_impl::transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge) + : txn_ctrl(_txn_ctrl), handler(&_handler) { +} +transaction_impl::~transaction_impl() {} + +void transaction_impl::commit() { + discharge(false); +} + +void transaction_impl::abort() { + discharge(true); +} + +void transaction_impl::declare() { + if (state != transaction_impl::State::FREE) + throw proton::error("This session has some associcated transaction already"); + state = State::DECLARING; + + proton::symbol descriptor("amqp:declare:list"); + std::list<proton::value> vd; + proton::value i_am_null; + vd.push_back(i_am_null); + proton::value _value = vd; + 
send_ctrl(descriptor, _value); +} + +void transaction_impl::discharge(bool _failed) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared txn can be discharged."); + state = State::DISCHARGING; + + failed = _failed; + proton::symbol descriptor("amqp:discharge:list"); + std::list<proton::value> vd; + vd.push_back(transaction_id); + vd.push_back(failed); + proton::value _value = vd; + send_ctrl(descriptor, _value); +} + +proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::value _value) { + proton::value msg_value; + proton::codec::encoder enc(msg_value); + enc << proton::codec::start::described() + << descriptor + << _value + << proton::codec::finish(); + + + proton::message msg = msg_value; + proton::tracker delivery = txn_ctrl.send(msg); + return delivery; +} + +proton::tracker transaction_impl::send(proton::sender s, proton::message msg) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared transaction can send a message"); + proton::tracker tracker = s.send(msg); + update(tracker, 0x34); + return tracker; +} + +void transaction_impl::accept(delivery &t) { + t.settle(); +} + +void transaction_impl::update(tracker &t, uint64_t state) { Review Comment: I think this should be delivery not tracker ########## cpp/include/proton/session.hpp: ########## @@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// Get user data from this session. 
PN_CPP_EXTERN void* user_data() const; + PN_CPP_EXTERN void declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false); + + PN_CPP_EXTERN bool txn_is_empty(); + PN_CPP_EXTERN bool txn_is_declared(); + PN_CPP_EXTERN void txn_commit(); + PN_CPP_EXTERN void txn_abort(); + PN_CPP_EXTERN void txn_declare(); + PN_CPP_EXTERN void txn_handle_outcome(proton::tracker); + PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message msg); + PN_CPP_EXTERN void txn_accept(delivery &t); + PN_CPP_EXTERN proton::connection txn_connection() const; + /// @cond INTERNAL friend class internal::factory<session>; friend class session_iterator; + friend class transaction_impl; Review Comment: I'd think it's more likely that transaction_impl needs to be a friend of session_impl ########## cpp/include/proton/target_options.hpp: ########## @@ -60,6 +60,10 @@ class target_options { /// address is ignored if dynamic() is true. PN_CPP_EXTERN target_options& address(const std::string& addr); + /// Set the target be of type coordinator. + /// This immediately override the currently assigned type. + PN_CPP_EXTERN target_options& mark_coordinator(); Review Comment: Need a better name: set_coordinator? make_coordinator? It's possible this doesn't need to be a user-visible option. 
########## cpp/src/proactor_container_impl.cpp: ########## @@ -26,6 +26,7 @@ #include "proton/listener.hpp" #include "proton/reconnect_options.hpp" #include "proton/ssl.hpp" +#include "proton/target_options.hpp" Review Comment: Change Not needed ########## cpp/src/messaging_adapter.cpp: ########## @@ -116,7 +119,29 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { link_context& lctx = link_context::get(lnk); Tracing& ot = Tracing::getTracing(); - if (pn_link_is_receiver(lnk)) { + if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) { + if (pn_delivery_updated(dlv)) { + tracker t(make_wrapper<tracker>(dlv)); + ot.on_settled_span(t); + switch (pn_delivery_remote_state(dlv)) { + case PN_ACCEPTED: + handler.on_tracker_accept(t); + break; + case PN_REJECTED: + handler.on_tracker_reject(t); + break; + case PN_RELEASED: + case PN_MODIFIED: + handler.on_tracker_release(t); + break; + } + if (t.settled()) { + handler.on_tracker_settle(t); + if (lctx.auto_settle) + t.settle(); + } + } + } else if (pn_link_is_receiver(lnk)) { Review Comment: I don't understand why the handling of messages on a transaction coordinator link should be different from any other link. ########## cpp/src/proactor_container_impl.cpp: ########## @@ -34,11 +35,15 @@ #include "proton/proactor.h" #include "proton/transport.h" +#include "proton/delivery.h" + #include "contexts.hpp" #include "messaging_adapter.hpp" #include "reconnect_options_impl.hpp" #include "proton_bits.hpp" +#include <proton/types.hpp> + Review Comment: Change not needed ########## cpp/src/proactor_container_impl.cpp: ########## @@ -34,11 +35,15 @@ #include "proton/proactor.h" #include "proton/transport.h" +#include "proton/delivery.h" + Review Comment: Change not needed ########## cpp/examples/tx_send.cpp: ########## @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction_handler.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <atomic> +#include <chrono> +#include <thread> + +class tx_send : public proton::messaging_handler, proton::transaction_handler { + private: + proton::sender sender; + std::string url; + int total; + int batch_size; + int sent; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + + public: + tx_send(const std::string &s, int c, int b): + url(s), total(c), batch_size(b), sent(0) {} + + void on_container_start(proton::container &c) override { + sender = c.open_sender(url); + } + + void on_session_open(proton::session &s) override { + std::cout << "New session is open, declaring transaction now..." << std::endl; + s.declare_transaction(*this); + } + + void on_transaction_declare_failed(proton::session s) { + std::cout << "Transaction declarion failed" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_commit_failed(proton::session s) { + std::cout << "Transaction commit failed!" 
<< std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_declared(proton::session s) override { + std::cout << "Transaction is declared" << std::endl; + send(); + } + + void on_sendable(proton::sender&) override { + send(); + } + + void send() { + std::atomic<int> unique_id(10000); + proton::session session = sender.session(); + while (session.txn_is_declared() && sender.credit() && + (committed + current_batch) < total) { + proton::message msg; + std::map<std::string, int> m; + m["sequence"] = committed + current_batch; + + msg.id(std::atomic_fetch_add(&unique_id, 1)); + msg.body(m); + std::cout << "Sending: " << msg << std::endl; + session.txn_send(sender, msg); Review Comment: I think the API we were aiming for would just use a sender object created from the transactioned session. > [c++] Support for transactions > ------------------------------ > > Key: PROTON-1442 > URL: https://issues.apache.org/jira/browse/PROTON-1442 > Project: Qpid Proton > Issue Type: Improvement > Components: cpp-binding > Reporter: Radim Kubis > Assignee: Rakhi Kumari > Priority: Major > > Support for transactions in Qpid Proton C++. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org