alextinng commented on code in PR #6088:
URL: https://github.com/apache/seatunnel/pull/6088#discussion_r1708499995


##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -39,84 +35,76 @@
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
 
 @Slf4j
 public class TDengineSourceReader implements SourceReader<SeaTunnelRow, 
TDengineSourceSplit> {
-
-    private static final long THREAD_WAIT_TIME = 500L;
-
     private final TDengineSourceConfig config;
 
-    private final Set<TDengineSourceSplit> sourceSplits;
+    private final Deque<TDengineSourceSplit> sourceSplits;
 
     private final Context context;
 
     private Connection conn;
 
+    private volatile boolean noMoreSplit;
+
     public TDengineSourceReader(TDengineSourceConfig config, 
SourceReader.Context readerContext) {
         this.config = config;
-        this.sourceSplits = Sets.newHashSet();
+        this.sourceSplits = new ConcurrentLinkedDeque<>();
         this.context = readerContext;
     }
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> collector) throws 
InterruptedException {
-        if (sourceSplits.isEmpty()) {
-            Thread.sleep(THREAD_WAIT_TIME);
-            return;
-        }
         synchronized (collector.getCheckpointLock()) {
-            sourceSplits.forEach(
-                    split -> {
-                        try {
-                            read(split, collector);
-                        } catch (Exception e) {
-                            throw new TDengineConnectorException(
-                                    
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
-                                    "TDengine split read error",
-                                    e);
-                        }
-                    });
-        }
-
-        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            log.info("Closed the bounded TDengine source");
-            context.signalNoMoreElement();
+            log.info("polling new split from queue!");
+            TDengineSourceSplit split = sourceSplits.poll();
+            if (Objects.nonNull(split)) {
+                log.info(
+                        "starting run new split {}, query sql: {}!",
+                        split.splitId(),
+                        split.getQuery());
+                try {
+                    read(split, collector);
+                } catch (Exception e) {
+                    throw new TDengineConnectorException(
+                            CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
+                            "TDengine split read error",
+                            e);
+                }
+            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the 
data.
+                log.info("Closed the bounded TDengine source");
+                context.signalNoMoreElement();
+            } else {
+                Thread.sleep(1000L);
+            }
         }
     }
 
     @Override
     public void open() {
-        String jdbcUrl =
-                StringUtils.join(
-                        config.getUrl(),
-                        config.getDatabase(),
-                        "?user=",

Review Comment:
   > why update this config?
   
   it's not safe to put username and password in jdbc url, it's better to put 
password in properties and pass properties to jdbc driver in production
   
   There's a risk that the JDBC URL, including the password, could be logged 
inadvertently, especially if the logging level is not properly configured to 
exclude sensitive information.



##########
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java:
##########
@@ -39,84 +35,76 @@
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;
 
 @Slf4j
 public class TDengineSourceReader implements SourceReader<SeaTunnelRow, 
TDengineSourceSplit> {
-
-    private static final long THREAD_WAIT_TIME = 500L;
-
     private final TDengineSourceConfig config;
 
-    private final Set<TDengineSourceSplit> sourceSplits;
+    private final Deque<TDengineSourceSplit> sourceSplits;
 
     private final Context context;
 
     private Connection conn;
 
+    private volatile boolean noMoreSplit;
+
     public TDengineSourceReader(TDengineSourceConfig config, 
SourceReader.Context readerContext) {
         this.config = config;
-        this.sourceSplits = Sets.newHashSet();
+        this.sourceSplits = new ConcurrentLinkedDeque<>();
         this.context = readerContext;
     }
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> collector) throws 
InterruptedException {
-        if (sourceSplits.isEmpty()) {
-            Thread.sleep(THREAD_WAIT_TIME);
-            return;
-        }
         synchronized (collector.getCheckpointLock()) {
-            sourceSplits.forEach(
-                    split -> {
-                        try {
-                            read(split, collector);
-                        } catch (Exception e) {
-                            throw new TDengineConnectorException(
-                                    
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
-                                    "TDengine split read error",
-                                    e);
-                        }
-                    });
-        }
-
-        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            log.info("Closed the bounded TDengine source");
-            context.signalNoMoreElement();
+            log.info("polling new split from queue!");
+            TDengineSourceSplit split = sourceSplits.poll();
+            if (Objects.nonNull(split)) {
+                log.info(
+                        "starting run new split {}, query sql: {}!",
+                        split.splitId(),
+                        split.getQuery());
+                try {
+                    read(split, collector);
+                } catch (Exception e) {
+                    throw new TDengineConnectorException(
+                            CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
+                            "TDengine split read error",
+                            e);
+                }
+            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the 
data.
+                log.info("Closed the bounded TDengine source");
+                context.signalNoMoreElement();
+            } else {
+                Thread.sleep(1000L);
+            }
         }
     }
 
     @Override
     public void open() {
-        String jdbcUrl =
-                StringUtils.join(
-                        config.getUrl(),
-                        config.getDatabase(),
-                        "?user=",

Review Comment:
   > why update this config?
   
   It's not safe to put username and password in jdbc url, it's better to put 
password in properties and pass properties to jdbc driver in production
   
   There's a risk that the JDBC URL, including the password, could be logged 
inadvertently, especially if the logging level is not properly configured to 
exclude sensitive information.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to