alextinng commented on code in PR #6088: URL: https://github.com/apache/seatunnel/pull/6088#discussion_r1708499995
########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java: ########## @@ -39,84 +35,76 @@ import java.sql.Statement; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.Objects; import java.util.Properties; -import java.util.Set; +import java.util.concurrent.ConcurrentLinkedDeque; import static org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist; @Slf4j public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> { - - private static final long THREAD_WAIT_TIME = 500L; - private final TDengineSourceConfig config; - private final Set<TDengineSourceSplit> sourceSplits; + private final Deque<TDengineSourceSplit> sourceSplits; private final Context context; private Connection conn; + private volatile boolean noMoreSplit; + public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) { this.config = config; - this.sourceSplits = Sets.newHashSet(); + this.sourceSplits = new ConcurrentLinkedDeque<>(); this.context = readerContext; } @Override public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException { - if (sourceSplits.isEmpty()) { - Thread.sleep(THREAD_WAIT_TIME); - return; - } synchronized (collector.getCheckpointLock()) { - sourceSplits.forEach( - split -> { - try { - read(split, collector); - } catch (Exception e) { - throw new TDengineConnectorException( - CommonErrorCodeDeprecated.READER_OPERATION_FAILED, - "TDengine split read error", - e); - } - }); - } - - if (Boundedness.BOUNDED.equals(context.getBoundedness())) { - // signal to the source that we have reached the end of the data. 
- log.info("Closed the bounded TDengine source"); - context.signalNoMoreElement(); + log.info("polling new split from queue!"); + TDengineSourceSplit split = sourceSplits.poll(); + if (Objects.nonNull(split)) { + log.info( + "starting run new split {}, query sql: {}!", + split.splitId(), + split.getQuery()); + try { + read(split, collector); + } catch (Exception e) { + throw new TDengineConnectorException( + CommonErrorCodeDeprecated.READER_OPERATION_FAILED, + "TDengine split read error", + e); + } + } else if (noMoreSplit && sourceSplits.isEmpty()) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded TDengine source"); + context.signalNoMoreElement(); + } else { + Thread.sleep(1000L); + } } } @Override public void open() { - String jdbcUrl = - StringUtils.join( - config.getUrl(), - config.getDatabase(), - "?user=", Review Comment: > why update this config? It's not safe to put the username and password in the JDBC URL; in production, it's better to put the password in a Properties object and pass that to the JDBC driver. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org