astitcher commented on code in PR #437: URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2058501754
########## cpp/include/proton/session.hpp: ########## @@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// Get user data from this session. PN_CPP_EXTERN void* user_data() const; + PN_CPP_EXTERN void declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false); + + PN_CPP_EXTERN bool txn_is_empty(); + PN_CPP_EXTERN bool txn_is_declared(); + PN_CPP_EXTERN void txn_commit(); + PN_CPP_EXTERN void txn_abort(); + PN_CPP_EXTERN void txn_declare(); + PN_CPP_EXTERN void txn_handle_outcome(proton::tracker); + PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message msg); + PN_CPP_EXTERN void txn_accept(delivery &t); + PN_CPP_EXTERN proton::connection txn_connection() const; + /// @cond INTERNAL friend class internal::factory<session>; friend class session_iterator; + friend class transaction_impl; /// @endcond + + private: + // clean up txn internally + void txn_delete(); Review Comment: I'd be very surprised if this is needed here - I'd think it could only be called from the impl class ########## cpp/include/proton/session.hpp: ########## @@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// Get user data from this session. PN_CPP_EXTERN void* user_data() const; + PN_CPP_EXTERN void declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false); + + PN_CPP_EXTERN bool txn_is_empty(); + PN_CPP_EXTERN bool txn_is_declared(); + PN_CPP_EXTERN void txn_commit(); + PN_CPP_EXTERN void txn_abort(); + PN_CPP_EXTERN void txn_declare(); + PN_CPP_EXTERN void txn_handle_outcome(proton::tracker); + PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message msg); + PN_CPP_EXTERN void txn_accept(delivery &t); + PN_CPP_EXTERN proton::connection txn_connection() const; Review Comment: Aren't these just internal functions? They should only be needed inside session_impl ########## cpp/include/proton/session.hpp: ########## @@ -36,6 +36,7 @@ struct pn_session_t; namespace proton { + class transaction_impl; Review Comment: Is this actually needed here? ########## cpp/src/messaging_adapter.cpp: ########## @@ -274,11 +299,13 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) { void on_link_remote_open(messaging_handler& handler, pn_event_t* event) { auto lnk = pn_event_link(event); - // Currently don't implement (transaction) coordinator - if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) { - auto error = pn_link_condition(lnk); - pn_condition_set_name(error, "amqp:not-implemented"); - pn_link_close(lnk); + if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) { + auto cond = pn_link_condition(lnk); + if (pn_condition_is_set(cond)) { + pn_condition_set_name(cond, "amqp:on_link_remote_open:FAILED"); + pn_link_close(lnk); + return; + } Review Comment: I think this change might be wrong - the C++ binding still doesn't support an incoming coordinator target. [Even though we do now use an outgoing transaction coordinator target] ########## cpp/src/session.cpp: ########## @@ -148,4 +158,231 @@ void* session::user_data() const { return sctx.user_data_; } + + +class transaction_impl { + public: + proton::sender txn_ctrl; + proton::transaction_handler *handler = nullptr; + proton::binary transaction_id; + bool failed = false; + enum State { + FREE, + DECLARING, + DECLARED, + DISCHARGING, + }; + enum State state = State::FREE; + std::vector<proton::tracker> pending; + + void commit(); + void abort(); + void declare(); + proton::tracker send(proton::sender s, proton::message msg); + + void discharge(bool failed); + void release_pending(); + void accept(delivery &d); + void update(tracker &d, uint64_t state); + + proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); + void handle_outcome(proton::tracker t); + transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge); + ~transaction_impl(); +}; + +void session::declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge) { + auto &txn_impl = session_context::get(pn_object())._txn_impl; + if (txn_impl == nullptr) { + // Create _txn_impl + proton::connection conn = this->connection(); + class InternalTransactionHandler : public proton::messaging_handler { + + void on_tracker_settle(proton::tracker &t) override { + if (!t.session().txn_is_empty()) { + t.session().txn_handle_outcome(t); + } + } + }; + + proton::target_options opts; + std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")}; + opts.capabilities(cap); + opts.mark_coordinator(); + + proton::sender_options so; + so.name("txn-ctrl"); + so.target(opts); + + static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it + so.handler(internal_handler); + + static proton::sender s = conn.open_sender("does not matter", so); + + settle_before_discharge = false; + + txn_impl = new transaction_impl(s, handler, settle_before_discharge); + } + // Declare txn + txn_impl->declare(); +} + +void session::txn_delete() { auto &_txn_impl = session_context::get(pn_object())._txn_impl; delete _txn_impl; _txn_impl = nullptr;} +void session::txn_commit() { session_context::get(pn_object())._txn_impl->commit(); } +void session::txn_abort() { session_context::get(pn_object())._txn_impl->abort(); } +void session::txn_declare() { session_context::get(pn_object())._txn_impl->declare(); } +bool session::txn_is_empty() { return session_context::get(pn_object())._txn_impl == NULL; } +bool session::txn_is_declared() { return (!txn_is_empty()) && session_context::get(pn_object())._txn_impl->state == transaction_impl::State::DECLARED; } +void session::txn_accept(delivery &t) { return session_context::get(pn_object())._txn_impl->accept(t); } +proton::tracker session::txn_send(proton::sender s, proton::message msg) { + return session_context::get(pn_object())._txn_impl->send(s, msg); +} +void session::txn_handle_outcome(proton::tracker t) { + session_context::get(pn_object())._txn_impl->handle_outcome(t); +} + +transaction_impl::transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge) + : txn_ctrl(_txn_ctrl), handler(&_handler) { +} +transaction_impl::~transaction_impl() {} + +void transaction_impl::commit() { + discharge(false); +} + +void transaction_impl::abort() { + discharge(true); +} + +void transaction_impl::declare() { + if (state != transaction_impl::State::FREE) + throw proton::error("This session has some associcated transaction already"); + state = State::DECLARING; + + proton::symbol descriptor("amqp:declare:list"); + std::list<proton::value> vd; + proton::value i_am_null; + vd.push_back(i_am_null); + proton::value _value = vd; + send_ctrl(descriptor, _value); +} + +void transaction_impl::discharge(bool _failed) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared txn can be discharged."); + state = State::DISCHARGING; + + failed = _failed; + proton::symbol descriptor("amqp:discharge:list"); + std::list<proton::value> vd; + vd.push_back(transaction_id); + vd.push_back(failed); + proton::value _value = vd; + send_ctrl(descriptor, _value); +} + +proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::value _value) { + proton::value msg_value; + proton::codec::encoder enc(msg_value); + enc << proton::codec::start::described() + << descriptor + << _value + << proton::codec::finish(); + + + proton::message msg = msg_value; + proton::tracker delivery = txn_ctrl.send(msg); + return delivery; +} + +proton::tracker transaction_impl::send(proton::sender s, proton::message msg) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared transaction can send a message"); + proton::tracker tracker = s.send(msg); + update(tracker, 0x34); + return tracker; +} + +void transaction_impl::accept(delivery &t) { + t.settle(); Review Comment: Needs update(t, PN_ACCEPT); ########## cpp/src/session.cpp: ########## @@ -148,4 +158,231 @@ void* session::user_data() const { return sctx.user_data_; } + + +class transaction_impl { + public: + proton::sender txn_ctrl; + proton::transaction_handler *handler = nullptr; + proton::binary transaction_id; + bool failed = false; + enum State { + FREE, + DECLARING, + DECLARED, + DISCHARGING, + }; + enum State state = State::FREE; + std::vector<proton::tracker> pending; + + void commit(); + void abort(); + void declare(); + proton::tracker send(proton::sender s, proton::message msg); + + void discharge(bool failed); + void release_pending(); + void accept(delivery &d); + void update(tracker &d, uint64_t state); + + proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value); + void handle_outcome(proton::tracker t); + transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge); + ~transaction_impl(); +}; + +void session::declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge) { + auto &txn_impl = session_context::get(pn_object())._txn_impl; + if (txn_impl == nullptr) { + // Create _txn_impl + proton::connection conn = this->connection(); + class InternalTransactionHandler : public proton::messaging_handler { + + void on_tracker_settle(proton::tracker &t) override { + if (!t.session().txn_is_empty()) { + t.session().txn_handle_outcome(t); + } + } + }; + + proton::target_options opts; + std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")}; + opts.capabilities(cap); + opts.mark_coordinator(); + + proton::sender_options so; + so.name("txn-ctrl"); + so.target(opts); + + static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it + so.handler(internal_handler); + + static proton::sender s = conn.open_sender("does not matter", so); + + settle_before_discharge = false; + + txn_impl = new transaction_impl(s, handler, settle_before_discharge); + } + // Declare txn + txn_impl->declare(); +} + +void session::txn_delete() { auto &_txn_impl = session_context::get(pn_object())._txn_impl; delete _txn_impl; _txn_impl = nullptr;} +void session::txn_commit() { session_context::get(pn_object())._txn_impl->commit(); } +void session::txn_abort() { session_context::get(pn_object())._txn_impl->abort(); } +void session::txn_declare() { session_context::get(pn_object())._txn_impl->declare(); } +bool session::txn_is_empty() { return session_context::get(pn_object())._txn_impl == NULL; } +bool session::txn_is_declared() { return (!txn_is_empty()) && session_context::get(pn_object())._txn_impl->state == transaction_impl::State::DECLARED; } +void session::txn_accept(delivery &t) { return session_context::get(pn_object())._txn_impl->accept(t); } +proton::tracker session::txn_send(proton::sender s, proton::message msg) { + return session_context::get(pn_object())._txn_impl->send(s, msg); +} +void session::txn_handle_outcome(proton::tracker t) { + session_context::get(pn_object())._txn_impl->handle_outcome(t); +} + +transaction_impl::transaction_impl(proton::sender &_txn_ctrl, + proton::transaction_handler &_handler, + bool _settle_before_discharge) + : txn_ctrl(_txn_ctrl), handler(&_handler) { +} +transaction_impl::~transaction_impl() {} + +void transaction_impl::commit() { + discharge(false); +} + +void transaction_impl::abort() { + discharge(true); +} + +void transaction_impl::declare() { + if (state != transaction_impl::State::FREE) + throw proton::error("This session has some associcated transaction already"); + state = State::DECLARING; + + proton::symbol descriptor("amqp:declare:list"); + std::list<proton::value> vd; + proton::value i_am_null; + vd.push_back(i_am_null); + proton::value _value = vd; + send_ctrl(descriptor, _value); +} + +void transaction_impl::discharge(bool _failed) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared txn can be discharged."); + state = State::DISCHARGING; + + failed = _failed; + proton::symbol descriptor("amqp:discharge:list"); + std::list<proton::value> vd; + vd.push_back(transaction_id); + vd.push_back(failed); + proton::value _value = vd; + send_ctrl(descriptor, _value); +} + +proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::value _value) { + proton::value msg_value; + proton::codec::encoder enc(msg_value); + enc << proton::codec::start::described() + << descriptor + << _value + << proton::codec::finish(); + + + proton::message msg = msg_value; + proton::tracker delivery = txn_ctrl.send(msg); + return delivery; +} + +proton::tracker transaction_impl::send(proton::sender s, proton::message msg) { + if (state != transaction_impl::State::DECLARED) + throw proton::error("Only a declared transaction can send a message"); + proton::tracker tracker = s.send(msg); + update(tracker, 0x34); + return tracker; +} + +void transaction_impl::accept(delivery &t) { + t.settle(); +} + +void transaction_impl::update(tracker &t, uint64_t state) { Review Comment: I think this should be delivery not tracker ########## cpp/include/proton/session.hpp: ########## @@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// Get user data from this session. PN_CPP_EXTERN void* user_data() const; + PN_CPP_EXTERN void declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false); + + PN_CPP_EXTERN bool txn_is_empty(); + PN_CPP_EXTERN bool txn_is_declared(); + PN_CPP_EXTERN void txn_commit(); + PN_CPP_EXTERN void txn_abort(); + PN_CPP_EXTERN void txn_declare(); + PN_CPP_EXTERN void txn_handle_outcome(proton::tracker); + PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message msg); + PN_CPP_EXTERN void txn_accept(delivery &t); + PN_CPP_EXTERN proton::connection txn_connection() const; + /// @cond INTERNAL friend class internal::factory<session>; friend class session_iterator; + friend class transaction_impl; Review Comment: I'd think it's more likely that transaction_impl needs to a friend of session_impl ########## cpp/include/proton/target_options.hpp: ########## @@ -60,6 +60,10 @@ class target_options { /// address is ignored if dynamic() is true. PN_CPP_EXTERN target_options& address(const std::string& addr); + /// Set the target be of type coordinator. + /// This immediately override the currently assigned type. + PN_CPP_EXTERN target_options& mark_coordinator(); Review Comment: Need a better name: set_coordinator? make_coordinator? It's possible this doesn't need to be a user visible option. ########## cpp/src/proactor_container_impl.cpp: ########## @@ -26,6 +26,7 @@ #include "proton/listener.hpp" #include "proton/reconnect_options.hpp" #include "proton/ssl.hpp" +#include "proton/target_options.hpp" Review Comment: Change Not needed ########## cpp/src/messaging_adapter.cpp: ########## @@ -116,7 +119,29 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { link_context& lctx = link_context::get(lnk); Tracing& ot = Tracing::getTracing(); - if (pn_link_is_receiver(lnk)) { + if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) { + if (pn_delivery_updated(dlv)) { + tracker t(make_wrapper<tracker>(dlv)); + ot.on_settled_span(t); + switch (pn_delivery_remote_state(dlv)) { + case PN_ACCEPTED: + handler.on_tracker_accept(t); + break; + case PN_REJECTED: + handler.on_tracker_reject(t); + break; + case PN_RELEASED: + case PN_MODIFIED: + handler.on_tracker_release(t); + break; + } + if (t.settled()) { + handler.on_tracker_settle(t); + if (lctx.auto_settle) + t.settle(); + } + } + } else if (pn_link_is_receiver(lnk)) { Review Comment: I don't understand why the handling of messages on a transaction coordinator link should be different from any other link. ########## cpp/src/proactor_container_impl.cpp: ########## @@ -34,11 +35,15 @@ #include "proton/proactor.h" #include "proton/transport.h" +#include "proton/delivery.h" + #include "contexts.hpp" #include "messaging_adapter.hpp" #include "reconnect_options_impl.hpp" #include "proton_bits.hpp" +#include <proton/types.hpp> + Review Comment: Change not needed ########## cpp/src/proactor_container_impl.cpp: ########## @@ -34,11 +35,15 @@ #include "proton/proactor.h" #include "proton/transport.h" +#include "proton/delivery.h" + Review Comment: Change not needed ########## cpp/examples/tx_send.cpp: ########## @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction_handler.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <atomic> +#include <chrono> +#include <thread> + +class tx_send : public proton::messaging_handler, proton::transaction_handler { + private: + proton::sender sender; + std::string url; + int total; + int batch_size; + int sent; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + + public: + tx_send(const std::string &s, int c, int b): + url(s), total(c), batch_size(b), sent(0) {} + + void on_container_start(proton::container &c) override { + sender = c.open_sender(url); + } + + void on_session_open(proton::session &s) override { + std::cout << "New session is open, declaring transaction now..." << std::endl; + s.declare_transaction(*this); + } + + void on_transaction_declare_failed(proton::session s) { + std::cout << "Transaction declarion failed" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_commit_failed(proton::session s) { + std::cout << "Transaction commit failed!" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_declared(proton::session s) override { + std::cout << "Transaction is declared" << std::endl; + send(); + } + + void on_sendable(proton::sender&) override { + send(); + } + + void send() { + std::atomic<int> unique_id(10000); + proton::session session = sender.session(); + while (session.txn_is_declared() && sender.credit() && + (committed + current_batch) < total) { + proton::message msg; + std::map<std::string, int> m; + m["sequence"] = committed + current_batch; + + msg.id(std::atomic_fetch_add(&unique_id, 1)); + msg.body(m); + std::cout << "Sending: " << msg << std::endl; + session.txn_send(sender, msg); Review Comment: I think the API we were aiming for would just use a sender object created from the transactioned session. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org