This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit c6bb1b44a952c1effbb6dcb6dcd92884b7565b26
Author: Yun Tang <myas...@live.com>
AuthorDate: Sat Oct 23 16:45:24 2021 +0800

    [Flink][Bug] Fix potential NPE when cancel DorisSourceFunction (#6838)
    
    Fix potential NPE of `scalaValueReader` when cancelling DorisSourceFunction.
---
 .../doris/flink/datastream/DorisSourceFunction.java   | 19 +++++++++++++------
 .../doris/flink/datastream/ScalaValueReader.scala     |  3 +--
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git 
a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java 
b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 08ec5b0..85f5f6b 100644
--- a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -43,8 +43,8 @@ public class DorisSourceFunction extends 
RichSourceFunction<List<?>> implements
     private final DorisDeserializationSchema<List<?>> deserializer;
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
+    private transient volatile boolean isRunning;
     private List<PartitionDefinition> dorisPartitions;
-    private ScalaValueReader scalaValueReader;
 
     public DorisSourceFunction(DorisStreamOptions streamOptions, 
DorisDeserializationSchema<List<?>> deserializer) {
         this.deserializer = deserializer;
@@ -55,25 +55,32 @@ public class DorisSourceFunction extends 
RichSourceFunction<List<?>> implements
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
+        this.isRunning = true;
         this.dorisPartitions = RestService.findPartitions(options, 
readOptions, logger);
     }
 
     @Override
     public void run(SourceContext<List<?>> sourceContext) {
         for (PartitionDefinition partitions : dorisPartitions) {
-            scalaValueReader = new ScalaValueReader(partitions, options, 
readOptions);
-            while (scalaValueReader.hasNext()) {
-                List<?> next = scalaValueReader.next();
-                sourceContext.collect(next);
+            try (ScalaValueReader scalaValueReader = new 
ScalaValueReader(partitions, options, readOptions)) {
+                while (isRunning && scalaValueReader.hasNext()) {
+                    List<?> next = scalaValueReader.next();
+                    sourceContext.collect(next);
+                }
             }
         }
     }
 
     @Override
     public void cancel() {
-        scalaValueReader.close();
+        isRunning = false;
     }
 
+    @Override
+    public void close() throws Exception {
+        super.close();
+        isRunning = false;
+    }
 
     @Override
     public TypeInformation<List<?>> getProducedType() {
diff --git 
a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala 
b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
index 093390d..06df2ef 100644
--- a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++ b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
@@ -19,7 +19,6 @@ package org.apache.doris.flink.datastream
 
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
-
 import org.apache.doris.flink.backend.BackendClient
 import org.apache.doris.flink.cfg.ConfigurationOptions._
 import org.apache.doris.flink.cfg.{DorisOptions, DorisReadOptions}
@@ -41,7 +40,7 @@ import scala.util.control.Breaks
  * @param partition Doris RDD partition
  * @param options request configuration
  */
-class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, 
readOptions: DorisReadOptions) {
+class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, 
readOptions: DorisReadOptions) extends AutoCloseable {
   protected val logger = Logger.getLogger(classOf[ScalaValueReader])
 
   protected val client = new BackendClient(new 
Routing(partition.getBeAddress), readOptions)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to