This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b6604baa1d9 Support COM_STMT_SEND_LONG_DATA in MySQL Proxy (#18652)
b6604baa1d9 is described below
commit b6604baa1d9ceb50850f8dc60f13b9a6265e3749
Author: 吴伟杰 <[email protected]>
AuthorDate: Fri Jul 1 16:55:43 2022 +0800
Support COM_STMT_SEND_LONG_DATA in MySQL Proxy (#18652)
* Support receiving COM_STMT_SEND_LONG_DATA in MySQL Proxy
* Complete MySQLComStmtExecutePacketTest
* Complete MySQLCommandPacketFactoryTest
* Complete MySQLCommandExecutorFactoryTest
* Add MySQLComStmtSendLongDataExecutorTest
* Add MySQLComStmtSendLongDataPacketTest
* Complete MySQLComStmtResetExecutorTest
---
.../binary/MySQLComStmtSendLongDataPacket.java | 33 ++++++-------
.../binary/execute/MySQLComStmtExecutePacket.java | 8 +++-
.../binary/MySQLComStmtSendLongDataPacketTest.java | 39 ++++++++++++++++
.../execute/MySQLComStmtExecutePacketTest.java | 19 +++++++-
.../mysql/command/MySQLCommandExecutorFactory.java | 4 ++
.../mysql/command/MySQLCommandPacketFactory.java | 3 ++
....java => MySQLComStmtSendLongDataExecutor.java} | 20 ++++----
.../query/binary/MySQLPreparedStatement.java | 4 ++
.../execute/MySQLComStmtExecuteExecutor.java | 3 +-
.../binary/reset/MySQLComStmtResetExecutor.java | 3 +-
.../command/MySQLCommandExecutorFactoryTest.java | 8 ++++
.../command/MySQLCommandPacketFactoryTest.java | 3 +-
.../MySQLComStmtSendLongDataExecutorTest.java | 54 ++++++++++++++++++++++
.../reset/MySQLComStmtResetExecutorTest.java | 18 +++++++-
.../ReactiveMySQLComStmtExecuteExecutor.java | 3 +-
15 files changed, 185 insertions(+), 37 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacket.java
similarity index 52%
copy from shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
copy to shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacket.java
index 9588539f3a2..8a37d5616e9 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacket.java
@@ -15,32 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import org.apache.shardingsphere.proxy.backend.session.PreparedStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-
-import java.util.Collections;
-import java.util.List;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
+import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
/**
- * Binary prepared statement for MySQL.
+ * COM_STMT_SEND_LONG_DATA command packet for MySQL.
*/
-@RequiredArgsConstructor
@Getter
-@Setter
-public final class MySQLPreparedStatement implements PreparedStatement {
+public final class MySQLComStmtSendLongDataPacket extends MySQLCommandPacket {
- private final String sql;
+ private final int statementId;
- private final SQLStatement sqlStatement;
+ private final int paramId;
- private final SQLStatementContext<?> sqlStatementContext;
+ private final byte[] data;
- private List<MySQLPreparedStatementParameterType> parameterTypes = Collections.emptyList();
+ public MySQLComStmtSendLongDataPacket(final MySQLPacketPayload payload) {
+ super(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA);
+ statementId = payload.readInt4();
+ paramId = payload.readInt2();
+ data = payload.readStringEOFByBytes();
+ }
}
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
index 14c2bb672f4..1ef3221c1c7 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacket.java
@@ -33,6 +33,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
/**
* COM_STMT_EXECUTE command packet for MySQL.
@@ -95,12 +96,17 @@ public final class MySQLComStmtExecutePacket extends MySQLCommandPacket {
* Read parameter values from packet.
*
* @param parameterTypes parameter type of values
+ * @param longDataIndexes indexes of long data
* @return parameter values
* @throws SQLException SQL exception
*/
- public List<Object> readParameters(final List<MySQLPreparedStatementParameterType> parameterTypes) throws SQLException {
+ public List<Object> readParameters(final List<MySQLPreparedStatementParameterType> parameterTypes, final Set<Integer> longDataIndexes) throws SQLException {
List<Object> result = new ArrayList<>(parameterTypes.size());
for (int parameterIndex = 0; parameterIndex < parameterTypes.size(); parameterIndex++) {
+ if (longDataIndexes.contains(parameterIndex)) {
+ result.add(null);
+ continue;
+ }
MySQLBinaryProtocolValue binaryProtocolValue = MySQLBinaryProtocolValueFactory.getBinaryProtocolValue(parameterTypes.get(parameterIndex).getColumnType());
result.add(nullBitmap.isNullParameter(parameterIndex) ? null : binaryProtocolValue.read(payload));
}
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacketTest.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacketTest.java
new file mode 100644
index 00000000000..ab45454597c
--- /dev/null
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLComStmtSendLongDataPacketTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+
+import io.netty.buffer.Unpooled;
+import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class MySQLComStmtSendLongDataPacketTest {
+
+ @Test
+ public void assertNewPacket() {
+ byte[] data = {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x61, 0x62, 0x63};
+ MySQLComStmtSendLongDataPacket actual = new
MySQLComStmtSendLongDataPacket(new
MySQLPacketPayload(Unpooled.wrappedBuffer(data), StandardCharsets.UTF_8));
+ assertThat(actual.getStatementId(), is(1));
+ assertThat(actual.getParamId(), is(0));
+ assertThat(actual.getData(),
is("abc".getBytes(StandardCharsets.UTF_8)));
+ }
+}
diff --git
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
index bf6677eb23e..73060266a76 100644
---
a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
+++
b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
@@ -57,7 +57,7 @@ public final class MySQLComStmtExecutePacketTest {
assertThat(parameterTypes.size(), is(1));
assertThat(parameterTypes.get(0).getColumnType(),
is(MySQLBinaryColumnType.MYSQL_TYPE_LONG));
assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
- assertThat(actual.readParameters(parameterTypes),
is(Collections.<Object>singletonList(1)));
+ assertThat(actual.readParameters(parameterTypes,
Collections.emptySet()), is(Collections.<Object>singletonList(1)));
}
@Test
@@ -71,6 +71,21 @@ public final class MySQLComStmtExecutePacketTest {
assertThat(parameterTypes.size(), is(1));
assertThat(parameterTypes.get(0).getColumnType(),
is(MySQLBinaryColumnType.MYSQL_TYPE_LONG));
assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
- assertThat(actual.readParameters(parameterTypes),
is(Collections.singletonList(null)));
+ assertThat(actual.readParameters(parameterTypes,
Collections.emptySet()), is(Collections.singletonList(null)));
+ }
+
+ @Test
+ public void assertNewWithLongDataParameter() throws SQLException {
+ byte[] data = {0x02, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x00, 0x01, (byte) 0xfc, 0x00};
+ MySQLPacketPayload payload = new
MySQLPacketPayload(Unpooled.wrappedBuffer(data), StandardCharsets.UTF_8);
+ MySQLComStmtExecutePacket actual = new
MySQLComStmtExecutePacket(payload, 1);
+ assertThat(actual.getStatementId(), is(2));
+ assertThat(actual.getNewParametersBoundFlag(),
is(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST));
+ List<MySQLPreparedStatementParameterType> parameterTypes =
actual.getNewParameterTypes();
+ assertThat(parameterTypes.size(), is(1));
+ assertThat(parameterTypes.get(0).getColumnType(),
is(MySQLBinaryColumnType.MYSQL_TYPE_BLOB));
+ assertThat(parameterTypes.get(0).getUnsignedFlag(), is(0));
+ assertThat(actual.readParameters(parameterTypes,
Collections.singleton(0)), is(Collections.singletonList(null)));
+ assertThat(actual.toString(),
is("MySQLComStmtExecutePacket(statementId=2)"));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
index 2eed6d6fb6a..417cf8e71ce 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
+import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -37,6 +38,7 @@ import
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.initdb.MySQL
import
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.ping.MySQLComPingExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.quit.MySQLComQuitExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.generic.MySQLUnsupportedCommandExecutor;
+import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLComStmtSendLongDataExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close.MySQLComStmtCloseExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.execute.MySQLComStmtExecuteExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prepare.MySQLComStmtPrepareExecutor;
@@ -79,6 +81,8 @@ public final class MySQLCommandExecutorFactory {
return new
MySQLComStmtPrepareExecutor((MySQLComStmtPreparePacket) commandPacket,
connectionSession);
case COM_STMT_EXECUTE:
return new
MySQLComStmtExecuteExecutor((MySQLComStmtExecutePacket) commandPacket,
connectionSession);
+ case COM_STMT_SEND_LONG_DATA:
+ return new
MySQLComStmtSendLongDataExecutor((MySQLComStmtSendLongDataPacket)
commandPacket, connectionSession);
case COM_STMT_RESET:
return new MySQLComStmtResetExecutor((MySQLComStmtResetPacket)
commandPacket, connectionSession);
case COM_STMT_CLOSE:
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
index ed514ac4224..27c73731912 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
+import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -68,6 +69,8 @@ public final class MySQLCommandPacketFactory {
case COM_STMT_EXECUTE:
MySQLPreparedStatement preparedStatement =
connectionSession.getPreparedStatementRegistry().getPreparedStatement(payload.getByteBuf().getIntLE(payload.getByteBuf().readerIndex()));
return new MySQLComStmtExecutePacket(payload,
preparedStatement.getSqlStatement().getParameterCount());
+ case COM_STMT_SEND_LONG_DATA:
+ return new MySQLComStmtSendLongDataPacket(payload);
case COM_STMT_RESET:
return new MySQLComStmtResetPacket(payload);
case COM_STMT_CLOSE:
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutor.java
similarity index 67%
copy from
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
copy to
shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutor.java
index 17e74c05acc..d5b572eb1ff 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutor.java
@@ -15,32 +15,32 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.reset;
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.reset.MySQLComStmtResetPacket;
-import
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
+import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import
org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
/**
- * COM_STMT_RESET command executor for MySQL.
+ * COM_STMT_SEND_LONG_DATA command executor for MySQL.
*/
@RequiredArgsConstructor
-public final class MySQLComStmtResetExecutor implements CommandExecutor {
+public final class MySQLComStmtSendLongDataExecutor implements CommandExecutor {
- private final MySQLComStmtResetPacket packet;
+ private final MySQLComStmtSendLongDataPacket packet;
private final ConnectionSession connectionSession;
@Override
- public Collection<DatabasePacket<?>> execute() {
- // TODO we should implement the stmt reset after supporting COM_STMT_SEND_LONG_DATA
- return Collections.singleton(new MySQLOKPacket(1, ServerStatusFlagCalculator.calculateFor(connectionSession)));
+ public Collection<DatabasePacket<?>> execute() throws SQLException {
+ MySQLPreparedStatement preparedStatement = connectionSession.getPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
+ preparedStatement.getLongData().put(packet.getParamId(), packet.getData());
+ return Collections.emptyList();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
index 9588539f3a2..1903fba61d1 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
@@ -27,6 +27,8 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Binary prepared statement for MySQL.
@@ -43,4 +45,6 @@ public final class MySQLPreparedStatement implements
PreparedStatement {
private final SQLStatementContext<?> sqlStatementContext;
private List<MySQLPreparedStatementParameterType> parameterTypes =
Collections.emptyList();
+
+ private final Map<Integer, byte[]> longData = new ConcurrentHashMap<>();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index 80c522d6ee0..e6e1eccb8c9 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -90,7 +90,8 @@ public final class MySQLComStmtExecuteExecutor implements
QueryCommandExecutor {
if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
connectionSession.getBackendConnection().handleAutoCommit();
}
- List<Object> parameters =
packet.readParameters(preparedStatement.getParameterTypes());
+ List<Object> parameters =
packet.readParameters(preparedStatement.getParameterTypes(),
preparedStatement.getLongData().keySet());
+ preparedStatement.getLongData().forEach(parameters::set);
SQLStatementContext<?> sqlStatementContext =
preparedStatement.getSqlStatementContext();
if (sqlStatementContext instanceof ParameterAware) {
((ParameterAware) sqlStatementContext).setUpParameters(parameters);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
index 17e74c05acc..cacbdf87e37 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutor.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
import java.util.Collection;
import java.util.Collections;
@@ -40,7 +41,7 @@ public final class MySQLComStmtResetExecutor implements
CommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() {
- // TODO we should implement the stmt reset after supporting
COM_STMT_SEND_LONG_DATA
+
connectionSession.getPreparedStatementRegistry().<MySQLPreparedStatement>getPreparedStatement(packet.getStatementId()).getLongData().clear();
return Collections.singleton(new MySQLOKPacket(1,
ServerStatusFlagCalculator.calculateFor(connectionSession)));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
index 0b2be5f5a3f..882a7822c88 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.MySQLCommandPacketType;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
+import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -49,6 +50,7 @@ import
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.initdb.MySQL
import
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.ping.MySQLComPingExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.admin.quit.MySQLComQuitExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.generic.MySQLUnsupportedCommandExecutor;
+import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLComStmtSendLongDataExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close.MySQLComStmtCloseExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.execute.MySQLComStmtExecuteExecutor;
import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prepare.MySQLComStmtPrepareExecutor;
@@ -148,6 +150,12 @@ public final class MySQLCommandExecutorFactoryTest extends
ProxyContextRestorer
instanceOf(MySQLComStmtExecuteExecutor.class));
}
+ @Test
+ public void assertNewInstanceWithComStmtSendLongData() throws SQLException
{
+
assertThat(MySQLCommandExecutorFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA,
mock(MySQLComStmtSendLongDataPacket.class), connectionSession),
+ instanceOf(MySQLComStmtSendLongDataExecutor.class));
+ }
+
@Test
public void assertNewInstanceWithComStmtReset() throws SQLException {
assertThat(MySQLCommandExecutorFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET,
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
index 276eb511ff9..1d3dfe3142c 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
+import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -191,7 +192,7 @@ public final class MySQLCommandPacketFactoryTest {
@Test
public void assertNewInstanceWithComStmtSendLongDataPacket() throws
SQLException {
-
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA,
payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
+
assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA,
payload, connectionSession), instanceOf(MySQLComStmtSendLongDataPacket.class));
}
@Test
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
new file mode 100644
index 00000000000..025294a1855
--- /dev/null
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLComStmtSendLongDataExecutorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+
+import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLComStmtSendLongDataPacket;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class MySQLComStmtSendLongDataExecutorTest {
+ // Checks that executing the long-data executor stores the packet payload on the registered prepared statement.
+ @Test
+ public void assertExecute() throws SQLException {
+ MySQLComStmtSendLongDataPacket packet =
mock(MySQLComStmtSendLongDataPacket.class);
+ when(packet.getStatementId()).thenReturn(1); // same id the statement is registered under below
+ when(packet.getParamId()).thenReturn(0); // index expected as the key in getLongData()
+ byte[] data = "data".getBytes(StandardCharsets.US_ASCII);
+ when(packet.getData()).thenReturn(data);
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+ when(connectionSession.getPreparedStatementRegistry()).thenReturn(new
PreparedStatementRegistry()); // real registry so the statement added below can be looked up
+ MySQLPreparedStatement preparedStatement = new
MySQLPreparedStatement("insert into t (b) values (?)", null, null); // SQL contains a single placeholder
+
connectionSession.getPreparedStatementRegistry().addPreparedStatement(1,
preparedStatement);
+ MySQLComStmtSendLongDataExecutor executor = new
MySQLComStmtSendLongDataExecutor(packet, connectionSession);
+ Collection<DatabasePacket<?>> actual = executor.execute();
+ assertThat(actual, is(Collections.emptyList())); // no response packets are expected from this command
+ assertThat(preparedStatement.getLongData(),
is(Collections.singletonMap(0, data))); // byte[] equality is identity-based: passes only if this exact array instance was stored
+ }
+}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
index fea92c17c96..74327c8cf16 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/reset/MySQLComStmtResetExecutorTest.java
@@ -21,6 +21,10 @@ import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.r
import
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import
org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
+import
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
+import
org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
+import org.apache.shardingsphere.transaction.core.TransactionType;
import org.junit.Test;
import java.util.Collection;
@@ -28,16 +32,26 @@ import java.util.Collection;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public final class MySQLComStmtResetExecutorTest {
@Test
public void assertExecute() {
- MySQLComStmtResetExecutor mysqlComStmtResetExecutor = new
MySQLComStmtResetExecutor(mock(MySQLComStmtResetPacket.class),
mock(ConnectionSession.class, RETURNS_DEEP_STUBS));
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+ when(connectionSession.getPreparedStatementRegistry()).thenReturn(new
PreparedStatementRegistry()); // real registry so the statement added below can be looked up
+ when(connectionSession.getTransactionStatus()).thenReturn(new
TransactionStatus(TransactionType.LOCAL)); // NOTE(review): presumably read by the executor when building the OK packet - confirm
+ MySQLPreparedStatement preparedStatement = new
MySQLPreparedStatement("", null, null);
+ preparedStatement.getLongData().put(0, new byte[0]); // seed long data so the emptiness check below is meaningful
+
connectionSession.getPreparedStatementRegistry().addPreparedStatement(1,
preparedStatement);
+ MySQLComStmtResetPacket packet = mock(MySQLComStmtResetPacket.class);
+ when(packet.getStatementId()).thenReturn(1); // same id the statement is registered under above
+ MySQLComStmtResetExecutor mysqlComStmtResetExecutor = new
MySQLComStmtResetExecutor(packet, connectionSession);
Collection<DatabasePacket<?>> actual =
mysqlComStmtResetExecutor.execute();
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), instanceOf(MySQLOKPacket.class));
+ assertTrue(preparedStatement.getLongData().isEmpty()); // the seeded long data must be gone after the reset
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
index f09d131df27..8d06e751b27 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
@@ -88,7 +88,8 @@ public final class ReactiveMySQLComStmtExecuteExecutor
implements ReactiveComman
@Override
public Future<Collection<DatabasePacket<?>>> executeFuture() {
MySQLPreparedStatement preparedStatement =
updateAndGetPreparedStatement();
- List<Object> parameters =
packet.readParameters(preparedStatement.getParameterTypes());
+ List<Object> parameters =
packet.readParameters(preparedStatement.getParameterTypes(),
preparedStatement.getLongData().keySet());
+ preparedStatement.getLongData().forEach(parameters::set);
SQLStatementContext<?> sqlStatementContext =
preparedStatement.getSqlStatementContext();
if (sqlStatementContext instanceof ParameterAware) {
((ParameterAware) sqlStatementContext).setUpParameters(parameters);