This is an automated email from the ASF dual-hosted git repository. menghaoran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 84a4891 Support Transaction in PostgreSQL (#10169) 84a4891 is described below commit 84a48917c4c08df3644c74fe0ba9f6e54d9e7eb6 Author: Liang Zhang <terrym...@163.com> AuthorDate: Sat Apr 24 14:39:57 2021 +0800 Support Transaction in PostgreSQL (#10169) --- .../packet/generic/PostgreSQLReadyForQueryPacket.java | 13 +++++++++++-- .../packet/generic/PostgreSQLReadyForQueryPacketTest.java | 15 +++++++++++++-- .../distsql/parser/core/DistSQLVisitor.java | 3 ++- .../parser/api/DistSQLStatementParserEngineTest.java | 2 +- .../rdl/impl/CreateShardingTableRuleBackendHandler.java | 9 ++++++--- .../impl/CreateShardingTableRuleBackendHandlerTest.java | 9 +++++---- .../authentication/PostgreSQLAuthenticationEngine.java | 2 +- .../command/PostgreSQLCommandExecuteEngine.java | 9 +++++---- .../statement/ddl/impl/AlterSessionStatementAssert.java | 2 +- 9 files changed, 45 insertions(+), 19 deletions(-) diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java index f6c980c..f017877 100644 --- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java +++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.db.protocol.postgresql.packet.generic; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag; import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType; @@ -25,13 +26,21 @@ import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacket /** * Ready for query packet for PostgreSQL. */ +@RequiredArgsConstructor public final class PostgreSQLReadyForQueryPacket implements PostgreSQLIdentifierPacket { - private static final char STATUS = 'I'; + private static final char IN_TRANSACTION = 'T'; + + private static final char NOT_IN_TRANSACTION = 'I'; + + // TODO consider about TRANSACTION_FAILED + private static final char TRANSACTION_FAILED = 'E'; + + private final boolean isInTransaction; @Override public void write(final PostgreSQLPacketPayload payload) { - payload.writeInt1(STATUS); + payload.writeInt1(isInTransaction ? IN_TRANSACTION : NOT_IN_TRANSACTION); } @Override diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java index db4845a..7c0ecb3 100644 --- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java +++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java @@ -29,10 +29,21 @@ import static org.junit.Assert.assertThat; public final class PostgreSQLReadyForQueryPacketTest { @Test - public void assertReadWrite() { + public void assertReadWriteWithInTransaction() { ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(1); PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf); - PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(); + PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(true); + assertThat(packet.getIdentifier(), is(PostgreSQLMessagePacketType.READY_FOR_QUERY)); + packet.write(payload); + assertThat(byteBuf.writerIndex(), is(1)); + assertThat(byteBuf.readByte(), is((byte) 'T')); + } + + @Test + public void assertReadWriteWithNotInTransaction() { + ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(1); + PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf); + PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(false); assertThat(packet.getIdentifier(), is(PostgreSQLMessagePacketType.READY_FOR_QUERY)); packet.write(payload); assertThat(byteBuf.writerIndex(), is(1)); diff --git a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java index ac64ce1..7325a95 100644 --- a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java +++ b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.distsql.parser.core; import com.google.common.base.Joiner; +import org.antlr.v4.runtime.tree.ParseTree; import org.antlr.v4.runtime.tree.TerminalNode; import org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementBaseVisitor; import org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.AddResourceContext; @@ -188,7 +189,7 @@ public final class DistSQLVisitor extends DistSQLStatementBaseVisitor<ASTNode> { @Override public ASTNode visitCreateShardingBroadcastTableRules(final CreateShardingBroadcastTableRulesContext ctx) { CreateShardingBroadcastTableRulesStatement result = new CreateShardingBroadcastTableRulesStatement(); - result.getTables().addAll(ctx.IDENTIFIER().stream().map(each -> each.getText()).collect(Collectors.toList())); + result.getTables().addAll(ctx.IDENTIFIER().stream().map(ParseTree::getText).collect(Collectors.toList())); return result; } diff --git a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/test/java/org/apache/shardingsphere/distsql/parser/api/DistSQLStatementParserEngineTest.java b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/test/java/org/apache/shardingsphere/distsql/parser/api/DistSQLStatementParserEngineTest.java index e2f1389..57e3960 100644 --- a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/test/java/org/apache/shardingsphere/distsql/parser/api/DistSQLStatementParserEngineTest.java +++ b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/test/java/org/apache/shardingsphere/distsql/parser/api/DistSQLStatementParserEngineTest.java @@ -92,7 +92,7 @@ public final class DistSQLStatementParserEngineTest { SQLStatement sqlStatement = engine.parse(RDL_ADD_RESOURCE_MULTIPLE); assertTrue(sqlStatement instanceof AddResourceStatement); assertThat(((AddResourceStatement) sqlStatement).getDataSources().size(), is(2)); - List<DataSourceSegment> dataSourceSegments = new ArrayList(((AddResourceStatement) sqlStatement).getDataSources()); + List<DataSourceSegment> dataSourceSegments = new ArrayList<>(((AddResourceStatement) sqlStatement).getDataSources()); DataSourceSegment dataSourceSegment = dataSourceSegments.get(0); assertThat(dataSourceSegment.getName(), is("ds_0")); assertThat(dataSourceSegment.getHostName(), is("127.0.0.1")); diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandler.java index ce442da..240d62c 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandler.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandler.java @@ -30,12 +30,15 @@ import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader; import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader; import org.apache.shardingsphere.proxy.backend.text.SchemaRequiredBackendHandler; import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration; +import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration; +import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration; import org.apache.shardingsphere.sharding.converter.ShardingRuleStatementConverter; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -83,12 +86,12 @@ public final class CreateShardingTableRuleBackendHandler extends SchemaRequiredB private Collection<String> getLogicTables(final String schemaName) { Optional<ShardingRuleConfiguration> shardingRuleConfiguration = getShardingRuleConfiguration(schemaName); - Collection<String> result = new ArrayList<>(); + Collection<String> result = new LinkedList<>(); if (!shardingRuleConfiguration.isPresent()) { return result; } - result.addAll(shardingRuleConfiguration.get().getTables().stream().map(each -> each.getLogicTable()).collect(Collectors.toList())); - result.addAll(shardingRuleConfiguration.get().getAutoTables().stream().map(each -> each.getLogicTable()).collect(Collectors.toList())); + result.addAll(shardingRuleConfiguration.get().getTables().stream().map(ShardingTableRuleConfiguration::getLogicTable).collect(Collectors.toList())); + result.addAll(shardingRuleConfiguration.get().getAutoTables().stream().map(ShardingAutoTableRuleConfiguration::getLogicTable).collect(Collectors.toList())); return result; } diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandlerTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandlerTest.java index 6dc5e6a..163b0c4 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandlerTest.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandlerTest.java @@ -41,6 +41,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -71,9 +72,9 @@ public final class CreateShardingTableRuleBackendHandlerTest { private CreateShardingTableRuleBackendHandler handler = new CreateShardingTableRuleBackendHandler(sqlStatement, backendConnection); @Before - public void setUp() throws Exception { + public void setUp() { ProxyContext.getInstance().init(metaDataContexts, transactionContexts); - when(metaDataContexts.getAllSchemaNames()).thenReturn(Arrays.asList("test")); + when(metaDataContexts.getAllSchemaNames()).thenReturn(Collections.singletonList("test")); when(metaDataContexts.getMetaData(eq("test"))).thenReturn(shardingSphereMetaData); when(shardingSphereMetaData.getRuleMetaData()).thenReturn(ruleMetaData); } @@ -99,7 +100,7 @@ public final class CreateShardingTableRuleBackendHandlerTest { TableRuleSegment tableRuleSegment = new TableRuleSegment(); tableRuleSegment.setLogicTable("t_order"); when(ruleMetaData.getConfigurations()).thenReturn(buildShardingConfigurations()); - when(sqlStatement.getTables()).thenReturn(Arrays.asList(tableRuleSegment)); + when(sqlStatement.getTables()).thenReturn(Collections.singletonList(tableRuleSegment)); handler.execute("test", sqlStatement); } @@ -107,6 +108,6 @@ public final class CreateShardingTableRuleBackendHandlerTest { ShardingRuleConfiguration configuration = new ShardingRuleConfiguration(); configuration.getTables().add(new ShardingTableRuleConfiguration("t_order")); configuration.getAutoTables().add(new ShardingAutoTableRuleConfiguration("t_order")); - return new ArrayList<>(Arrays.asList(configuration)); + return new ArrayList<>(Collections.singletonList(configuration)); } } diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java index bcb9912..8df85a5 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java @@ -100,7 +100,7 @@ public final class PostgreSQLAuthenticationEngine implements AuthenticationEngin context.write(new PostgreSQLParameterStatusPacket("server_version", "12.3")); context.write(new PostgreSQLParameterStatusPacket("client_encoding", "UTF8")); context.write(new PostgreSQLParameterStatusPacket("server_encoding", "UTF8")); - context.writeAndFlush(new PostgreSQLReadyForQueryPacket()); + context.writeAndFlush(new PostgreSQLReadyForQueryPacket(false)); return AuthenticationResultBuilder.finished(currentAuthResult.getUsername(), "", currentAuthResult.getDatabase()); } } diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java index a6e5971..8fa1feb 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java @@ -30,6 +30,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQ import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLReadyForQueryPacket; import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload; import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey; +import org.apache.shardingsphere.infra.transaction.TransactionHolder; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.proxy.frontend.command.CommandExecuteEngine; @@ -68,7 +69,7 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin @Override public Optional<DatabasePacket<?>> getOtherPacket() { - return Optional.of(new PostgreSQLReadyForQueryPacket()); + return Optional.of(new PostgreSQLReadyForQueryPacket(TransactionHolder.isTransaction())); } @Override @@ -76,11 +77,11 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) throws SQLException { if (ResponseType.QUERY == queryCommandExecutor.getResponseType() && !context.channel().isActive()) { context.write(new PostgreSQLCommandCompletePacket()); - context.write(new PostgreSQLReadyForQueryPacket()); + context.write(new PostgreSQLReadyForQueryPacket(TransactionHolder.isTransaction())); return; } if (ResponseType.UPDATE == queryCommandExecutor.getResponseType()) { - context.write(new PostgreSQLReadyForQueryPacket()); + context.write(new PostgreSQLReadyForQueryPacket(TransactionHolder.isTransaction())); return; } int count = 0; @@ -99,6 +100,6 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin } } context.write(new PostgreSQLCommandCompletePacket()); - context.write(new PostgreSQLReadyForQueryPacket()); + context.write(new PostgreSQLReadyForQueryPacket(TransactionHolder.isTransaction())); } } diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/ddl/impl/AlterSessionStatementAssert.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/ddl/impl/AlterSessionStatementAssert.java index 237a770..7ea7426 100644 --- a/shardingsphere-sql-parser/shardingsphere-sql-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/ddl/impl/AlterSessionStatementAssert.java +++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/ddl/impl/AlterSessionStatementAssert.java @@ -27,7 +27,7 @@ import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain * Alter session statement assert. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public class AlterSessionStatementAssert { +public final class AlterSessionStatementAssert { /** * Assert alter session statement is correct with expected parser result.