hailin0 commented on code in PR #6088:
URL: https://github.com/apache/seatunnel/pull/6088#discussion_r1708409640


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -39,84 +35,76 @@
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
 
 @Slf4j
 public class TDengineSourceReader implements SourceReader<SeaTunnelRow, 
TDengineSourceSplit> {
-
-    private static final long THREAD_WAIT_TIME = 500L;
-
     private final TDengineSourceConfig config;
 
-    private final Set<TDengineSourceSplit> sourceSplits;
+    private final Deque<TDengineSourceSplit> sourceSplits;
 
     private final Context context;
 
     private Connection conn;
 
+    private volatile boolean noMoreSplit;
+
     public TDengineSourceReader(TDengineSourceConfig config, 
SourceReader.Context readerContext) {
         this.config = config;
-        this.sourceSplits = Sets.newHashSet();
+        this.sourceSplits = new ConcurrentLinkedDeque<>();
         this.context = readerContext;
     }
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> collector) throws 
InterruptedException {
-        if (sourceSplits.isEmpty()) {
-            Thread.sleep(THREAD_WAIT_TIME);
-            return;
-        }
         synchronized (collector.getCheckpointLock()) {
-            sourceSplits.forEach(
-                    split -> {
-                        try {
-                            read(split, collector);
-                        } catch (Exception e) {
-                            throw new TDengineConnectorException(
-                                    
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
-                                    "TDengine split read error",
-                                    e);
-                        }
-                    });
-        }
-
-        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            log.info("Closed the bounded TDengine source");
-            context.signalNoMoreElement();
+            log.info("polling new split from queue!");
+            TDengineSourceSplit split = sourceSplits.poll();
+            if (Objects.nonNull(split)) {
+                log.info(
+                        "starting run new split {}, query sql: {}!",
+                        split.splitId(),
+                        split.getQuery());
+                try {
+                    read(split, collector);
+                } catch (Exception e) {
+                    throw new TDengineConnectorException(
+                            CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
+                            "TDengine split read error",
+                            e);
+                }
+            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the 
data.
+                log.info("Closed the bounded TDengine source");
+                context.signalNoMoreElement();
+            } else {
+                Thread.sleep(1000L);
+            }
         }
     }
 
     @Override
     public void open() {
-        String jdbcUrl =
-                StringUtils.join(
-                        config.getUrl(),
-                        config.getDatabase(),
-                        "?user=",

Review Comment:
   why update this config?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to