[ https://issues.apache.org/jira/browse/PROTON-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937227#comment-17937227 ]
ASF GitHub Bot commented on PROTON-1442: ---------------------------------------- astitcher commented on code in PR #437: URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2006440001 ########## cpp/examples/tx_recv.cpp: ########## @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <chrono> +#include <thread> + +class tx_recv : public proton::messaging_handler, proton::transaction_handler { + private: + proton::receiver receiver; + std::string url; + int expected; + int batch_size; + int current_batch = 0; + int committed = 0; + + proton::session session; + public: + tx_recv(const std::string &s, int c, int b): + url(s), expected(c), batch_size(b) {} + + void on_container_start(proton::container &c) override { + receiver = c.open_receiver(url); + } + + void on_session_open(proton::session &s) override { + session = s; + std::cout << " [on_session_open] declare_txn started..." << std::endl; + s.declare_transaction(*this); + std::cout << " [on_session_open] declare_txn ended..." << std::endl; + } + + void on_transaction_declare_failed(proton::session) {} + void on_transaction_commit_failed(proton::session s) { + std::cout << "Transaction Commit Failed" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_declared(proton::session s) override { + std::cout << "[on_transaction_declared] txn called " << (&s) + << std::endl; + receiver.add_credit(batch_size); + } + + void on_message(proton::delivery &d, proton::message &msg) override { + std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; + session.txn_accept(d); Review Comment: session == d.session() ########## cpp/examples/tx_recv.cpp: ########## @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <chrono> +#include <thread> + +class tx_recv : public proton::messaging_handler, proton::transaction_handler { + private: + proton::receiver receiver; + std::string url; + int expected; + int batch_size; + int current_batch = 0; + int committed = 0; + + proton::session session; + public: + tx_recv(const std::string &s, int c, int b): + url(s), expected(c), batch_size(b) {} + + void on_container_start(proton::container &c) override { + receiver = c.open_receiver(url); + } + + void on_session_open(proton::session &s) override { + session = s; Review Comment: Probably not necessary ########## cpp/include/proton/target_options.hpp: ########## @@ -101,6 +103,39 @@ class target_options { /// @endcond }; +class coordinator_options { + public: + /// Create an empty set of options. + PN_CPP_EXTERN coordinator_options(); + + /// Copy options. + PN_CPP_EXTERN coordinator_options(const coordinator_options&); + + PN_CPP_EXTERN ~coordinator_options(); + + /// Copy options. + PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&); + + /// Set the address for the coordinator. It is unset by default. The + /// address is ignored if dynamic() is true. + PN_CPP_EXTERN coordinator_options& address(const std::string& addr); + + /// **Unsettled API** Extension capabilities that are supported/requested + PN_CPP_EXTERN coordinator_options& capabilities(const std::vector<symbol>&); + + private: + void apply(coordinator&) const; + + class impl; + std::unique_ptr<impl> impl_; + + /// @cond INTERNAL + friend class coordinator; + friend class sender_options; + friend class receiver_options; + /// @endcond +}; Review Comment: As coordinator this should be in it's own header file - but again maybe it's not needed to be visible to the API user anyway. ########## cpp/include/proton/container.hpp: ########## @@ -326,6 +326,7 @@ class PN_CPP_CLASS_EXTERN container { friend class receiver_options; friend class sender_options; friend class work_queue; + friend class transaction; Review Comment: Don't like this as `transaction` is not a user visible class, so it shouldn't be mentioned in the API at all. ########## cpp/include/proton/session.hpp: ########## @@ -29,13 +29,15 @@ #include "./sender.hpp" #include <string> +#include <iostream> /// @file /// @copybrief proton::session struct pn_session_t; namespace proton { + class transaction_impl; Review Comment: Not sure you need this. I think the `friend class` declaration below is sufficient. ########## cpp/src/node_options.cpp: ########## @@ -162,6 +162,7 @@ class target_options::impl { option<enum target::expiry_policy> expiry_policy; option<std::vector<symbol> > capabilities; option<target::dynamic_property_map> dynamic_properties; + option<int> type; Review Comment: possibly replace this with `option<bool> is_coordinator;`? ########## cpp/include/proton/fwd.hpp: ########## @@ -52,9 +52,12 @@ class sender_options; class session; class session_options; class source_options; +class coordinator_options; Review Comment: Not sure this is really user visible - I don't think users currently need access to coordinator termini. ########## cpp/examples/tx_recv.cpp: ########## @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <chrono> +#include <thread> + +class tx_recv : public proton::messaging_handler, proton::transaction_handler { + private: + proton::receiver receiver; + std::string url; + int expected; + int batch_size; + int current_batch = 0; + int committed = 0; + + proton::session session; + public: + tx_recv(const std::string &s, int c, int b): + url(s), expected(c), batch_size(b) {} + + void on_container_start(proton::container &c) override { + receiver = c.open_receiver(url); + } + + void on_session_open(proton::session &s) override { + session = s; + std::cout << " [on_session_open] declare_txn started..." << std::endl; + s.declare_transaction(*this); + std::cout << " [on_session_open] declare_txn ended..." << std::endl; + } + + void on_transaction_declare_failed(proton::session) {} + void on_transaction_commit_failed(proton::session s) { + std::cout << "Transaction Commit Failed" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_declared(proton::session s) override { + std::cout << "[on_transaction_declared] txn called " << (&s) + << std::endl; + receiver.add_credit(batch_size); + } + + void on_message(proton::delivery &d, proton::message &msg) override { + std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; + session.txn_accept(d); + current_batch += 1; + if(current_batch == batch_size) { + } + } + + void on_transaction_committed(proton::session s) override { + committed += current_batch; + current_batch = 0; + std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl; + if(committed == expected) { + std::cout << "All messages committed" << std::endl; + s.connection().close(); + } + else { + session.declare_transaction(*this); Review Comment: session == s ########## cpp/include/proton/target.hpp: ########## @@ -65,6 +65,26 @@ class target : public terminus { /// @endcond }; +class coordinator : public terminus { + public: + /// Create an empty coordinator. + coordinator() = default; + + /// The address of the coordinator. + PN_CPP_EXTERN std::string address() const; + private: + coordinator(pn_terminus_t* t); + coordinator(const sender&); + coordinator(const receiver&); + + + /// @cond INTERNAL + friend class proton::internal::factory<coordinator>; + friend class sender; + friend class receiver; + /// @endcond +}; Review Comment: Shouldn't this be in a new `coordinator.hpp` header if it is used visible at all? I think it's likely though that this class currently doesn't need to be use visible anyway as it has no current use for a use of the API. ########## cpp/examples/tx_send.cpp: ########## @@ -0,0 +1,172 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <chrono> +#include <thread> + +class tx_send : public proton::messaging_handler, proton::transaction_handler { + private: + proton::sender sender; + std::string url; + int total; + int batch_size; + int sent; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + int confirmed = 0; + + proton::session session; Review Comment: As tx_recv: I don't think you need to store away the session. ########## cpp/examples/tx_send.cpp: ########## @@ -0,0 +1,172 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <chrono> +#include <thread> + +class tx_send : public proton::messaging_handler, proton::transaction_handler { + private: + proton::sender sender; + std::string url; + int total; + int batch_size; + int sent; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + int confirmed = 0; + + proton::session session; + + public: + tx_send(const std::string &s, int c, int b): + url(s), total(c), batch_size(b), sent(0) {} + + void on_container_start(proton::container &c) override { + sender = c.open_sender(url); + } + + void on_session_open(proton::session &s) override { + session = s; + std::cout << " [on_session_open] declare_txn started..." << std::endl; + s.declare_transaction(*this); + std::cout << " [on_session_open] declare_txn ended..." << std::endl; + } + + void on_transaction_declare_failed(proton::session) {} + void on_transaction_commit_failed(proton::session s) { + std::cout << "Transaction Commit Failed" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_declared(proton::session s) override { + std::cout << "[on_transaction_declared] Session: " << (&s) + << std::endl; + std::cout << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty()) + << "\t" << std::endl; + send(sender); + } + + void on_sendable(proton::sender &s) override { + std::cout << " [OnSendable] session: " << &session + << std::endl; + send(s); + } + + void send(proton::sender &s) { + static int unique_id = 10000; + while (session.txn_is_declared() && sender.credit() && + (committed + current_batch) < total) { + proton::message msg; + std::map<std::string, int> m; + m["sequence"] = committed + current_batch; + + msg.id(unique_id++); + msg.body(m); + std::cout << "[example] transaction send msg: " << msg + << std::endl; + session.txn_send(sender, msg); + current_batch += 1; + if(current_batch == batch_size) + { + std::cout << " >> Txn attempt commit" << std::endl; + if (batch_index % 2 == 0) { + session.txn_commit(); + } else { + session.txn_abort(); + } + batch_index++; + } + } + } + + void on_tracker_accept(proton::tracker &t) override { + confirmed += 1; + std::cout << " [example] on_tracker_accept:" << confirmed + << std::endl; + } + + void on_transaction_committed(proton::session s) override { + committed += current_batch; + current_batch = 0; + std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl; + if(committed == total) { + std::cout << "All messages committed" << std::endl; + s.connection().close(); + } + else { + std::cout << "redlcaring txn " << std::endl; + session.declare_transaction(*this); + } + } + + void on_transaction_aborted(proton::session s) override { + std::cout << "Meesages Aborted ....." << std::endl; Review Comment: typo ########## cpp/examples/tx_send.cpp: ########## @@ -0,0 +1,172 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <chrono> +#include <thread> + +class tx_send : public proton::messaging_handler, proton::transaction_handler { + private: + proton::sender sender; + std::string url; + int total; + int batch_size; + int sent; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + int confirmed = 0; + + proton::session session; + + public: + tx_send(const std::string &s, int c, int b): + url(s), total(c), batch_size(b), sent(0) {} + + void on_container_start(proton::container &c) override { + sender = c.open_sender(url); + } + + void on_session_open(proton::session &s) override { + session = s; + std::cout << " [on_session_open] declare_txn started..." << std::endl; + s.declare_transaction(*this); + std::cout << " [on_session_open] declare_txn ended..." << std::endl; + } + + void on_transaction_declare_failed(proton::session) {} + void on_transaction_commit_failed(proton::session s) { + std::cout << "Transaction Commit Failed" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_declared(proton::session s) override { + std::cout << "[on_transaction_declared] Session: " << (&s) + << std::endl; + std::cout << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty()) + << "\t" << std::endl; + send(sender); + } + + void on_sendable(proton::sender &s) override { + std::cout << " [OnSendable] session: " << &session + << std::endl; + send(s); + } + + void send(proton::sender &s) { + static int unique_id = 10000; + while (session.txn_is_declared() && sender.credit() && + (committed + current_batch) < total) { + proton::message msg; + std::map<std::string, int> m; + m["sequence"] = committed + current_batch; + + msg.id(unique_id++); + msg.body(m); + std::cout << "[example] transaction send msg: " << msg + << std::endl; + session.txn_send(sender, msg); + current_batch += 1; + if(current_batch == batch_size) + { + std::cout << " >> Txn attempt commit" << std::endl; + if (batch_index % 2 == 0) { + session.txn_commit(); + } else { + session.txn_abort(); + } + batch_index++; + } + } + } + + void on_tracker_accept(proton::tracker &t) override { + confirmed += 1; + std::cout << " [example] on_tracker_accept:" << confirmed + << std::endl; + } + + void on_transaction_committed(proton::session s) override { + committed += current_batch; + current_batch = 0; + std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl; + if(committed == total) { + std::cout << "All messages committed" << std::endl; + s.connection().close(); + } + else { + std::cout << "redlcaring txn " << std::endl; Review Comment: typo ########## cpp/include/proton/transaction.hpp: ########## @@ -0,0 +1,63 @@ +#ifndef PROTON_TRANSACTION_HPP +#define PROTON_TRANSACTION_HPP + + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "./fwd.hpp" +#include "./internal/export.hpp" +#include "./sender.hpp" +#include "./tracker.hpp" +#include "./container.hpp" + +/// @file +/// @copybrief proton::transaction + +namespace proton { + +class transaction_handler; Review Comment: Don't need a declaration immediately followed by its definition ########## cpp/examples/tx_recv.cpp: ########## @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <chrono> +#include <thread> + +class tx_recv : public proton::messaging_handler, proton::transaction_handler { + private: + proton::receiver receiver; + std::string url; + int expected; + int batch_size; + int current_batch = 0; + int committed = 0; + + proton::session session; Review Comment: Not sure we need to store the session anywhere - I think it can always be derived inside any callback ########## cpp/include/proton/transaction.hpp: ########## @@ -0,0 +1,63 @@ +#ifndef PROTON_TRANSACTION_HPP Review Comment: I think this file should be called `transaction_handler.hpp` as that is what it defines ########## cpp/include/proton/terminus.hpp: ########## @@ -122,6 +122,7 @@ class terminus { friend class internal::factory<terminus>; friend class source; friend class target; + friend class coordinator; Review Comment: I don't really like this especially if coordinator ends up being purely internal to the binding ########## cpp/examples/tx_send.cpp: ########## @@ -0,0 +1,172 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/types.hpp> +#include <proton/transaction.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include <chrono> +#include <thread> + +class tx_send : public proton::messaging_handler, proton::transaction_handler { + private: + proton::sender sender; + std::string url; + int total; + int batch_size; + int sent; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + int confirmed = 0; + + proton::session session; + + public: + tx_send(const std::string &s, int c, int b): + url(s), total(c), batch_size(b), sent(0) {} + + void on_container_start(proton::container &c) override { + sender = c.open_sender(url); + } + + void on_session_open(proton::session &s) override { + session = s; + std::cout << " [on_session_open] declare_txn started..." << std::endl; + s.declare_transaction(*this); + std::cout << " [on_session_open] declare_txn ended..." << std::endl; + } + + void on_transaction_declare_failed(proton::session) {} + void on_transaction_commit_failed(proton::session s) { + std::cout << "Transaction Commit Failed" << std::endl; + s.connection().close(); + exit(-1); + } + + void on_transaction_declared(proton::session s) override { + std::cout << "[on_transaction_declared] Session: " << (&s) + << std::endl; + std::cout << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty()) + << "\t" << std::endl; + send(sender); + } + + void on_sendable(proton::sender &s) override { + std::cout << " [OnSendable] session: " << &session + << std::endl; + send(s); + } + + void send(proton::sender &s) { + static int unique_id = 10000; Review Comment: I think using `static` like this is bad practice in example code. It won't be thread safe if this code is copied into user code which uses multiple sending connections. Since we've got C++17 you could use `std::atomic_int` and `std::atomic_fetch_add` ########## cpp/include/proton/fwd.hpp: ########## @@ -52,9 +52,12 @@ class sender_options; class session; class session_options; class source_options; +class coordinator_options; class ssl; class target_options; class tracker; +class transaction; Review Comment: There is no user visible transaaction class, so this can't be necessary. ########## cpp/include/proton/target.hpp: ########## @@ -65,6 +65,26 @@ class target : public terminus { /// @endcond }; +class coordinator : public terminus { + public: + /// Create an empty coordinator. + coordinator() = default; + + /// The address of the coordinator. + PN_CPP_EXTERN std::string address() const; + private: + coordinator(pn_terminus_t* t); + coordinator(const sender&); + coordinator(const receiver&); + + + /// @cond INTERNAL + friend class proton::internal::factory<coordinator>; + friend class sender; + friend class receiver; + /// @endcond +}; Review Comment: As I said above I'm coming to think this might actually be just an internal subclass or `target` or just represented as a transaction_coordinator boolean on a target, set by a similarly named target_option. ########## cpp/src/proton_bits.hpp: ########## @@ -98,11 +102,15 @@ template <> struct wrapped<sender> { typedef pn_link_t type; }; template <> struct wrapped<receiver> { typedef pn_link_t type; }; template <> struct wrapped<transfer> { typedef pn_delivery_t type; }; template <> struct wrapped<tracker> { typedef pn_delivery_t type; }; +template <> struct wrapped<disposition> { + typedef pn_disposition_t type; +}; Review Comment: Inconsistent formatting ########## cpp/include/proton/sender_options.hpp: ########## @@ -94,6 +94,9 @@ class sender_options { /// Options for the receiver node of the receiver. PN_CPP_EXTERN sender_options& target(const target_options&); + /// Options for the coordinator node of the receiver. + PN_CPP_EXTERN sender_options& coordinator(const coordinator_options&); Review Comment: I've now come to think that I was wrong earlier when I suggested that `coordinator` is a peer class to `target` and `source`. Reading the spec more carefully, I think that a transaction coordinator is a special kind of target (that is how it's described in the spec) and could be modeled either as a subclass of` `target` or maybe just as a simple boolean `target_option` say `is_transaction_coordinator`. ########## cpp/src/node_options.cpp: ########## @@ -175,6 +176,9 @@ class target_options::impl { get(dynamic_properties.value, target_map); value(pn_terminus_properties(unwrap(t))) = target_map; } + if (type.set) { + pn_terminus_set_type(unwrap(t), pn_terminus_type_t(type.value)); + } Review Comment: use `is_coordinator`? ########## cpp/src/proton_bits.hpp: ########## @@ -111,6 +119,9 @@ template <> struct wrapper<pn_connection_t> { typedef connection type; }; template <> struct wrapper<pn_session_t> { typedef session type; }; template <> struct wrapper<pn_link_t> { typedef link type; }; template <> struct wrapper<pn_delivery_t> { typedef transfer type; }; +template <> struct wrapper<pn_disposition_t> { + typedef disposition type; +}; Review Comment: Inconsistent formatting ########## cpp/src/sender_options.cpp: ########## @@ -66,6 +66,7 @@ class sender_options::impl { option<bool> auto_settle; option<source_options> source; option<target_options> target; + option<coordinator_options> coordinator; Review Comment: Not needed if coordinator is a kind of target ########## cpp/src/sender_options.cpp: ########## @@ -82,6 +83,10 @@ class sender_options::impl { proton::target local_t(make_wrapper<proton::target>(pn_link_target(unwrap(s)))); target.value.apply(local_t); } + if (coordinator.set) { + proton::coordinator local_t(make_wrapper<proton::coordinator>(pn_link_target(unwrap(s)))); + coordinator.value.apply(local_t); + } Review Comment: Ditto ########## cpp/src/transaction.cpp: ########## @@ -0,0 +1,44 @@ +/* Review Comment: Call this file transaction_handler.cpp ########## cpp/include/proton/target_options.hpp: ########## @@ -101,6 +103,39 @@ class target_options { /// @endcond }; +class coordinator_options { + public: + /// Create an empty set of options. + PN_CPP_EXTERN coordinator_options(); + + /// Copy options. + PN_CPP_EXTERN coordinator_options(const coordinator_options&); + + PN_CPP_EXTERN ~coordinator_options(); + + /// Copy options. + PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&); + + /// Set the address for the coordinator. It is unset by default. The + /// address is ignored if dynamic() is true. + PN_CPP_EXTERN coordinator_options& address(const std::string& addr); + + /// **Unsettled API** Extension capabilities that are supported/requested + PN_CPP_EXTERN coordinator_options& capabilities(const std::vector<symbol>&); + + private: + void apply(coordinator&) const; + + class impl; + std::unique_ptr<impl> impl_; + + /// @cond INTERNAL + friend class coordinator; + friend class sender_options; + friend class receiver_options; + /// @endcond +}; Review Comment: Again, probably an internal subclass of `target` or not needed at all because it's represented by a simple boolean in target. ########## cpp/src/sender_options.cpp: ########## @@ -118,6 +123,7 @@ sender_options& sender_options::delivery_mode(proton::delivery_mode m) {impl_->d sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; } sender_options& sender_options::source(const source_options &s) {impl_->source = s; return *this; } sender_options& sender_options::target(const target_options &s) {impl_->target = s; return *this; } +sender_options& sender_options::coordinator(const coordinator_options &s) {impl_->coordinator = s; return *this; } Review Comment: Ditto > [c++] Support for transactions > ------------------------------ > > Key: PROTON-1442 > URL: https://issues.apache.org/jira/browse/PROTON-1442 > Project: Qpid Proton > Issue Type: Improvement > Components: cpp-binding > Reporter: Radim Kubis > Assignee: Rakhi Kumari > Priority: Major > > Support for transactions in Qpid Proton C++. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org