This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new a18fca8006 [Fix][Connector-tdengine] Fix sql exception and concurrentmodifyexception when connect to taos and read data a18fca8006 is described below commit a18fca80061925ba3a2c9bd152b450dcf3d9308b Author: Alex Ting <alexti...@hotmail.com> AuthorDate: Thu Aug 8 11:22:41 2024 +0800 [Fix][Connector-tdengine] Fix sql exception and concurrentmodifyexception when connect to taos and read data --- .../tdengine/config/TDengineSourceConfig.java | 7 +- .../seatunnel/tdengine/source/TDengineSource.java | 56 +++++----- .../tdengine/source/TDengineSourceReader.java | 95 +++++++--------- .../source/TDengineSourceSplitEnumerator.java | 122 +++++++++++--------- .../tdengine/state/TDengineSourceState.java | 20 ++-- .../tdengine/source/TDengineSourceReaderTest.java | 124 +++++++++++++++++++++ 6 files changed, 279 insertions(+), 145 deletions(-) diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java index 0908c73387..4eabb754cf 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java @@ -30,7 +30,6 @@ import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengine import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND; -import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME; @Data @@ -54,7 +53,10 @@ public class TDengineSourceConfig implements Serializable { public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) { TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig(); - tdengineSourceConfig.setUrl(pluginConfig.hasPath(URL) ? pluginConfig.getString(URL) : null); + tdengineSourceConfig.setUrl( + pluginConfig.hasPath(ConfigNames.URL) + ? pluginConfig.getString(ConfigNames.URL) + : null); tdengineSourceConfig.setDatabase( pluginConfig.hasPath(DATABASE) ? pluginConfig.getString(DATABASE) : null); tdengineSourceConfig.setStable( @@ -69,6 +71,7 @@ public class TDengineSourceConfig implements Serializable { pluginConfig.hasPath(LOWER_BOUND) ? pluginConfig.getString(LOWER_BOUND) : null); tdengineSourceConfig.setTimezone( pluginConfig.hasPath(TIMEZONE) ? pluginConfig.getString(TIMEZONE) : "UTC"); + return tdengineSourceConfig; } diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java index 2f2e6a3f98..e72773781a 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java @@ -40,6 +40,7 @@ import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTyp import org.apache.commons.lang3.ArrayUtils; import com.google.auto.service.AutoService; +import com.taosdata.jdbc.TSDBDriver; import lombok.SneakyThrows; import java.sql.Connection; @@ -49,6 +50,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD; @@ -127,42 +129,36 @@ public class TDengineSource List<String> fieldNames = new ArrayList<>(); List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>(); - String jdbcUrl = - String.join( - "", - config.getUrl(), - config.getDatabase(), - "?user=", - config.getUsername(), - "&password=", - config.getPassword()); + String jdbcUrl = String.join("", config.getUrl(), config.getDatabase()); + // check td driver whether exist and if not, try to register checkDriverExist(jdbcUrl); - try (Connection conn = DriverManager.getConnection(jdbcUrl)) { - try (Statement statement = conn.createStatement()) { + + Properties properties = new Properties(); + properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername()); + properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword()); + String metaSQL = + String.format( + "select table_name from information_schema.ins_tables where db_name = '%s' and stable_name='%s'", + config.getDatabase(), config.getStable()); + try (Connection conn = DriverManager.getConnection(jdbcUrl, properties); + Statement statement = conn.createStatement(); ResultSet metaResultSet = statement.executeQuery( - "desc " + config.getDatabase() + "." + config.getStable()); - while (metaResultSet.next()) { - if (timestampFieldName == null) { - timestampFieldName = metaResultSet.getString(1); - } - fieldNames.add(metaResultSet.getString(1)); - fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2))); + String.format( + "desc %s.%s", config.getDatabase(), config.getStable())); + ResultSet subTableNameResultSet = statement.executeQuery(metaSQL)) { + while (metaResultSet.next()) { + if (timestampFieldName == null) { + timestampFieldName = metaResultSet.getString(1); } + fieldNames.add(metaResultSet.getString(1)); + fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2))); } - try (Statement statement = conn.createStatement()) { - String metaSQL = - "select table_name from information_schema.ins_tables where db_name = '" - + config.getDatabase() - + "' and stable_name='" - + config.getStable() - + "';"; - ResultSet subTableNameResultSet = statement.executeQuery(metaSQL); - while (subTableNameResultSet.next()) { - String subTableName = subTableNameResultSet.getString(1); - subTableNames.add(subTableName); - } + + while (subTableNameResultSet.next()) { + String subTableName = subTableNameResultSet.getString(1); + subTableNames.add(subTableName); } } diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java index 6782f085bd..bb4184702d 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.tdengine.source; -import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -25,9 +24,6 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; -import org.apache.commons.lang3.StringUtils; - -import com.google.common.collect.Sets; import com.taosdata.jdbc.TSDBDriver; import lombok.extern.slf4j.Slf4j; @@ -39,84 +35,76 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.Objects; import java.util.Properties; -import java.util.Set; +import java.util.concurrent.ConcurrentLinkedDeque; import static org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist; @Slf4j public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> { - - private static final long THREAD_WAIT_TIME = 500L; - private final TDengineSourceConfig config; - private final Set<TDengineSourceSplit> sourceSplits; + private final Deque<TDengineSourceSplit> sourceSplits; private final Context context; private Connection conn; + private volatile boolean noMoreSplit; + public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) { this.config = config; - this.sourceSplits = Sets.newHashSet(); + this.sourceSplits = new ConcurrentLinkedDeque<>(); this.context = readerContext; } @Override public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException { - if (sourceSplits.isEmpty()) { - Thread.sleep(THREAD_WAIT_TIME); - return; - } synchronized (collector.getCheckpointLock()) { - sourceSplits.forEach( - split -> { - try { - read(split, collector); - } catch (Exception e) { - throw new TDengineConnectorException( - CommonErrorCodeDeprecated.READER_OPERATION_FAILED, - "TDengine split read error", - e); - } - }); - } - - if (Boundedness.BOUNDED.equals(context.getBoundedness())) { - // signal to the source that we have reached the end of the data. - log.info("Closed the bounded TDengine source"); - context.signalNoMoreElement(); + log.info("polling new split from queue!"); + TDengineSourceSplit split = sourceSplits.poll(); + if (Objects.nonNull(split)) { + log.info( + "starting run new split {}, query sql: {}!", + split.splitId(), + split.getQuery()); + try { + read(split, collector); + } catch (Exception e) { + throw new TDengineConnectorException( + CommonErrorCodeDeprecated.READER_OPERATION_FAILED, + "TDengine split read error", + e); + } + } else if (noMoreSplit && sourceSplits.isEmpty()) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded TDengine source"); + context.signalNoMoreElement(); + } else { + Thread.sleep(1000L); + } } } @Override public void open() { - String jdbcUrl = - StringUtils.join( - config.getUrl(), - config.getDatabase(), - "?user=", - config.getUsername(), - "&password=", - config.getPassword()); - Properties connProps = new Properties(); - // todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true", - // there is a exception : Caused by: java.sql.SQLException: can't create connection with - // server - // under docker network env - // @bobo (tdengine) - connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false"); + String jdbcUrl = config.getUrl(); + + Properties properties = new Properties(); + properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername()); + properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword()); + try { - // check td driver whether exist and if not, try to register checkDriverExist(jdbcUrl); - conn = DriverManager.getConnection(jdbcUrl, connProps); + conn = DriverManager.getConnection(jdbcUrl, properties); } catch (SQLException e) { throw new TDengineConnectorException( CommonErrorCodeDeprecated.READER_OPERATION_FAILED, - "get TDengine connection failed:" + jdbcUrl); + "get TDengine connection failed:" + jdbcUrl, + e); } } @@ -135,8 +123,8 @@ public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengine } private void read(TDengineSourceSplit split, Collector<SeaTunnelRow> output) throws Exception { - try (Statement statement = conn.createStatement()) { - final ResultSet resultSet = statement.executeQuery(split.getQuery()); + try (Statement statement = conn.createStatement(); + ResultSet resultSet = statement.executeQuery(split.getQuery())) { ResultSetMetaData meta = resultSet.getMetaData(); while (resultSet.next()) { @@ -151,6 +139,8 @@ public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengine } private Object convertDataType(Object object) { + if (Objects.isNull(object)) return null; + if (Timestamp.class.equals(object.getClass())) { return ((Timestamp) object).toLocalDateTime(); } else if (byte[].class.equals(object.getClass())) { @@ -171,7 +161,8 @@ public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengine @Override public void handleNoMoreSplits() { - // do nothing + log.info("no more split accepted!"); + noMoreSplit = true; } @Override diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java index d5787ba557..911a9a6ec1 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java @@ -17,28 +17,34 @@ package org.apache.seatunnel.connectors.seatunnel.tdengine.source; -import org.apache.seatunnel.api.source.SourceEvent; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Objects; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +@Slf4j public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> { private final SourceSplitEnumerator.Context<TDengineSourceSplit> context; private final TDengineSourceConfig config; private final StableMetadata stableMetadata; - private Set<TDengineSourceSplit> pendingSplit = new HashSet<>(); - private Set<TDengineSourceSplit> assignedSplit = new HashSet<>(); + private volatile boolean shouldEnumerate; + private final Object stateLock = new Object(); + private final Map<Integer, List<TDengineSourceSplit>> pendingSplits = new ConcurrentHashMap<>(); public TDengineSourceSplitEnumerator( StableMetadata stableMetadata, @@ -55,8 +61,10 @@ public class TDengineSourceSplitEnumerator this.config = config; this.context = context; this.stableMetadata = stableMetadata; + this.shouldEnumerate = sourceState == null; if (sourceState != null) { - this.assignedSplit = sourceState.getAssignedSplit(); + this.shouldEnumerate = sourceState.isShouldEnumerate(); + this.pendingSplits.putAll(sourceState.getPendingSplits()); } } @@ -69,16 +77,33 @@ public class TDengineSourceSplitEnumerator @Override public void run() { - pendingSplit = getAllSplits(); - assignSplit(context.registeredReaders()); + Set<Integer> readers = context.registeredReaders(); + if (shouldEnumerate) { + List<TDengineSourceSplit> newSplits = discoverySplits(); + + synchronized (stateLock) { + addPendingSplit(newSplits); + shouldEnumerate = false; + } + + assignSplit(readers); + } + + log.info("No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); } - /* - * each split has one sub table - */ - private Set<TDengineSourceSplit> getAllSplits() { + private void addPendingSplit(List<TDengineSourceSplit> newSplits) { + int readerCount = context.currentParallelism(); + for (TDengineSourceSplit split : newSplits) { + int ownerReader = getSplitOwner(split.splitId(), readerCount); + pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split); + } + } + + private List<TDengineSourceSplit> discoverySplits() { final String timestampFieldName = stableMetadata.getTimestampFieldName(); - final Set<TDengineSourceSplit> splits = new HashSet<>(); + final List<TDengineSourceSplit> splits = new ArrayList<>(); for (String subTableName : stableMetadata.getSubTableNames()) { TDengineSourceSplit splitBySubTable = createSplitBySubTable(subTableName, timestampFieldName); @@ -92,9 +117,11 @@ public class TDengineSourceSplitEnumerator String selectFields = Arrays.stream(stableMetadata.getRowType().getFieldNames()) .skip(1) + .map(name -> String.format("`%s`", name)) .collect(Collectors.joining(",")); String subTableSQL = - "select " + selectFields + " from " + config.getDatabase() + "." + subTableName; + String.format( + "select %s from %s.`%s`", selectFields, config.getDatabase(), subTableName); String start = config.getLowerBound(); String end = config.getUpperBound(); if (start != null || end != null) { @@ -116,69 +143,64 @@ public class TDengineSourceSplitEnumerator @Override public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) { + log.info("Add back splits {} to TDengineSourceSplitEnumerator.", splits); if (!splits.isEmpty()) { - pendingSplit.addAll(splits); + addPendingSplit(splits); assignSplit(Collections.singletonList(subtaskId)); } } @Override public int currentUnassignedSplitSize() { - return pendingSplit.size(); + return pendingSplits.size(); } @Override public void registerReader(int subtaskId) { - if (!pendingSplit.isEmpty()) { + log.info("Register reader {} to TDengineSourceSplitEnumerator.", subtaskId); + if (!pendingSplits.isEmpty()) { assignSplit(Collections.singletonList(subtaskId)); } } - private void assignSplit(Collection<Integer> taskIDList) { - assignedSplit = - pendingSplit.stream() - .map( - split -> { - int splitOwner = - getSplitOwner( - split.splitId(), context.currentParallelism()); - if (taskIDList.contains(splitOwner)) { - context.assignSplit(splitOwner, split); - return split; - } else { - return null; - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - pendingSplit.clear(); + private void assignSplit(Collection<Integer> readers) { + log.info("Assign pendingSplits to readers {}", readers); + + for (int reader : readers) { + List<TDengineSourceSplit> assignmentForReader = pendingSplits.remove(reader); + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + log.info("Assign splits {} to reader {}", assignmentForReader, reader); + try { + context.assignSplit(reader, assignmentForReader); + } catch (Exception e) { + log.error( + "Failed to assign splits {} to reader {}", + assignmentForReader, + reader, + e); + pendingSplits.put(reader, assignmentForReader); + } + } + } } @Override public TDengineSourceState snapshotState(long checkpointId) { - return new TDengineSourceState(assignedSplit); - } - - @Override - public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { - SourceSplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent); - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - // nothing to do + synchronized (stateLock) { + return new TDengineSourceState(shouldEnumerate, pendingSplits); + } } @Override - public void notifyCheckpointAborted(long checkpointId) throws Exception { - SourceSplitEnumerator.super.notifyCheckpointAborted(checkpointId); - } + public void notifyCheckpointComplete(long checkpointId) {} @Override public void close() {} @Override public void handleSplitRequest(int subtaskId) { - // nothing to do + throw new TDengineConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + String.format("Unsupported handleSplitRequest: %d", subtaskId)); } } diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java index fc839682a9..4832cd398f 100644 --- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java +++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/state/TDengineSourceState.java @@ -19,18 +19,16 @@ package org.apache.seatunnel.connectors.seatunnel.tdengine.state; import org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit; +import lombok.AllArgsConstructor; +import lombok.Getter; + import java.io.Serializable; -import java.util.Set; +import java.util.List; +import java.util.Map; +@AllArgsConstructor +@Getter public class TDengineSourceState implements Serializable { - - private final Set<TDengineSourceSplit> assignedSplit; - - public TDengineSourceState(Set<TDengineSourceSplit> assignedSplit) { - this.assignedSplit = assignedSplit; - } - - public Set<TDengineSourceSplit> getAssignedSplit() { - return assignedSplit; - } + private boolean shouldEnumerate; + private final Map<Integer, List<TDengineSourceSplit>> pendingSplits; } diff --git a/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java new file mode 100644 index 0000000000..abd42fefe1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-tdengine/src/test/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReaderTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +class TDengineSourceReaderTest { + Logger logger; + TDengineSourceReader tDengineSourceReader; + + @BeforeEach + void setup() { + tDengineSourceReader = new TDengineSourceReader(null, null); + + List<TDengineSourceSplit> sourceSplits = new ArrayList<>(); + int splitCnt = 100; + for (int i = 0; i < splitCnt; i++) { + sourceSplits.add(new TDengineSourceSplit(Integer.toString(i), "select sever_status()")); + } + + tDengineSourceReader.addSplits(sourceSplits); + + logger = Logger.getLogger("TDengineSourceReaderTest"); + } + + @Test + void testPoll() throws InterruptedException { + TestCollector testCollector = new TestCollector(); + + int totalSplitCnt = 150; + ThreadPoolExecutor pool = + new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + pool.execute( + () -> { + for (int i = 0; i < totalSplitCnt; i++) { + try { + tDengineSourceReader.pollNext(testCollector); + Thread.sleep(new Random().nextInt(5)); + } catch (TDengineConnectorException e) { + logger.info("skip create connection!"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + int newSplitCnt = 50; + int threadCnt = 3; + for (int i = 0; i < threadCnt; i++) { + pool.execute( + () -> { + for (int idx = 0; idx < newSplitCnt; idx++) { + logger.info( + String.format( + "%s receive new split", + Thread.currentThread().getName())); + tDengineSourceReader.addSplits( + Collections.singletonList( + new TDengineSourceSplit( + String.format( + "new_%s", + Thread.currentThread().getName() + idx), + "select server_status()"))); + try { + Thread.sleep(new Random().nextInt(5)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + + pool.awaitTermination(3, TimeUnit.SECONDS); + } + + private static class TestCollector implements Collector<SeaTunnelRow> { + + private final List<SeaTunnelRow> rows = new ArrayList<>(); + + public List<SeaTunnelRow> getRows() { + return rows; + } + + @Override + public void collect(SeaTunnelRow record) { + rows.add(record); + } + + @Override + public Object getCheckpointLock() { + return new Object(); + } + } +}