This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b6604baa1d9 Support COM_STMT_SEND_LONG_DATA in MySQL Proxy (#18652)
b6604baa1d9 is described below

commit b6604baa1d9ceb50850f8dc60f13b9a6265e3749
Author: 吴伟杰 <[email protected]>
AuthorDate: Fri Jul 1 16:55:43 2022 +0800

    Support COM_STMT_SEND_LONG_DATA in MySQL Proxy (#18652)
    
    * Support receiving COM_STMT_SEND_LONG_DATA in MySQL Proxy
    
    * Complete MySQLComStmtExecutePacketTest
    
    * Complete MySQLCommandPacketFactoryTest
    
    * Complete MySQLCommandExecutorFactoryTest
    
    * Add MySQLComStmtSendLongDataExecutorTest
    
    * Add MySQLComStmtSendLongDataPacketTest
    
    * Complete MySQLComStmtResetExecutorTest
---
 .../binary/MySQLComStmtSendLongDataPacket.java     | 33 ++++++-------
 .../binary/execute/MySQLComStmtExecutePacket.java  |  8 +++-
 .../binary/MySQLComStmtSendLongDataPacketTest.java | 39 ++++++++++++++++
 .../execute/MySQLComStmtExecutePacketTest.java     | 19 +++++++-
 .../mysql/command/MySQLCommandExecutorFactory.java |  4 ++
 .../mysql/command/MySQLCommandPacketFactory.java   |  3 ++
 ....java => MySQLComStmtSendLongDataExecutor.java} | 20 ++++----
 .../query/binary/MySQLPreparedStatement.java       |  4 ++
 .../execute/MySQLComStmtExecuteExecutor.java       |  3 +-
 .../binary/reset/MySQLComStmtResetExecutor.java    |  3 +-
 .../command/MySQLCommandExecutorFactoryTest.java   |  8 ++++
 .../command/MySQLCommandPacketFactoryTest.java     |  3 +-
 .../MySQLComStmtSendLongDataExecutorTest.java      | 54 ++++++++++++++++++++++
 .../reset/MySQLComStmtResetExecutorTest.java       | 18 +++++++-
 .../ReactiveMySQLComStmtExecuteExecutor.java       |  3 +-
 15 files changed, 185 insertions(+), 37 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacket.java
similarity index 52%
copy from shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
copy to shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacket.java
index 9588539f3a2..8a37d5616e9 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacket.java
@@ -15,32 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import org.apache.shardingsphere.proxy.backend.session.PreparedStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-
-import java.util.Collections;
-import java.util.List;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
+import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 /**
- * Binary prepared statement for MySQL.
+ * COM_STMT_SEND_LONG_DATA command packet for MySQL.
  */
-@RequiredArgsConstructor
 @Getter
-@Setter
-public final class MySQLPreparedStatement implements PreparedStatement {
+public final class MySQLComStmtSendLongDataPacket extends MySQLCommandPacket {
     
-    private final String sql;
+    private final int statementId;
     
-    private final SQLStatement sqlStatement;
+    private final int paramId;
     
-    private final SQLStatementContext<?> sqlStatementContext;
+    private final byte[] data;
     
-    private List<MySQLPreparedStatementParameterType> parameterTypes = Collections.emptyList();
+    public MySQLComStmtSendLongDataPacket(final MySQLPacketPayload payload) {
+        super(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA);
+        statementId = payload.readInt4();
+        paramId = payload.readInt2();
+        data = payload.readStringEOFByBytes();
+    }
 }
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
index 14c2bb672f4..1ef3221c1c7 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
@@ -33,6 +33,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 /**
  * COM_STMT_EXECUTE command packet for MySQL.
@@ -95,12 +96,17 @@ public final class MySQLComStmtExecutePacket extends MySQLCommandPacket {
      * Read parameter values from packet.
      *
      * @param parameterTypes parameter type of values
+     * @param longDataIndexes indexes of long data
      * @return parameter values
      * @throws SQLException SQL exception
      */
-    public List<Object> readParameters(final List<MySQLPreparedStatementParameterType> parameterTypes) throws SQLException {
+    public List<Object> readParameters(final List<MySQLPreparedStatementParameterType> parameterTypes, final Set<Integer> longDataIndexes) throws SQLException {
         List<Object> result = new ArrayList<>(parameterTypes.size());
         for (int parameterIndex = 0; parameterIndex < parameterTypes.size(); parameterIndex++) {
+            if (longDataIndexes.contains(parameterIndex)) {
+                result.add(null);
+                continue;
+            }
             MySQLBinaryProtocolValue binaryProtocolValue = MySQLBinaryProtocolValueFactory.getBinaryProtocolValue(parameterTypes.get(parameterIndex).getColumnType());
             result.add(nullBitmap.isNullParameter(parameterIndex) ? null : binaryProtocolValue.read(payload));
         }
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacketTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacketTest.java
new file mode 100644
index 00000000000..ab45454597c
--- /dev/null
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacketTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+
+import io.netty.buffer.Unpooled;
+import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class MySQLComStmtSendLongDataPacketTest {
+    
+    @Test
+    public void assertNewPacket() {
+        byte[] data = {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x61, 0x62, 0x63};
+        MySQLComStmtSendLongDataPacket actual = new MySQLComStmtSendLongDataPacket(new MySQLPacketPayload(Unpooled.wrappedBuffer(data), StandardCharsets.UTF_8));
+        assertThat(actual.getStatementId(), is(1));
+        assertThat(actual.getParamId(), is(0));
+        assertThat(actual.getData(), is("abc".getBytes(StandardCharsets.UTF_8)));
+    }
+}
diff --git 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
index bf6677eb23e..73060266a76 100644
--- 
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
+++ 
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
@@ -57,7 +57,7 @@ public final class MySQLComStmtExecutePacketTest {
         assertThat(parameterTypes.size(), is(1));
         assertThat(parameterTypes.get(0).getColumnType(), 
is(MySQLBinaryColumnType.MYSQL_TYPE_LONG));
         assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
-        assertThat(actual.readParameters(parameterTypes), 
is(Collections.<Object>singletonList(1)));
+        assertThat(actual.readParameters(parameterTypes, 
Collections.emptySet()), is(Collections.<Object>singletonList(1)));
     }
     
     @Test
@@ -71,6 +71,21 @@ public final class MySQLComStmtExecutePacketTest {
         assertThat(parameterTypes.size(), is(1));
         assertThat(parameterTypes.get(0).getColumnType(), 
is(MySQLBinaryColumnType.MYSQL_TYPE_LONG));
         assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
-        assertThat(actual.readParameters(parameterTypes), 
is(Collections.singletonList(null)));
+        assertThat(actual.readParameters(parameterTypes, 
Collections.emptySet()), is(Collections.singletonList(null)));
+    }
+    
+    @Test
+    public void assertNewWithLongDataParameter() throws SQLException {
+        byte[] data = {0x02, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 
0x00, 0x01, (byte) 0xfc, 0x00};
+        MySQLPacketPayload payload = new 
MySQLPacketPayload(Unpooled.wrappedBuffer(data), StandardCharsets.UTF_8);
+        MySQLComStmtExecutePacket actual = new 
MySQLComStmtExecutePacket(payload, 1);
+        assertThat(actual.getStatementId(), is(2));
+        assertThat(actual.getNewParametersBoundFlag(), 
is(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST));
+        List<MySQLPreparedStatementParameterType> parameterTypes = 
actual.getNewParameterTypes();
+        assertThat(parameterTypes.size(), is(1));
+        assertThat(parameterTypes.get(0).getColumnType(), 
is(MySQLBinaryColumnType.MYSQL_TYPE_BLOB));
+        assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
+        assertThat(actual.readParameters(parameterTypes, 
Collections.singleton(0)), is(Collections.singletonList(null)));
+        assertThat(actual.toString(), 
is("MySQLComStmtExecutePacket(statementId=2)"));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
index 2eed6d6fb6a..417cf8e71ce 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -37,6 +38,7 @@ import 
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.initdb.MySQL
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.ping.MySQLComPingExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.quit.MySQLComQuitExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.generic.MySQLUnsupportedCommandExecutor;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLComStmtSendLongDataExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close.MySQLComStmtCloseExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.execute.MySQLComStmtExecuteExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prepare.MySQLComStmtPrepareExecutor;
@@ -79,6 +81,8 @@ public final class MySQLCommandExecutorFactory {
                 return new 
MySQLComStmtPrepareExecutor((MySQLComStmtPreparePacket) commandPacket, 
connectionSession);
             case COM_STMT_EXECUTE:
                 return new 
MySQLComStmtExecuteExecutor((MySQLComStmtExecutePacket) commandPacket, 
connectionSession);
+            case COM_STMT_SEND_LONG_DATA:
+                return new 
MySQLComStmtSendLongDataExecutor((MySQLComStmtSendLongDataPacket) 
commandPacket, connectionSession);
             case COM_STMT_RESET:
                 return new MySQLComStmtResetExecutor((MySQLComStmtResetPacket) 
commandPacket, connectionSession);
             case COM_STMT_CLOSE:
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
index ed514ac4224..27c73731912 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -68,6 +69,8 @@ public final class MySQLCommandPacketFactory {
             case COM_STMT_EXECUTE:
                 MySQLPreparedStatement preparedStatement = 
connectionSession.getPreparedStatementRegistry().getPreparedStatement(payload.getByteBuf().getIntLE(payload.getByteBuf().readerIndex()));
                 return new MySQLComStmtExecutePacket(payload, 
preparedStatement.getSqlStatement().getParameterCount());
+            case COM_STMT_SEND_LONG_DATA:
+                return new MySQLComStmtSendLongDataPacket(payload);
             case COM_STMT_RESET:
                 return new MySQLComStmtResetPacket(payload);
             case COM_STMT_CLOSE:
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutor.java
similarity index 67%
copy from 
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
copy to 
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutor.java
index 17e74c05acc..d5b572eb1ff 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutor.java
@@ -15,32 +15,32 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.reset;
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
 
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.reset.MySQLComStmtResetPacket;
-import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import 
org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
 
+import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
 
 /**
- * COM_STMT_RESET command executor for MySQL.
+ * COM_STMT_SEND_LONG_DATA command executor for MySQL.
  */
 @RequiredArgsConstructor
-public final class MySQLComStmtResetExecutor implements CommandExecutor {
+public final class MySQLComStmtSendLongDataExecutor implements CommandExecutor {
     
-    private final MySQLComStmtResetPacket packet;
+    private final MySQLComStmtSendLongDataPacket packet;
     
     private final ConnectionSession connectionSession;
     
     @Override
-    public Collection<DatabasePacket<?>> execute() {
-        // TODO we should implement the stmt reset after supporting COM_STMT_SEND_LONG_DATA
-        return Collections.singleton(new MySQLOKPacket(1, ServerStatusFlagCalculator.calculateFor(connectionSession)));
+    public Collection<DatabasePacket<?>> execute() throws SQLException {
+        MySQLPreparedStatement preparedStatement = connectionSession.getPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
+        preparedStatement.getLongData().put(packet.getParamId(), packet.getData());
+        return Collections.emptyList();
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
index 9588539f3a2..1903fba61d1 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
@@ -27,6 +27,8 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Binary prepared statement for MySQL.
@@ -43,4 +45,6 @@ public final class MySQLPreparedStatement implements 
PreparedStatement {
     private final SQLStatementContext<?> sqlStatementContext;
     
     private List<MySQLPreparedStatementParameterType> parameterTypes = 
Collections.emptyList();
+    
+    private final Map<Integer, byte[]> longData = new ConcurrentHashMap<>();
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index 80c522d6ee0..e6e1eccb8c9 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -90,7 +90,8 @@ public final class MySQLComStmtExecuteExecutor implements 
QueryCommandExecutor {
         if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
             connectionSession.getBackendConnection().handleAutoCommit();
         }
-        List<Object> parameters = packet.readParameters(preparedStatement.getParameterTypes());
+        List<Object> parameters = packet.readParameters(preparedStatement.getParameterTypes(), preparedStatement.getLongData().keySet());
+        preparedStatement.getLongData().forEach(parameters::set);
         SQLStatementContext<?> sqlStatementContext = 
preparedStatement.getSqlStatementContext();
         if (sqlStatementContext instanceof ParameterAware) {
             ((ParameterAware) sqlStatementContext).setUpParameters(parameters);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
index 17e74c05acc..cacbdf87e37 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -40,7 +41,7 @@ public final class MySQLComStmtResetExecutor implements 
CommandExecutor {
     
     @Override
     public Collection<DatabasePacket<?>> execute() {
-        // TODO we should implement the stmt reset after supporting COM_STMT_SEND_LONG_DATA
+        connectionSession.getPreparedStatementRegistry().<MySQLPreparedStatement>getPreparedStatement(packet.getStatementId()).getLongData().clear();
         return Collections.singleton(new MySQLOKPacket(1, 
ServerStatusFlagCalculator.calculateFor(connectionSession)));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
index 0b2be5f5a3f..882a7822c88 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -49,6 +50,7 @@ import 
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.initdb.MySQL
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.ping.MySQLComPingExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.quit.MySQLComQuitExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.generic.MySQLUnsupportedCommandExecutor;
+import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLComStmtSendLongDataExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close.MySQLComStmtCloseExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.execute.MySQLComStmtExecuteExecutor;
 import 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prepare.MySQLComStmtPrepareExecutor;
@@ -148,6 +150,12 @@ public final class MySQLCommandExecutorFactoryTest extends 
ProxyContextRestorer
                 instanceOf(MySQLComStmtExecuteExecutor.class));
     }
     
+    @Test
+    public void assertNewInstanceWithComStmtSendLongData() throws SQLException 
{
+        
assertThat(MySQLCommandExecutorFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA,
 mock(MySQLComStmtSendLongDataPacket.class), connectionSession),
+                instanceOf(MySQLComStmtSendLongDataExecutor.class));
+    }
+    
     @Test
     public void assertNewInstanceWithComStmtReset() throws SQLException {
         
assertThat(MySQLCommandExecutorFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET,
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
index 276eb511ff9..1d3dfe3142c 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -191,7 +192,7 @@ public final class MySQLCommandPacketFactoryTest {
     
     @Test
     public void assertNewInstanceWithComStmtSendLongDataPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA, payload, connectionSession), instanceOf(MySQLComStmtSendLongDataPacket.class));
     }
     
     @Test
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
new file mode 100644
index 00000000000..025294a1855
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class MySQLComStmtSendLongDataExecutorTest {
+    
+    @Test
+    public void assertExecute() throws SQLException {
+        MySQLComStmtSendLongDataPacket packet = mock(MySQLComStmtSendLongDataPacket.class);
+        when(packet.getStatementId()).thenReturn(1);
+        when(packet.getParamId()).thenReturn(0);
+        byte[] data = "data".getBytes(StandardCharsets.US_ASCII);
+        when(packet.getData()).thenReturn(data);
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        when(connectionSession.getPreparedStatementRegistry()).thenReturn(new PreparedStatementRegistry());
+        MySQLPreparedStatement preparedStatement = new MySQLPreparedStatement("insert into t (b) values (?)", null, null);
+        connectionSession.getPreparedStatementRegistry().addPreparedStatement(1, preparedStatement);
+        MySQLComStmtSendLongDataExecutor executor = new MySQLComStmtSendLongDataExecutor(packet, connectionSession);
+        Collection<DatabasePacket<?>> actual = executor.execute();
+        assertThat(actual, is(Collections.emptyList()));
+        assertThat(preparedStatement.getLongData(), is(Collections.singletonMap(0, data)));
+    }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
index fea92c17c96..74327c8cf16 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
@@ -21,6 +21,10 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.r
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
+import org.apache.shardingsphere.transaction.core.TransactionType;
 import org.junit.Test;
 
 import java.util.Collection;
@@ -28,16 +32,26 @@ import java.util.Collection;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public final class MySQLComStmtResetExecutorTest {
     
     @Test
     public void assertExecute() {
-        MySQLComStmtResetExecutor mysqlComStmtResetExecutor = new MySQLComStmtResetExecutor(mock(MySQLComStmtResetPacket.class), mock(ConnectionSession.class, RETURNS_DEEP_STUBS));
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        when(connectionSession.getPreparedStatementRegistry()).thenReturn(new PreparedStatementRegistry());
+        when(connectionSession.getTransactionStatus()).thenReturn(new TransactionStatus(TransactionType.LOCAL));
+        MySQLPreparedStatement preparedStatement = new MySQLPreparedStatement("", null, null);
+        preparedStatement.getLongData().put(0, new byte[0]);
+        connectionSession.getPreparedStatementRegistry().addPreparedStatement(1, preparedStatement);
+        MySQLComStmtResetPacket packet = mock(MySQLComStmtResetPacket.class);
+        when(packet.getStatementId()).thenReturn(1);
+        MySQLComStmtResetExecutor mysqlComStmtResetExecutor = new MySQLComStmtResetExecutor(packet, connectionSession);
         Collection<DatabasePacket<?>> actual = mysqlComStmtResetExecutor.execute();
         assertThat(actual.size(), is(1));
         assertThat(actual.iterator().next(), instanceOf(MySQLOKPacket.class));
+        assertTrue(preparedStatement.getLongData().isEmpty());
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
index f09d131df27..8d06e751b27 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
@@ -88,7 +88,8 @@ public final class ReactiveMySQLComStmtExecuteExecutor implements ReactiveComman
     @Override
     public Future<Collection<DatabasePacket<?>>> executeFuture() {
         MySQLPreparedStatement preparedStatement = updateAndGetPreparedStatement();
-        List<Object> parameters = packet.readParameters(preparedStatement.getParameterTypes());
+        List<Object> parameters = packet.readParameters(preparedStatement.getParameterTypes(), preparedStatement.getLongData().keySet());
+        preparedStatement.getLongData().forEach(parameters::set);
         SQLStatementContext<?> sqlStatementContext = 
preparedStatement.getSqlStatementContext();
         if (sqlStatementContext instanceof ParameterAware) {
             ((ParameterAware) sqlStatementContext).setUpParameters(parameters);

Reply via email to