rajvarun77 commented on code in PR #3330:
URL: https://github.com/apache/brpc/pull/3330#discussion_r3384395487


##########
src/brpc/policy/mysql/mysql_common.h:
##########
@@ -0,0 +1,422 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Authors: Yang,Liming ([email protected])
+
+#ifndef BRPC_MYSQL_COMMON_H
+#define BRPC_MYSQL_COMMON_H
+
+#include <sstream>
+#include <map>
+#include "butil/logging.h"  // LOG()
+
+namespace brpc {
+// MySQL collations
+extern const char* MysqlDefaultCollation;
+extern const char* MysqlBinaryCollation;
+const std::map<std::string, uint8_t> MysqlCollations = {
+    {"big5_chinese_ci", 1},
+    {"latin2_czech_cs", 2},

Review Comment:
   Addressed in the current revision — `MysqlCollations` is declared `extern` 
in `mysql_common.h` and defined once in `mysql_common.cpp`, so there is no 
in-header definition / ODR issue.



##########
src/brpc/policy/mysql/mysql.cpp:
##########
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Authors: Yang,Liming ([email protected])
+
+#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
+#include <algorithm>
+#include <gflags/gflags.h>
+#include "butil/string_printf.h"
+#include "butil/macros.h"
+#include "brpc/controller.h"
+#include "brpc/policy/mysql/mysql.h"
+#include "brpc/policy/mysql/mysql_common.h"
+
+namespace brpc {
+
+// gflags definition; readable in code as FLAGS_mysql_multi_replies_size.
+// The description literal is kept on one line: split across lines it is an
+// unterminated string literal.
+DEFINE_int32(mysql_multi_replies_size, 10, "multi replies size in one MysqlResponse");
+
+// ===================================================================
+
+// Appends to `outbuf` every buffered long-data packet followed by the execute
+// packet for statement `stmt_id`, then resets the per-execution state held by
+// this stub.  Returns the first failing status; on failure the state that has
+// not been consumed yet is left untouched.
+butil::Status MysqlStatementStub::PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id) {
+    // Long-data packets are emitted before the execute packet.
+    for (const auto& pending : _long_data) {
+        butil::Status long_st =
+            MysqlMakeLongDataPacket(outbuf, stmt_id, pending.param_id, pending.long_data);
+        if (!long_st.ok()) {
+            LOG(ERROR) << "make long data header error " << long_st;
+            return long_st;
+        }
+    }
+    _long_data.clear();
+
+    butil::Status exec_st = MysqlMakeExecutePacket(outbuf, stmt_id, _execute_data);
+    if (!exec_st.ok()) {
+        LOG(ERROR) << "make execute header error " << exec_st;
+        return exec_st;
+    }
+
+    // Everything was packed: forget the bound parameters of this execution.
+    _execute_data.clear();
+    _null_mask.mask.clear();
+    _null_mask.area = butil::IOBuf::INVALID_AREA;
+    _param_types.types.clear();
+    _param_types.area = butil::IOBuf::INVALID_AREA;
+    return exec_st;
+}
+
+// Creates an empty request: no transaction, no statement, no command.
+MysqlRequest::MysqlRequest()
+    : NonreflectableMessage<MysqlRequest>() {
+    SharedCtor();
+}
+
+// Creates a request associated with transaction `tx`.  Only the pointer is
+// stored; nothing in this file deletes _tx.
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx)
+    : NonreflectableMessage<MysqlRequest>() {
+    SharedCtor();
+    _tx = tx;
+}
+
+// Creates a request for prepared statement `stmt`.  `stmt` is wrapped in a
+// heap-allocated MysqlStatementStub that this request deletes in its
+// destructor and in Clear().
+MysqlRequest::MysqlRequest(MysqlStatement* stmt)
+    : NonreflectableMessage<MysqlRequest>() {
+    SharedCtor();
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+// Creates a request for prepared statement `stmt` associated with
+// transaction `tx`.  `tx` is stored as a plain pointer; `stmt` is wrapped in a
+// MysqlStatementStub owned (deleted) by this request.
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt)
+    : NonreflectableMessage<MysqlRequest>() {
+    SharedCtor();
+    _tx = tx;
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+// Copy constructor: sets the defaults first (so _stmt is NULL before
+// MergeFrom() inspects it), then copies `from` through MergeFrom().
+MysqlRequest::MysqlRequest(const MysqlRequest& from)
+    : NonreflectableMessage<MysqlRequest>(from) {
+    SharedCtor();
+    MergeFrom(from);
+}
+
+// Sets every scalar and pointer member to its default.  Called by all
+// constructors.  It assigns _stmt = NULL without deleting it, so it must not
+// be called on an object that already owns a statement stub.
+void MysqlRequest::SharedCtor() {
+    _has_error = false;
+    _cached_size_ = 0;
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+    _param_index = 0;
+}
+
+// Releases the owned statement stub.  `delete` on a null pointer is a no-op,
+// so no explicit null test is required.
+MysqlRequest::~MysqlRequest() {
+    SharedDtor();
+    delete _stmt;
+    _stmt = NULL;
+}
+
+// Intentionally empty: the destructor itself releases _stmt.
+void MysqlRequest::SharedDtor() {
+}
+
+// Stores `size` in _cached_size_ (the value returned by GetCachedSize()).
+// The method is const, so _cached_size_ is written from a const context.
+void MysqlRequest::SetCachedSize(int size) const {
+    _cached_size_ = size;
+}
+
+// Returns the request to its empty state: frees the owned statement stub,
+// forgets the transaction pointer and drops the serialized command.
+void MysqlRequest::Clear() {
+    // Deleting a null pointer is harmless, so the stub is released unconditionally.
+    delete _stmt;
+    _stmt = NULL;
+    _tx = NULL;
+    _buf.clear();
+    _param_index = 0;
+    _has_command = false;
+    _has_error = false;
+}
+
+// Returns the number of bytes currently serialized in _buf and records it in
+// _cached_size_.
+size_t MysqlRequest::ByteSizeLong() const {
+    // Keep the full size_t for the return value.  Only the cached copy is
+    // narrowed to int, instead of round-tripping the result through int.
+    const size_t total_size = _buf.size();
+    _cached_size_ = static_cast<int>(total_size);
+    return total_size;
+}
+
+// Makes this request an equivalent copy of `from` (used by CopyFrom and the
+// copy constructor).  Merging an object into itself is a no-op.
+void MysqlRequest::MergeFrom(const MysqlRequest& from) {
+    if (this == &from) {
+        return;
+    }
+    // Plain value members.
+    _buf = from._buf;
+    _has_command = from._has_command;
+    _has_error = from._has_error;
+    _param_index = from._param_index;
+    _cached_size_ = from._cached_size_;
+    // _tx is never deleted by MysqlRequest, so sharing the pointer is safe.
+    _tx = from._tx;
+    // _stmt is deleted in the destructor: replace ours with a private copy of
+    // the source stub so the two requests never free the same object.
+    delete _stmt;
+    _stmt = (from._stmt != NULL) ? new MysqlStatementStub(*from._stmt) : NULL;
+}
+
+// Exchanges the complete state of this request with `other`, including
+// ownership of the statement stub.  Swapping with itself does nothing.
+void MysqlRequest::Swap(MysqlRequest* other) {
+    if (other == this) {
+        return;
+    }
+    _buf.swap(other->_buf);
+    std::swap(_tx, other->_tx);
+    std::swap(_stmt, other->_stmt);
+    std::swap(_param_index, other->_param_index);
+    std::swap(_has_command, other->_has_command);
+    std::swap(_has_error, other->_has_error);
+    std::swap(_cached_size_, other->_cached_size_);
+}
+
+// Copies the serialized command into `*buf`.
+// Returns false, leaving `*buf` untouched, when an earlier Query()/AddParam()
+// call has failed; returns true otherwise.
+bool MysqlRequest::SerializeTo(butil::IOBuf* buf) const {
+    if (_has_error) {
+        // Name the calls that can actually set _has_error on this class.
+        LOG(ERROR) << "Reject serialization due to error in Query/AddParam";
+        return false;
+    }
+    *buf = _buf;
+    return true;
+}
+
+// Serializes `command` as a COM_QUERY packet into the request.
+// Returns false if the request is already in error or already carries a
+// command; on a packing failure the error flag is latched as well.
+bool MysqlRequest::Query(const butil::StringPiece& command) {
+    if (_has_error || _has_command) {
+        return false;
+    }
+    const butil::Status st = MysqlMakeCommand(&_buf, MYSQL_COM_QUERY, command);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    _has_command = true;
+    return true;
+}
+
+// Binds `p` as the next prepared-statement parameter (MYSQL_FIELD_TYPE_TINY).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(int8_t p) {
+    if (_has_error) {
+        return false;
+    }
+    const bool has_statement = (_stmt != NULL) && (_stmt->stmt() != NULL);
+    if (!has_statement) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_TINY);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter (MYSQL_FIELD_TYPE_TINY;
+// the trailing `true` presumably marks the value as unsigned -- confirm
+// against MysqlMakeExecuteData).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(uint8_t p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_TINY, true);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter (MYSQL_FIELD_TYPE_SHORT).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(int16_t p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_SHORT);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter (MYSQL_FIELD_TYPE_SHORT;
+// the trailing `true` presumably marks the value as unsigned -- confirm
+// against MysqlMakeExecuteData).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(uint16_t p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_SHORT, true);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter (MYSQL_FIELD_TYPE_LONG).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(int32_t p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_LONG);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter (MYSQL_FIELD_TYPE_LONG;
+// the trailing `true` presumably marks the value as unsigned -- confirm
+// against MysqlMakeExecuteData).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(uint32_t p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_LONG, true);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter
+// (MYSQL_FIELD_TYPE_LONGLONG).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(int64_t p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_LONGLONG);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter
+// (MYSQL_FIELD_TYPE_LONGLONG; the trailing `true` presumably marks the value
+// as unsigned -- confirm against MysqlMakeExecuteData).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(uint64_t p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_LONGLONG, true);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter (MYSQL_FIELD_TYPE_FLOAT).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(float p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_FLOAT);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds `p` as the next prepared-statement parameter
+// (MYSQL_FIELD_TYPE_DOUBLE).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(double p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_DOUBLE);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+// Binds the string `p` as the next prepared-statement parameter
+// (MYSQL_FIELD_TYPE_STRING).
+// Returns false when the request is already in error, when it has no usable
+// statement, or when packing fails; the last two also latch has_error().
+bool MysqlRequest::AddParam(const butil::StringPiece& p) {
+    // Refuse further binds after an earlier failure, as AddParam(int8_t) and
+    // Query() already do.
+    if (_has_error) {
+        return false;
+    }
+    if (_stmt == NULL || _stmt->stmt() == NULL) {
+        _has_error = true;
+        return false;
+    }
+    const butil::Status st =
+        MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_STRING);
+    if (!st.ok()) {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+    ++_param_index;
+    return true;
+}
+
+// Writes a debugging dump of the serialized request to `os` in the form
+//   size:<n>,sequence:<n>,payload(hex):<two hex digits per byte>
+// The dump is produced from a copy of _buf, so the request is not consumed,
+// and the formatting flags of `os` are left unchanged.
+void MysqlRequest::Print(std::ostream& os) const {
+    butil::IOBuf cp = _buf;
+    {
+        // Zero-initialized so that a buffer shorter than the 3 size bytes
+        // prints 0 instead of reading indeterminate stack bytes.
+        uint8_t buf[3] = {0, 0, 0};
+        cp.cutn(buf, 3);
+        os << "size:" << mysql_uint3korr(buf) << ",";
+    }
+    {
+        uint8_t buf = 0;
+        cp.cut1((char*)&buf);
+        os << "sequence:" << (unsigned)buf << ",";
+    }
+    os << "payload(hex):";
+    // Emit exactly two lowercase hex digits per byte by table lookup.  Using
+    // std::hex here would stay set on the caller's stream after returning and
+    // would print bytes below 0x10 as a single digit, making the dump
+    // ambiguous.
+    static const char kHexDigits[] = "0123456789abcdef";
+    while (!cp.empty()) {
+        uint8_t buf = 0;
+        cp.cut1((char*)&buf);
+        os << kHexDigits[buf >> 4] << kHexDigits[buf & 0x0F];
+    }
+}
+
+// Streams the debugging dump produced by MysqlRequest::Print() and returns
+// `os` so that insertions can be chained.
+std::ostream& operator<<(std::ostream& os, const MysqlRequest& r) {
+    r.Print(os);
+    return os;
+}
+
+// ===================================================================
+
+#ifndef _MSC_VER
+#endif  // !_MSC_VER
+
+// Creates an empty response (_nreply and _cached_size_ set to 0).
+MysqlResponse::MysqlResponse()
+    : NonreflectableMessage<MysqlResponse>() {
+    SharedCtor();
+}
+
+// Copy constructor.  As written in this file, MergeFrom() only checks that
+// `from` is a different object, so no reply data is copied here: the new
+// response holds just the defaults set by SharedCtor().
+MysqlResponse::MysqlResponse(const MysqlResponse& from)
+    : NonreflectableMessage<MysqlResponse>(from) {
+    SharedCtor();
+    MergeFrom(from);
+}
+
+// Sets the reply counter and the cached size to 0.  Called by both
+// constructors.
+void MysqlResponse::SharedCtor() {
+    _nreply = 0;
+    _cached_size_ = 0;
+}
+
+// Only calls SharedDtor(), which is empty; members clean up through their
+// own destructors.
+MysqlResponse::~MysqlResponse() {
+    SharedDtor();
+}
+
+// Intentionally empty.
+void MysqlResponse::SharedDtor() {
+}
+
+// Stores `size` in _cached_size_, the value ByteSizeLong() reports.  The
+// method is const, so _cached_size_ is written from a const context.
+void MysqlResponse::SetCachedSize(int size) const {
+    _cached_size_ = size;
+}
+
+// Returns the response to its freshly constructed state so the object can be
+// reused for another call.
+void MysqlResponse::Clear() {
+    // Reset all response state so a reused MysqlResponse does not return
+    // stale replies. Mirror what SharedCtor()/ctor initialize.
+    // Swapping with a default-constructed reply resets _first_reply; the old
+    // contents are destroyed with `empty_reply` when this function returns.
+    MysqlReply empty_reply;
+    _first_reply.Swap(empty_reply);
+    _other_replies.clear();
+    _arena.clear();
+    _nreply = 0;
+    _cached_size_ = 0;
+}
+
+// Returns the value last stored in _cached_size_ (by SetCachedSize(), or 0
+// after construction/Clear()); no size is computed here.
+size_t MysqlResponse::ByteSizeLong() const {
+    return _cached_size_;
+}
+
+// Only asserts that `from` is not this object; nothing is merged or copied.
+void MysqlResponse::MergeFrom(const MysqlResponse& from) {
+    CHECK_NE(&from, this);
+}

Review Comment:
   Fixed — `MysqlResponse` is now explicitly non-copyable (deleted copy ctor + 
assignment), and `MergeFrom` is a hard `CHECK`-failure instead of a silent 
no-op, so an accidental copy/CopyFrom is caught rather than silently dropping 
parsed replies.



##########
test/mysql/brpc_mysql_connection_type_unittest.cpp:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// ===========================================================================
+// brpc MySQL-client CONNECTION-TYPE BOUNDARY integration test.
+//
+// PROVENANCE / CLEAN-ROOM NOTE
+// ----------------------------
+// This is NOT derived from any upstream MySQL/MariaDB test suite.  It asserts
+// a brpc-ARCHITECTURE boundary: how a MySQL prepared statement (whose
+// server-side handle is connection-scoped) interacts with brpc's
+// CONNECTION_TYPE_SHORT (a fresh TCP connection per request).  The data values
+// are this file's own; no upstream test code or structure was copied.
+//
+// THE BOUNDARY (spec fact, asserted -- not derived from our impl)
+// --------------------------------------------------------------
+// A MySQL prepared statement is created with COM_STMT_PREPARE on ONE TCP
+// connection; the server returns a `stmt_id` that is valid ONLY on that exact
+// connection.  COM_STMT_EXECUTE must therefore run on the SAME connection.
+//
+// CONNECTION_TYPE_SHORT opens a brand-new TCP connection for every request and
+// closes it afterwards, so there is no connection affinity across requests.
+// Consequently an execute under SHORT cannot land on the connection that holds
+// the prepared handle -- the prepare/execute affinity is broken by design.
+//
+//   * PreparedStatementUnderShortMustError (PRIMARY):
+//       build a SHORT channel, prepare "SELECT ? AS v", bind one INT param,
+//       CallMethod.  Must ERROR (cntl.Failed() OR reply(0).is_error()); must
+//       NOT return a correct result set; must NOT crash.  Looped a few times.
+//
+//   * PlainQueryUnderShortMustSucceed (POSITIVE CONTROL):
+//       same SHORT channel; a stateless COM_QUERY "SELECT 7 AS v" must SUCCEED
+//       and return 7.  Proves SHORT is fine for stateless queries; only the
+//       connection-scoped prepared-statement handle breaks under SHORT.

Review Comment:
   Addressed — the file-header comment now describes 
`PreparedStatementUnderShortRePreparesAndSucceeds` (success via transparent 
re-prepare on each new short connection), matching the actual test.



##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,556 @@
+[MySQL](https://www.mysql.com/)是著名的开源的关系型数据库,为了使用户更快捷地访问mysql并充分利用bthread的并发能力,brpc直接支持mysql协议。示例程序:[example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**注意**:只支持MySQL 4.1 及之后的版本的文本协议,支持事务,不支持Prepared 
statement。目前支持的鉴权方式为mysql_native_password,使用事务的时候不支持single模式。

Review Comment:
   Addressed — the overview now states that prepared statements are supported, 
consistent with the Prepared Statement section.



##########
src/brpc/policy/mysql/mysql_statement.cpp:
##########
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Authors: Yang,Liming ([email protected])
+
+#include <vector>
+#include <gflags/gflags.h>
+#include "brpc/socket.h"
+#include "brpc/policy/mysql/mysql_statement.h"
+
+namespace brpc {
+// gflags definition with a default of 100.
+// NOTE(review): the identifier is spelled "statment" (missing 'e').  Renaming
+// it also renames FLAGS_mysql_statment_map_size and the command-line flag, so
+// every user of the flag must be updated together.
+DEFINE_int32(mysql_statment_map_size,
+             100,
+             "Mysql statement map size, usually equal to max bthread number");

Review Comment:
   Fixed — the flag is spelled `mysql_statement_map_size` in the current 
revision (DEFINE + all uses).



##########
src/brpc/policy/mysql/mysql.h:
##########
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Authors: Yang,Liming ([email protected])
+
+#ifndef BRPC_MYSQL_H
+#define BRPC_MYSQL_H
+
+#include <string>
+#include <vector>
+
+#include "brpc/nonreflectable_message.h"
+#include "brpc/pb_compat.h"
+#include "butil/iobuf.h"
+#include "butil/strings/string_piece.h"
+#include "butil/arena.h"
+#include "brpc/parse_result.h"
+#include "brpc/policy/mysql/mysql_command.h"
+#include "brpc/policy/mysql/mysql_reply.h"
+#include "brpc/policy/mysql/mysql_transaction.h"
+#include "brpc/policy/mysql/mysql_statement.h"
+
+namespace brpc {
+// Request to mysql.
+// Notice that you can pipeline multiple commands in one request and send
+// them to ONE mysql-server together.
+// Example:
+//   MysqlRequest request;
+//   request.Query("select * from table");
+//   MysqlResponse response;
+//   channel.CallMethod(NULL, &controller, &request, &response, NULL/*done*/);
+//   if (!cntl.Failed()) {
+//       LOG(INFO) << response.reply(0);
+//   }
+
+// Per-request companion of a MysqlStatement: it buffers the data bound for
+// one execution (parameter values, null mask, parameter types, long data)
+// until PackExecuteCommand() serializes it.
+class MysqlStatementStub {
+public:
+    // Stores `stmt` as a plain pointer; this class declares no destructor, so
+    // the pointer is not deleted by the stub.
+    MysqlStatementStub(MysqlStatement* stmt);
+    // The statement passed to the constructor.
+    MysqlStatement* stmt();
+    // Mutable access to the buffer holding the execute data.
+    butil::IOBuf& execute_data();
+    // Serializes the buffered long data and execute data for `stmt_id` into
+    // `outbuf`.
+    butil::Status PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id);
+    // Null mask of the prepared statement; `area` starts as
+    // butil::IOBuf::INVALID_AREA.
+    struct NullMask {
+        NullMask() : area(butil::IOBuf::INVALID_AREA) {}
+        std::vector<uint8_t> mask;
+        butil::IOBuf::Area area;
+    };
+    // Parameter types of the prepared statement; `area` starts as
+    // butil::IOBuf::INVALID_AREA.
+    struct ParamTypes {
+        ParamTypes() : area(butil::IOBuf::INVALID_AREA) {}
+        std::vector<uint8_t> types;
+        butil::IOBuf::Area area;
+    };
+    // Mutable access to the null mask and the parameter types.
+    NullMask& null_mask();
+    ParamTypes& param_types();
+    // Buffers a copy of `value` as long data for parameter `param_id`.
+    void save_long_data(uint16_t param_id, const butil::StringPiece& value);
+
+private:
+    MysqlStatement* _stmt;
+    butil::IOBuf _execute_data;
+    NullMask _null_mask;
+    ParamTypes _param_types;
+    // One buffered long-data value together with the parameter it belongs to.
+    struct LongData {
+        uint16_t param_id;
+        butil::IOBuf long_data;
+    };
+    std::vector<LongData> _long_data;
+};
+
+// Remembers `stmt`; every other member is default-constructed.
+inline MysqlStatementStub::MysqlStatementStub(MysqlStatement* stmt)
+    : _stmt(stmt) {}
+
+// Returns the statement pointer given at construction (may be NULL).
+inline MysqlStatement* MysqlStatementStub::stmt() {
+    return _stmt;
+}
+
+// Returns a mutable reference to the buffered execute data.
+inline butil::IOBuf& MysqlStatementStub::execute_data() {
+    return _execute_data;
+}
+
+// Returns a mutable reference to the null mask.
+inline MysqlStatementStub::NullMask& MysqlStatementStub::null_mask() {
+    return _null_mask;
+}
+
+// Returns a mutable reference to the parameter types.
+inline MysqlStatementStub::ParamTypes& MysqlStatementStub::param_types() {
+    return _param_types;
+}
+
+// Appends a new long-data entry for parameter `param_id` holding a copy of
+// the bytes of `value`.
+inline void MysqlStatementStub::save_long_data(uint16_t param_id, const butil::StringPiece& value) {
+    // Create the entry in the vector first, then fill it in place.
+    _long_data.push_back(LongData());
+    LongData& entry = _long_data.back();
+    entry.param_id = param_id;
+    entry.long_data.append(value.data(), value.size());
+}
+
+class MysqlRequest : public NonreflectableMessage<MysqlRequest> {
+public:
+    MysqlRequest();
+    MysqlRequest(const MysqlTransaction* tx);
+    MysqlRequest(MysqlStatement* stmt);
+    MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt);
+    ~MysqlRequest() override;
+    MysqlRequest(const MysqlRequest& from);
+    inline MysqlRequest& operator=(const MysqlRequest& from) {
+        CopyFrom(from);
+        return *this;
+    }
+    void Swap(MysqlRequest* other);
+
+    // Serialize the request into `buf'. Return true on success.
+    bool SerializeTo(butil::IOBuf* buf) const;
+
+    // Protobuf methods.
+    void MergeFrom(const MysqlRequest& from) override;
+    void Clear() override;
+
+    size_t ByteSizeLong() const override;
+    int GetCachedSize() const PB_425_OVERRIDE {
+        return _cached_size_;
+    }
+
+    // call query command
+    bool Query(const butil::StringPiece& command);
+    // add statement params
+    bool AddParam(int8_t p);
+    bool AddParam(uint8_t p);
+    bool AddParam(int16_t p);
+    bool AddParam(uint16_t p);
+    bool AddParam(int32_t p);
+    bool AddParam(uint32_t p);
+    bool AddParam(int64_t p);
+    bool AddParam(uint64_t p);
+    bool AddParam(float p);
+    bool AddParam(double p);
+    bool AddParam(const butil::StringPiece& p);
+
+    // True if previous command failed.
+    bool has_error() const {
+        return _has_error;
+    }
+
+    const MysqlTransaction* get_tx() const {

Review Comment:
   Done — renamed `get_tx()` to `tx()` and updated the callers.



##########
src/brpc/policy/mysql/mysql.h:
##########
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Authors: Yang,Liming ([email protected])
+
+#ifndef BRPC_MYSQL_H
+#define BRPC_MYSQL_H
+
+#include <string>
+#include <vector>
+
+#include "brpc/nonreflectable_message.h"
+#include "brpc/pb_compat.h"
+#include "butil/iobuf.h"
+#include "butil/strings/string_piece.h"
+#include "butil/arena.h"
+#include "brpc/parse_result.h"
+#include "brpc/policy/mysql/mysql_command.h"
+#include "brpc/policy/mysql/mysql_reply.h"
+#include "brpc/policy/mysql/mysql_transaction.h"
+#include "brpc/policy/mysql/mysql_statement.h"
+
+namespace brpc {
+// Request to mysql.
+// Notice that you can pipeline multiple commands in one request and send
+// them to ONE mysql-server together.
+// Example:
+//   MysqlRequest request;
+//   request.Query("select * from table");
+//   MysqlResponse response;
+//   channel.CallMethod(NULL, &controller, &request, &response, NULL/*done*/);
+//   if (!cntl.Failed()) {
+//       LOG(INFO) << response.reply(0);
+//   }
+
+// Per-request companion of a MysqlStatement: it buffers the data bound for
+// one execution (parameter values, null mask, parameter types, long data)
+// until PackExecuteCommand() serializes it.
+class MysqlStatementStub {
+public:
+    // Stores `stmt` as a plain pointer; this class declares no destructor, so
+    // the pointer is not deleted by the stub.
+    MysqlStatementStub(MysqlStatement* stmt);
+    // The statement passed to the constructor.
+    MysqlStatement* stmt();
+    // Mutable access to the buffer holding the execute data.
+    butil::IOBuf& execute_data();
+    // Serializes the buffered long data and execute data for `stmt_id` into
+    // `outbuf`.
+    butil::Status PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id);
+    // Null mask of the prepared statement; `area` starts as
+    // butil::IOBuf::INVALID_AREA.
+    struct NullMask {
+        NullMask() : area(butil::IOBuf::INVALID_AREA) {}
+        std::vector<uint8_t> mask;
+        butil::IOBuf::Area area;
+    };
+    // Parameter types of the prepared statement; `area` starts as
+    // butil::IOBuf::INVALID_AREA.
+    struct ParamTypes {
+        ParamTypes() : area(butil::IOBuf::INVALID_AREA) {}
+        std::vector<uint8_t> types;
+        butil::IOBuf::Area area;
+    };
+    // Mutable access to the null mask and the parameter types.
+    NullMask& null_mask();
+    ParamTypes& param_types();
+    // Buffers a copy of `value` as long data for parameter `param_id`.
+    void save_long_data(uint16_t param_id, const butil::StringPiece& value);
+
+private:
+    MysqlStatement* _stmt;
+    butil::IOBuf _execute_data;
+    NullMask _null_mask;
+    ParamTypes _param_types;
+    // One buffered long-data value together with the parameter it belongs to.
+    struct LongData {
+        uint16_t param_id;
+        butil::IOBuf long_data;
+    };
+    std::vector<LongData> _long_data;
+};
+
+// Remembers `stmt`; every other member is default-constructed.
+inline MysqlStatementStub::MysqlStatementStub(MysqlStatement* stmt)
+    : _stmt(stmt) {}
+
+// Returns the statement pointer given at construction (may be NULL).
+inline MysqlStatement* MysqlStatementStub::stmt() {
+    return _stmt;
+}
+
+// Returns a mutable reference to the buffered execute data.
+inline butil::IOBuf& MysqlStatementStub::execute_data() {
+    return _execute_data;
+}
+
+// Returns a mutable reference to the null mask.
+inline MysqlStatementStub::NullMask& MysqlStatementStub::null_mask() {
+    return _null_mask;
+}
+
+// Returns a mutable reference to the parameter types.
+inline MysqlStatementStub::ParamTypes& MysqlStatementStub::param_types() {
+    return _param_types;
+}
+
+// Appends a new long-data entry for parameter `param_id` holding a copy of
+// the bytes of `value`.
+inline void MysqlStatementStub::save_long_data(uint16_t param_id, const butil::StringPiece& value) {
+    // Create the entry in the vector first, then fill it in place.
+    _long_data.push_back(LongData());
+    LongData& entry = _long_data.back();
+    entry.param_id = param_id;
+    entry.long_data.append(value.data(), value.size());
+}
+
+class MysqlRequest : public NonreflectableMessage<MysqlRequest> {
+public:
+    MysqlRequest();
+    MysqlRequest(const MysqlTransaction* tx);
+    MysqlRequest(MysqlStatement* stmt);
+    MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt);
+    ~MysqlRequest() override;
+    MysqlRequest(const MysqlRequest& from);
+    inline MysqlRequest& operator=(const MysqlRequest& from) {
+        CopyFrom(from);
+        return *this;
+    }
+    void Swap(MysqlRequest* other);
+
+    // Serialize the request into `buf'. Return true on success.
+    bool SerializeTo(butil::IOBuf* buf) const;
+
+    // Protobuf methods.
+    void MergeFrom(const MysqlRequest& from) override;
+    void Clear() override;
+
+    size_t ByteSizeLong() const override;
+    int GetCachedSize() const PB_425_OVERRIDE {
+        return _cached_size_;
+    }
+
+    // call query command
+    bool Query(const butil::StringPiece& command);
+    // add statement params
+    bool AddParam(int8_t p);
+    bool AddParam(uint8_t p);
+    bool AddParam(int16_t p);
+    bool AddParam(uint16_t p);
+    bool AddParam(int32_t p);
+    bool AddParam(uint32_t p);
+    bool AddParam(int64_t p);
+    bool AddParam(uint64_t p);
+    bool AddParam(float p);
+    bool AddParam(double p);
+    bool AddParam(const butil::StringPiece& p);
+
+    // True if previous command failed.
+    bool has_error() const {
+        return _has_error;
+    }
+
+    const MysqlTransaction* get_tx() const {
+        return _tx;
+    }
+
+    MysqlStatementStub* get_stmt() const {

Review Comment:
   Done — renamed `get_stmt()` to `stmt()` and updated the callers.



##########
src/brpc/controller.h:
##########
@@ -915,6 +925,16 @@ friend void 
policy::ProcessThriftRequest(InputMessageBase*);
     // Defined at both sides
     StreamSettings *_remote_stream_settings;
 
+    // Whether/how to reserve the sending socket after the RPC (mysql 
transactions).
+    BindSockAction _bind_sock_action;

Review Comment:
   Done — the `BindSockAction` is now stored in two bits of 
`Controller::_flags` (`FLAGS_BIND_SOCK_RESERVE`/`FLAGS_BIND_SOCK_USE`) via 
`set_bind_sock_action()`/`bind_sock_action()`, instead of a dedicated member. 
No behavior change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to