liugddx commented on code in PR #6088: URL: https://github.com/apache/seatunnel/pull/6088#discussion_r1696160438
########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java: ########## @@ -39,84 +35,76 @@ import java.sql.Statement; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.Objects; import java.util.Properties; -import java.util.Set; +import java.util.concurrent.ConcurrentLinkedDeque; import static org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist; @Slf4j public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> { - - private static final long THREAD_WAIT_TIME = 500L; - private final TDengineSourceConfig config; - private final Set<TDengineSourceSplit> sourceSplits; + private final Deque<TDengineSourceSplit> sourceSplits; private final Context context; private Connection conn; + private volatile boolean noMoreSplit; + public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) { this.config = config; - this.sourceSplits = Sets.newHashSet(); + this.sourceSplits = new ConcurrentLinkedDeque<>(); this.context = readerContext; } @Override public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException { - if (sourceSplits.isEmpty()) { - Thread.sleep(THREAD_WAIT_TIME); - return; - } synchronized (collector.getCheckpointLock()) { - sourceSplits.forEach( - split -> { - try { - read(split, collector); - } catch (Exception e) { - throw new TDengineConnectorException( - CommonErrorCodeDeprecated.READER_OPERATION_FAILED, - "TDengine split read error", - e); - } - }); - } - - if (Boundedness.BOUNDED.equals(context.getBoundedness())) { - // signal to the source that we have reached the end of the data. - log.info("Closed the bounded TDengine source"); - context.signalNoMoreElement(); + log.info("polling new split from queue!"); + TDengineSourceSplit split = sourceSplits.poll(); + if (Objects.nonNull(split)) { + log.info( + "starting run new split {}, query sql: {}!", + split.splitId(), + split.getQuery()); + try { + read(split, collector); + } catch (Exception e) { + throw new TDengineConnectorException( + CommonErrorCodeDeprecated.READER_OPERATION_FAILED, + "TDengine split read error", + e); + } + } else if (noMoreSplit && sourceSplits.isEmpty()) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded TDengine source"); + context.signalNoMoreElement(); + } else { + Thread.sleep(1000L); + } } } @Override public void open() { - String jdbcUrl = - StringUtils.join( - config.getUrl(), - config.getDatabase(), - "?user=", - config.getUsername(), - "&password=", - config.getPassword()); - Properties connProps = new Properties(); - // todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true", - // there is a exception : Caused by: java.sql.SQLException: can't create connection with - // server - // under docker network env - // @bobo (tdengine) - connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false"); + String jdbcUrl = config.getUrl(); + + Properties properties = new Properties(); + properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername()); + properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword()); Review Comment: Can other jdbc parameters be added here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org