This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new aefeae30e1d Improve MySQLClient connect and supports recovery in the 
middle of a batch event (#19852)
aefeae30e1d is described below

commit aefeae30e1de331443087739b5e4151048df9fdb
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Aug 4 18:26:04 2022 +0800

    Improve MySQLClient connect and supports recovery in the middle of a batch 
event (#19852)
---
 .../mysql/ingest/GlobalTableMapEventMapping.java   | 42 ++++++++++++++++++++++
 .../mysql/ingest/binlog/BinlogContext.java         |  9 ++---
 .../pipeline/mysql/ingest/client/MySQLClient.java  | 32 +++++++++--------
 .../netty/MySQLBinlogEventPacketDecoder.java       |  6 ++--
 .../mysql/ingest/binlog/BinlogContextTest.java     |  3 +-
 .../mysql/ingest/client/MySQLClientTest.java       |  9 +++--
 .../netty/MySQLBinlogEventPacketDecoderTest.java   |  3 +-
 7 files changed, 78 insertions(+), 26 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
new file mode 100644
index 00000000000..92dd053e5d2
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/GlobalTableMapEventMapping.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.mysql.ingest;
+
+import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Global table map event mapping.
+ * // TODO Still stored only in memory; if the Proxy restarts, the data will be lost.
+ */
+public class GlobalTableMapEventMapping {
+    
+    private static final Map<String, Map<Long, 
MySQLBinlogTableMapEventPacket>> TABLE_MAP_EVENT_MAPPING = new 
ConcurrentHashMap<>();
+    
+    /**
+     * Get table map event map by database url.
+     *
+     * @param databaseUrl database url
+     * @return table map event map
+     */
+    public static Map<Long, MySQLBinlogTableMapEventPacket> 
getTableMapEventMap(final String databaseUrl) {
+        return TABLE_MAP_EVENT_MAPPING.computeIfAbsent(databaseUrl, k -> new 
ConcurrentHashMap<>());
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
index 76ec79b56e5..9c64b573293 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContext.java
@@ -18,11 +18,11 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -30,14 +30,15 @@ import java.util.Map;
  * Binlog context.
  */
 @Getter
-@Setter
+@RequiredArgsConstructor
 public final class BinlogContext {
     
+    @Setter
     private String fileName;
     
-    private int checksumLength;
+    private final int checksumLength;
     
-    private Map<Long, MySQLBinlogTableMapEventPacket> tableMap = new 
HashMap<>();
+    private final Map<Long, MySQLBinlogTableMapEventPacket> tableMap;
     
     /**
      * Cache table map event.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 145c9cf33bf..5a7d4474c40 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -32,6 +32,7 @@ import io.netty.util.concurrent.Promise;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.GlobalTableMapEventMapping;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLCommandPacketDecoder;
@@ -51,6 +52,7 @@ import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -191,7 +193,8 @@ public final class MySQLClient {
         responseCallback = null;
         channel.pipeline().remove(MySQLCommandPacketDecoder.class);
         channel.pipeline().remove(MySQLCommandResponseHandler.class);
-        channel.pipeline().addLast(new 
MySQLBinlogEventPacketDecoder(checksumLength));
+        String tableKey = String.join(":", connectInfo.getHost(), 
String.valueOf(connectInfo.getPort()));
+        channel.pipeline().addLast(new 
MySQLBinlogEventPacketDecoder(checksumLength, 
GlobalTableMapEventMapping.getTableMapEventMap(tableKey)));
         channel.pipeline().addLast(new MySQLBinlogEventHandler());
         channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) 
binlogPosition, connectInfo.getServerId(), binlogFileName));
     }
@@ -215,7 +218,7 @@ public final class MySQLClient {
     @SuppressWarnings("unchecked")
     private <T> T waitExpectedResponse(final Class<T> type) {
         try {
-            Object response = responseCallback.get();
+            Object response = responseCallback.get(5, TimeUnit.SECONDS);
             if (null == response) {
                 return null;
             }
@@ -226,7 +229,7 @@ public final class MySQLClient {
                 throw new RuntimeException(((MySQLErrPacket) 
response).getErrorMessage());
             }
             throw new RuntimeException("unexpected response type");
-        } catch (final InterruptedException | ExecutionException ex) {
+        } catch (final InterruptedException | ExecutionException | 
TimeoutException ex) {
             throw new RuntimeException(ex);
         }
     }
@@ -239,11 +242,9 @@ public final class MySQLClient {
             return;
         }
         try {
-            channel.close();
-            // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            log.error("close channel error", ex);
+            channel.close().sync();
+        } catch (final InterruptedException ex) {
+            log.error("close channel interrupted", ex);
         }
     }
     
@@ -286,23 +287,24 @@ public final class MySQLClient {
             if (!running) {
                 return;
             }
-            if (reconnectTimes.get() > 3) {
-                log.warn("exceeds the maximum number of retry times, last 
binlog event:{}", lastBinlogEvent);
-                running = false;
-                return;
-            }
             reconnect();
         }
-        
+    
         @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
             running = false;
             String fileName = null == lastBinlogEvent ? null : 
lastBinlogEvent.getFileName();
             Long position = null == lastBinlogEvent ? null : 
lastBinlogEvent.getPosition();
             log.error("MySQLBinlogEventHandler protocol resolution error, file 
name:{}, position:{}", fileName, position, cause);
+            reconnect();
         }
-        
+    
         private void reconnect() {
+            if (reconnectTimes.get() > 3) {
+                log.warn("exceeds the maximum number of retry times, last 
binlog event:{}", lastBinlogEvent);
+                running = false;
+                return;
+            }
             int retryTimes = reconnectTimes.incrementAndGet();
             log.info("reconnect MySQL client, retry times={}", retryTimes);
             closeChannel();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 6d5549624d1..37a6b79506c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -39,6 +39,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlog
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -49,9 +50,8 @@ public final class MySQLBinlogEventPacketDecoder extends 
ByteToMessageDecoder {
     
     private final BinlogContext binlogContext;
     
-    public MySQLBinlogEventPacketDecoder(final int checksumLength) {
-        binlogContext = new BinlogContext();
-        binlogContext.setChecksumLength(checksumLength);
+    public MySQLBinlogEventPacketDecoder(final int checksumLength, final 
Map<Long, MySQLBinlogTableMapEventPacket> tableMap) {
+        binlogContext = new BinlogContext(checksumLength, tableMap);
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContextTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContextTest.java
index 6faa252f948..4dd9acebc64 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContextTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/binlog/BinlogContextTest.java
@@ -27,6 +27,7 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -49,7 +50,7 @@ public final class BinlogContextTest {
     
     @Before
     public void setUp() {
-        binlogContext = new BinlogContext();
+        binlogContext = new BinlogContext(4, new HashMap<>());
         when(tableMapEventPacket.getSchemaName()).thenReturn(TEST_SCHEMA);
         when(tableMapEventPacket.getTableName()).thenReturn(TEST_TABLE);
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index 1bed2530ff9..f13cfdda984 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.Promise;
@@ -52,14 +53,18 @@ public final class MySQLClientTest {
     @Mock
     private ChannelPipeline pipeline;
     
+    @Mock
+    private ChannelFuture channelFuture;
+    
     private MySQLClient mysqlClient;
     
     @Before
-    public void setUp() {
+    public void setUp() throws InterruptedException {
         mysqlClient = new MySQLClient(new ConnectInfo(1, "host", 3306, 
"username", "password"));
         when(channel.pipeline()).thenReturn(pipeline);
         when(channel.isOpen()).thenReturn(true);
-        when(channel.close()).thenAnswer(invocation -> {
+        when(channel.close()).thenReturn(channelFuture);
+        when(channelFuture.sync()).thenAnswer(invocation -> {
             when(channel.isOpen()).thenReturn(false);
             return null;
         });
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index c9424aaf3dd..6e5160c7f0d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -43,6 +43,7 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -66,7 +67,7 @@ public final class MySQLBinlogEventPacketDecoderTest {
     
     @Before
     public void setUp() throws NoSuchFieldException, IllegalAccessException {
-        binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4);
+        binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4, new 
ConcurrentHashMap<>());
         binlogContext = ReflectionUtil.getFieldValue(binlogEventPacketDecoder, 
"binlogContext", BinlogContext.class);
         
when(channelHandlerContext.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()).thenReturn(StandardCharsets.UTF_8);
         columnDefs = Lists.newArrayList(new 
MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONGLONG), new 
MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONG),

Reply via email to