This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a18fca8006 [Fix][Connector-tdengine] Fix sql exception and 
concurrentmodifyexception when connect to taos and read data
a18fca8006 is described below

commit a18fca80061925ba3a2c9bd152b450dcf3d9308b
Author: Alex Ting <alexti...@hotmail.com>
AuthorDate: Thu Aug 8 11:22:41 2024 +0800

    [Fix][Connector-tdengine] Fix sql exception and concurrentmodifyexception 
when connect to taos and read data
---
 .../tdengine/config/TDengineSourceConfig.java      |   7 +-
 .../seatunnel/tdengine/source/TDengineSource.java  |  56 +++++-----
 .../tdengine/source/TDengineSourceReader.java      |  95 +++++++---------
 .../source/TDengineSourceSplitEnumerator.java      | 122 +++++++++++---------
 .../tdengine/state/TDengineSourceState.java        |  20 ++--
 .../tdengine/source/TDengineSourceReaderTest.java  | 124 +++++++++++++++++++++
 6 files changed, 279 insertions(+), 145 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
index 0908c73387..4eabb754cf 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -30,7 +30,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengine
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
-import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
 
 @Data
@@ -54,7 +53,10 @@ public class TDengineSourceConfig implements Serializable {
 
     public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
         TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
-        tdengineSourceConfig.setUrl(pluginConfig.hasPath(URL) ? 
pluginConfig.getString(URL) : null);
+        tdengineSourceConfig.setUrl(
+                pluginConfig.hasPath(ConfigNames.URL)
+                        ? pluginConfig.getString(ConfigNames.URL)
+                        : null);
         tdengineSourceConfig.setDatabase(
                 pluginConfig.hasPath(DATABASE) ? 
pluginConfig.getString(DATABASE) : null);
         tdengineSourceConfig.setStable(
@@ -69,6 +71,7 @@ public class TDengineSourceConfig implements Serializable {
                 pluginConfig.hasPath(LOWER_BOUND) ? 
pluginConfig.getString(LOWER_BOUND) : null);
         tdengineSourceConfig.setTimezone(
                 pluginConfig.hasPath(TIMEZONE) ? 
pluginConfig.getString(TIMEZONE) : "UTC");
+
         return tdengineSourceConfig;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
index 2f2e6a3f98..e72773781a 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java
@@ -40,6 +40,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTyp
 import org.apache.commons.lang3.ArrayUtils;
 
 import com.google.auto.service.AutoService;
+import com.taosdata.jdbc.TSDBDriver;
 import lombok.SneakyThrows;
 
 import java.sql.Connection;
@@ -49,6 +50,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
@@ -127,42 +129,36 @@ public class TDengineSource
         List<String> fieldNames = new ArrayList<>();
         List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>();
 
-        String jdbcUrl =
-                String.join(
-                        "",
-                        config.getUrl(),
-                        config.getDatabase(),
-                        "?user=",
-                        config.getUsername(),
-                        "&password=",
-                        config.getPassword());
+        String jdbcUrl = String.join("", config.getUrl(), 
config.getDatabase());
+
         // check td driver whether exist and if not, try to register
         checkDriverExist(jdbcUrl);
-        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
-            try (Statement statement = conn.createStatement()) {
+
+        Properties properties = new Properties();
+        properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername());
+        properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());
+        String metaSQL =
+                String.format(
+                        "select table_name from information_schema.ins_tables 
where db_name = '%s' and stable_name='%s'",
+                        config.getDatabase(), config.getStable());
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, 
properties);
+                Statement statement = conn.createStatement();
                 ResultSet metaResultSet =
                         statement.executeQuery(
-                                "desc " + config.getDatabase() + "." + 
config.getStable());
-                while (metaResultSet.next()) {
-                    if (timestampFieldName == null) {
-                        timestampFieldName = metaResultSet.getString(1);
-                    }
-                    fieldNames.add(metaResultSet.getString(1));
-                    
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
+                                String.format(
+                                        "desc %s.%s", config.getDatabase(), 
config.getStable()));
+                ResultSet subTableNameResultSet = 
statement.executeQuery(metaSQL)) {
+            while (metaResultSet.next()) {
+                if (timestampFieldName == null) {
+                    timestampFieldName = metaResultSet.getString(1);
                 }
+                fieldNames.add(metaResultSet.getString(1));
+                
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
             }
-            try (Statement statement = conn.createStatement()) {
-                String metaSQL =
-                        "select table_name from information_schema.ins_tables 
where db_name = '"
-                                + config.getDatabase()
-                                + "' and stable_name='"
-                                + config.getStable()
-                                + "';";
-                ResultSet subTableNameResultSet = 
statement.executeQuery(metaSQL);
-                while (subTableNameResultSet.next()) {
-                    String subTableName = subTableNameResultSet.getString(1);
-                    subTableNames.add(subTableName);
-                }
+
+            while (subTableNameResultSet.next()) {
+                String subTableName = subTableNameResultSet.getString(1);
+                subTableNames.add(subTableName);
             }
         }
 
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
index 6782f085bd..bb4184702d 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
 
-import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -25,9 +24,6 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
 
-import org.apache.commons.lang3.StringUtils;
-
-import com.google.common.collect.Sets;
 import com.taosdata.jdbc.TSDBDriver;
 import lombok.extern.slf4j.Slf4j;
 
@@ -39,84 +35,76 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
 
 @Slf4j
 public class TDengineSourceReader implements SourceReader<SeaTunnelRow, 
TDengineSourceSplit> {
-
-    private static final long THREAD_WAIT_TIME = 500L;
-
     private final TDengineSourceConfig config;
 
-    private final Set<TDengineSourceSplit> sourceSplits;
+    private final Deque<TDengineSourceSplit> sourceSplits;
 
     private final Context context;
 
     private Connection conn;
 
+    private volatile boolean noMoreSplit;
+
     public TDengineSourceReader(TDengineSourceConfig config, 
SourceReader.Context readerContext) {
         this.config = config;
-        this.sourceSplits = Sets.newHashSet();
+        this.sourceSplits = new ConcurrentLinkedDeque<>();
         this.context = readerContext;
     }
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> collector) throws 
InterruptedException {
-        if (sourceSplits.isEmpty()) {
-            Thread.sleep(THREAD_WAIT_TIME);
-            return;
-        }
         synchronized (collector.getCheckpointLock()) {
-            sourceSplits.forEach(
-                    split -> {
-                        try {
-                            read(split, collector);
-                        } catch (Exception e) {
-                            throw new TDengineConnectorException(
-                                    
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
-                                    "TDengine split read error",
-                                    e);
-                        }
-                    });
-        }
-
-        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            log.info("Closed the bounded TDengine source");
-            context.signalNoMoreElement();
+            log.info("polling new split from queue!");
+            TDengineSourceSplit split = sourceSplits.poll();
+            if (Objects.nonNull(split)) {
+                log.info(
+                        "starting run new split {}, query sql: {}!",
+                        split.splitId(),
+                        split.getQuery());
+                try {
+                    read(split, collector);
+                } catch (Exception e) {
+                    throw new TDengineConnectorException(
+                            CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
+                            "TDengine split read error",
+                            e);
+                }
+            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the 
data.
+                log.info("Closed the bounded TDengine source");
+                context.signalNoMoreElement();
+            } else {
+                Thread.sleep(1000L);
+            }
         }
     }
 
     @Override
     public void open() {
-        String jdbcUrl =
-                StringUtils.join(
-                        config.getUrl(),
-                        config.getDatabase(),
-                        "?user=",
-                        config.getUsername(),
-                        "&password=",
-                        config.getPassword());
-        Properties connProps = new Properties();
-        // todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true",
-        // there is a exception : Caused by: java.sql.SQLException: can't 
create connection with
-        // server
-        // under docker network env
-        // @bobo (tdengine)
-        connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false");
+        String jdbcUrl = config.getUrl();
+
+        Properties properties = new Properties();
+        properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername());
+        properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());
+
         try {
-            // check td driver whether exist and if not, try to register
             checkDriverExist(jdbcUrl);
-            conn = DriverManager.getConnection(jdbcUrl, connProps);
+            conn = DriverManager.getConnection(jdbcUrl, properties);
         } catch (SQLException e) {
             throw new TDengineConnectorException(
                     CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
-                    "get TDengine connection failed:" + jdbcUrl);
+                    "get TDengine connection failed:" + jdbcUrl,
+                    e);
         }
     }
 
@@ -135,8 +123,8 @@ public class TDengineSourceReader implements 
SourceReader<SeaTunnelRow, TDengine
     }
 
     private void read(TDengineSourceSplit split, Collector<SeaTunnelRow> 
output) throws Exception {
-        try (Statement statement = conn.createStatement()) {
-            final ResultSet resultSet = 
statement.executeQuery(split.getQuery());
+        try (Statement statement = conn.createStatement();
+                ResultSet resultSet = 
statement.executeQuery(split.getQuery())) {
             ResultSetMetaData meta = resultSet.getMetaData();
 
             while (resultSet.next()) {
@@ -151,6 +139,8 @@ public class TDengineSourceReader implements 
SourceReader<SeaTunnelRow, TDengine
     }
 
     private Object convertDataType(Object object) {
+        if (Objects.isNull(object)) return null;
+
         if (Timestamp.class.equals(object.getClass())) {
             return ((Timestamp) object).toLocalDateTime();
         } else if (byte[].class.equals(object.getClass())) {
@@ -171,7 +161,8 @@ public class TDengineSourceReader implements 
SourceReader<SeaTunnelRow, TDengine
 
     @Override
     public void handleNoMoreSplits() {
-        // do nothing
+        log.info("no more split accepted!");
+        noMoreSplit = true;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
index d5787ba557..911a9a6ec1 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java
@@ -17,28 +17,34 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
 
-import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
 
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class TDengineSourceSplitEnumerator
         implements SourceSplitEnumerator<TDengineSourceSplit, 
TDengineSourceState> {
 
     private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
     private final TDengineSourceConfig config;
     private final StableMetadata stableMetadata;
-    private Set<TDengineSourceSplit> pendingSplit = new HashSet<>();
-    private Set<TDengineSourceSplit> assignedSplit = new HashSet<>();
+    private volatile boolean shouldEnumerate;
+    private final Object stateLock = new Object();
+    private final Map<Integer, List<TDengineSourceSplit>> pendingSplits = new 
ConcurrentHashMap<>();
 
     public TDengineSourceSplitEnumerator(
             StableMetadata stableMetadata,
@@ -55,8 +61,10 @@ public class TDengineSourceSplitEnumerator
         this.config = config;
         this.context = context;
         this.stableMetadata = stableMetadata;
+        this.shouldEnumerate = sourceState == null;
         if (sourceState != null) {
-            this.assignedSplit = sourceState.getAssignedSplit();
+            this.shouldEnumerate = sourceState.isShouldEnumerate();
+            this.pendingSplits.putAll(sourceState.getPendingSplits());
         }
     }
 
@@ -69,16 +77,33 @@ public class TDengineSourceSplitEnumerator
 
     @Override
     public void run() {
-        pendingSplit = getAllSplits();
-        assignSplit(context.registeredReaders());
+        Set<Integer> readers = context.registeredReaders();
+        if (shouldEnumerate) {
+            List<TDengineSourceSplit> newSplits = discoverySplits();
+
+            synchronized (stateLock) {
+                addPendingSplit(newSplits);
+                shouldEnumerate = false;
+            }
+
+            assignSplit(readers);
+        }
+
+        log.info("No more splits to assign." + " Sending NoMoreSplitsEvent to 
reader {}.", readers);
+        readers.forEach(context::signalNoMoreSplits);
     }
 
-    /*
-     * each split has one sub table
-     */
-    private Set<TDengineSourceSplit> getAllSplits() {
+    private void addPendingSplit(List<TDengineSourceSplit> newSplits) {
+        int readerCount = context.currentParallelism();
+        for (TDengineSourceSplit split : newSplits) {
+            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+            pendingSplits.computeIfAbsent(ownerReader, r -> new 
ArrayList<>()).add(split);
+        }
+    }
+
+    private List<TDengineSourceSplit> discoverySplits() {
         final String timestampFieldName = 
stableMetadata.getTimestampFieldName();
-        final Set<TDengineSourceSplit> splits = new HashSet<>();
+        final List<TDengineSourceSplit> splits = new ArrayList<>();
         for (String subTableName : stableMetadata.getSubTableNames()) {
             TDengineSourceSplit splitBySubTable =
                     createSplitBySubTable(subTableName, timestampFieldName);
@@ -92,9 +117,11 @@ public class TDengineSourceSplitEnumerator
         String selectFields =
                 Arrays.stream(stableMetadata.getRowType().getFieldNames())
                         .skip(1)
+                        .map(name -> String.format("`%s`", name))
                         .collect(Collectors.joining(","));
         String subTableSQL =
-                "select " + selectFields + " from " + config.getDatabase() + 
"." + subTableName;
+                String.format(
+                        "select %s from %s.`%s`", selectFields, 
config.getDatabase(), subTableName);
         String start = config.getLowerBound();
         String end = config.getUpperBound();
         if (start != null || end != null) {
@@ -116,69 +143,64 @@ public class TDengineSourceSplitEnumerator
 
     @Override
     public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) 
{
+        log.info("Add back splits {} to TDengineSourceSplitEnumerator.", 
splits);
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(splits);
+            addPendingSplit(splits);
             assignSplit(Collections.singletonList(subtaskId));
         }
     }
 
     @Override
     public int currentUnassignedSplitSize() {
-        return pendingSplit.size();
+        return pendingSplits.size();
     }
 
     @Override
     public void registerReader(int subtaskId) {
-        if (!pendingSplit.isEmpty()) {
+        log.info("Register reader {} to TDengineSourceSplitEnumerator.", 
subtaskId);
+        if (!pendingSplits.isEmpty()) {
             assignSplit(Collections.singletonList(subtaskId));
         }
     }
 
-    private void assignSplit(Collection<Integer> taskIDList) {
-        assignedSplit =
-                pendingSplit.stream()
-                        .map(
-                                split -> {
-                                    int splitOwner =
-                                            getSplitOwner(
-                                                    split.splitId(), 
context.currentParallelism());
-                                    if (taskIDList.contains(splitOwner)) {
-                                        context.assignSplit(splitOwner, split);
-                                        return split;
-                                    } else {
-                                        return null;
-                                    }
-                                })
-                        .filter(Objects::nonNull)
-                        .collect(Collectors.toSet());
-        pendingSplit.clear();
+    private void assignSplit(Collection<Integer> readers) {
+        log.info("Assign pendingSplits to readers {}", readers);
+
+        for (int reader : readers) {
+            List<TDengineSourceSplit> assignmentForReader = 
pendingSplits.remove(reader);
+            if (assignmentForReader != null && !assignmentForReader.isEmpty()) 
{
+                log.info("Assign splits {} to reader {}", assignmentForReader, 
reader);
+                try {
+                    context.assignSplit(reader, assignmentForReader);
+                } catch (Exception e) {
+                    log.error(
+                            "Failed to assign splits {} to reader {}",
+                            assignmentForReader,
+                            reader,
+                            e);
+                    pendingSplits.put(reader, assignmentForReader);
+                }
+            }
+        }
     }
 
     @Override
     public TDengineSourceState snapshotState(long checkpointId) {
-        return new TDengineSourceState(assignedSplit);
-    }
-
-    @Override
-    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-        SourceSplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) {
-        // nothing to do
+        synchronized (stateLock) {
+            return new TDengineSourceState(shouldEnumerate, pendingSplits);
+        }
     }
 
     @Override
-    public void notifyCheckpointAborted(long checkpointId) throws Exception {
-        SourceSplitEnumerator.super.notifyCheckpointAborted(checkpointId);
-    }
+    public void notifyCheckpointComplete(long checkpointId) {}
 
     @Override
     public void close() {}
 
     @Override
     public void handleSplitRequest(int subtaskId) {
-        // nothing to do
+        throw new TDengineConnectorException(
+                CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                String.format("Unsupported handleSplitRequest: %d", 
subtaskId));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
index fc839682a9..4832cd398f 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java
@@ -19,18 +19,16 @@ package 
org.apache.seatunnel.connectors.seatunnel.tdengine.state;
 
 import 
org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 import java.io.Serializable;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
 
+@AllArgsConstructor
+@Getter
 public class TDengineSourceState implements Serializable {
-
-    private final Set<TDengineSourceSplit> assignedSplit;
-
-    public TDengineSourceState(Set<TDengineSourceSplit> assignedSplit) {
-        this.assignedSplit = assignedSplit;
-    }
-
-    public Set<TDengineSourceSplit> getAssignedSplit() {
-        return assignedSplit;
-    }
+    private boolean shouldEnumerate;
+    private final Map<Integer, List<TDengineSourceSplit>> pendingSplits;
 }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
new file mode 100644
index 0000000000..abd42fefe1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+class TDengineSourceReaderTest {
+    Logger logger;
+    TDengineSourceReader tDengineSourceReader;
+
+    @BeforeEach
+    void setup() {
+        tDengineSourceReader = new TDengineSourceReader(null, null);
+
+        List<TDengineSourceSplit> sourceSplits = new ArrayList<>();
+        int splitCnt = 100;
+        for (int i = 0; i < splitCnt; i++) {
+            sourceSplits.add(new TDengineSourceSplit(Integer.toString(i), 
"select sever_status()"));
+        }
+
+        tDengineSourceReader.addSplits(sourceSplits);
+
+        logger = Logger.getLogger("TDengineSourceReaderTest");
+    }
+
+    @Test
+    void testPoll() throws InterruptedException {
+        TestCollector testCollector = new TestCollector();
+
+        int totalSplitCnt = 150;
+        ThreadPoolExecutor pool =
+                new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>());
+        pool.execute(
+                () -> {
+                    for (int i = 0; i < totalSplitCnt; i++) {
+                        try {
+                            tDengineSourceReader.pollNext(testCollector);
+                            Thread.sleep(new Random().nextInt(5));
+                        } catch (TDengineConnectorException e) {
+                            logger.info("skip create connection!");
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+
+        int newSplitCnt = 50;
+        int threadCnt = 3;
+        for (int i = 0; i < threadCnt; i++) {
+            pool.execute(
+                    () -> {
+                        for (int idx = 0; idx < newSplitCnt; idx++) {
+                            logger.info(
+                                    String.format(
+                                            "%s receive new split",
+                                            Thread.currentThread().getName()));
+                            tDengineSourceReader.addSplits(
+                                    Collections.singletonList(
+                                            new TDengineSourceSplit(
+                                                    String.format(
+                                                            "new_%s",
+                                                            
Thread.currentThread().getName() + idx),
+                                                    "select 
server_status()")));
+                            try {
+                                Thread.sleep(new Random().nextInt(5));
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    });
+        }
+
+        pool.awaitTermination(3, TimeUnit.SECONDS);
+    }
+
+    private static class TestCollector implements Collector<SeaTunnelRow> {
+
+        private final List<SeaTunnelRow> rows = new ArrayList<>();
+
+        public List<SeaTunnelRow> getRows() {
+            return rows;
+        }
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            rows.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return new Object();
+        }
+    }
+}

Reply via email to