This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 9481645 mysql client add reconnect function where active is inactive (#6323) 9481645 is described below commit 9481645cbed4411407586e7f63190ca8e98cd446 Author: 邱鹿 Lucas <lucas...@163.com> AuthorDate: Fri Jul 10 19:03:58 2020 +0800 mysql client add reconnect function where active is inactive (#6323) Co-authored-by: qiulu3 <Lucas209910> --- .../scaling/mysql/MySQLBinlogDumper.java | 3 +- .../scaling/mysql/client/ConnectInfo.java | 36 ++++++++++ .../scaling/mysql/client/MySQLClient.java | 80 +++++++++++++--------- .../scaling/mysql/client/MySQLClientTest.java | 7 +- 4 files changed, 92 insertions(+), 34 deletions(-) diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java index f1fc7b8..b1bb0e4 100755 --- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java +++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java @@ -32,6 +32,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Placeholde import org.apache.shardingsphere.scaling.core.execute.executor.record.Record; import org.apache.shardingsphere.scaling.core.metadata.JdbcUri; import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager; +import org.apache.shardingsphere.scaling.mysql.client.ConnectInfo; import org.apache.shardingsphere.scaling.mysql.client.MySQLClient; import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent; import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractRowsEvent; @@ -83,7 +84,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp public void dump(final Channel channel) { JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration(); final JdbcUri uri = new JdbcUri(jdbcDataSourceConfiguration.getJdbcUrl()); - MySQLClient client = new MySQLClient(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfiguration.getUsername(), jdbcDataSourceConfiguration.getPassword()); + MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfiguration.getUsername(), jdbcDataSourceConfiguration.getPassword())); client.connect(); client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition()); while (isRunning()) { diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java new file mode 100644 index 0000000..6e689dd --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/ConnectInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.mysql.client; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter +public class ConnectInfo { + + private final int serverId; + + private final String host; + + private final int port; + + private final String username; + + private final String password; +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java index d483e08..87e24b8 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java +++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClient.java @@ -17,18 +17,6 @@ package org.apache.shardingsphere.scaling.mysql.client; -import org.apache.shardingsphere.db.protocol.codec.PacketCodec; -import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine; -import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket; -import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket; -import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket; -import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket; -import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket; -import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLNegotiateHandler; -import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLBinlogEventPacketDecoder; -import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLCommandPacketDecoder; -import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent; - import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -43,6 +31,17 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Promise; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.db.protocol.codec.PacketCodec; +import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine; +import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket; +import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket; +import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket; +import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket; +import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket; +import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent; +import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLBinlogEventPacketDecoder; +import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLCommandPacketDecoder; +import org.apache.shardingsphere.scaling.mysql.client.netty.MySQLNegotiateHandler; import java.net.InetSocketAddress; import java.util.concurrent.ArrayBlockingQueue; @@ -56,17 +55,9 @@ import java.util.concurrent.TimeUnit; @Slf4j public final class MySQLClient { - private final int serverId; - - private final String host; - - private final int port; + private final ConnectInfo connectInfo; - private final String username; - - private final String password; - - private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); + private EventLoopGroup eventLoopGroup; private Channel channel; @@ -80,21 +71,21 @@ public final class MySQLClient { * Connect to MySQL. */ public synchronized void connect() { + eventLoopGroup = new NioEventLoopGroup(1); responseCallback = new DefaultPromise<>(eventLoopGroup.next()); channel = new Bootstrap() .group(eventLoopGroup) .channel(NioSocketChannel.class) + .option(ChannelOption.AUTO_READ, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(final SocketChannel socketChannel) { socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine())); socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder()); - socketChannel.pipeline().addLast(new MySQLNegotiateHandler(username, password, responseCallback)); + socketChannel.pipeline().addLast(new MySQLNegotiateHandler(connectInfo.getUsername(), connectInfo.getPassword(), responseCallback)); socketChannel.pipeline().addLast(new MySQLCommandResponseHandler()); } - }) - .option(ChannelOption.AUTO_READ, true) - .connect(host, port).channel(); + }).connect(connectInfo.getHost(), connectInfo.getPort()).channel(); serverInfo = waitExpectedResponse(ServerInfo.class); } @@ -158,7 +149,8 @@ public final class MySQLClient { private void registerSlave() { responseCallback = new DefaultPromise<>(eventLoopGroup.next()); InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); - MySQLComRegisterSlaveCommandPacket registerSlaveCommandPacket = new MySQLComRegisterSlaveCommandPacket(serverId, localAddress.getHostName(), username, password, localAddress.getPort()); + MySQLComRegisterSlaveCommandPacket registerSlaveCommandPacket = new MySQLComRegisterSlaveCommandPacket( + connectInfo.getServerId(), localAddress.getHostName(), connectInfo.getUsername(), connectInfo.getPassword(), localAddress.getPort()); channel.writeAndFlush(registerSlaveCommandPacket); waitExpectedResponse(MySQLOKPacket.class); } @@ -185,9 +177,9 @@ public final class MySQLClient { channel.pipeline().remove(MySQLCommandResponseHandler.class); channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength)); channel.pipeline().addLast(new MySQLBinlogEventHandler()); - channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, serverId, binlogFileName)); + channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName)); } - + /** * Poll binlog event. * @@ -220,7 +212,7 @@ public final class MySQLClient { } } - class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter { + private class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { @@ -238,18 +230,42 @@ public final class MySQLClient { } } - class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter { + private class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter { + + private AbstractBinlogEvent lastBinlogEvent; @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { if (msg instanceof AbstractBinlogEvent) { - blockingEventQueue.put((AbstractBinlogEvent) msg); + lastBinlogEvent = (AbstractBinlogEvent) msg; + blockingEventQueue.put(lastBinlogEvent); } } @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + log.warn("channel inactive"); + reconnect(); + } + + @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { log.error("protocol resolution error", cause); + reconnect(); + } + + private void reconnect() { + log.info("reconnect mysql client."); + closeOldChannel(); + connect(); + subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition()); + } + + private void closeOldChannel() { + try { + channel.closeFuture().sync(); + } catch (InterruptedException ignored) { + } } } } diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java index 1bb7b84..6aa0322 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java +++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java @@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.mysql.client; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.Promise; import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket; @@ -56,7 +57,7 @@ public final class MySQLClientTest { @Before public void setUp() { - mysqlClient = new MySQLClient(1, "host", 3306, "username", "password"); + mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306, "username", "password")); when(channel.pipeline()).thenReturn(pipeline); inetSocketAddress = new InetSocketAddress("host", 3306); when(channel.localAddress()).thenReturn(inetSocketAddress); @@ -75,6 +76,7 @@ public final class MySQLClientTest { public void assertExecute() throws NoSuchFieldException, IllegalAccessException { mockChannelResponse(new MySQLOKPacket(0)); ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel); + ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1)); assertTrue(mysqlClient.execute("")); verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class)); } @@ -85,6 +87,7 @@ public final class MySQLClientTest { ReflectionUtil.setFieldValueToClass(expected, "affectedRows", 10); mockChannelResponse(expected); ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel); + ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1)); assertThat(mysqlClient.executeUpdate(""), is(10)); verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class)); } @@ -94,6 +97,7 @@ public final class MySQLClientTest { InternalResultSet expected = new InternalResultSet(null); mockChannelResponse(expected); ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel); + ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1)); assertThat(mysqlClient.executeQuery(""), is(expected)); verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class)); } @@ -104,6 +108,7 @@ public final class MySQLClientTest { serverInfo.setServerVersion(new ServerVersion("5.5.0-log")); ReflectionUtil.setFieldValueToClass(mysqlClient, "serverInfo", serverInfo); ReflectionUtil.setFieldValueToClass(mysqlClient, "channel", channel); + ReflectionUtil.setFieldValueToClass(mysqlClient, "eventLoopGroup", new NioEventLoopGroup(1)); mockChannelResponse(new MySQLOKPacket(0)); mysqlClient.subscribe("", 4L); verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComRegisterSlaveCommandPacket.class));