This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit c6bb1b44a952c1effbb6dcb6dcd92884b7565b26 Author: Yun Tang <myas...@live.com> AuthorDate: Sat Oct 23 16:45:24 2021 +0800 [Flink][Bug] Fix potential NPE when cancel DorisSourceFunction (#6838) Fix potential NPE of `scalaValueReader` when cancelling DorisSourceFunction. --- .../doris/flink/datastream/DorisSourceFunction.java | 19 +++++++++++++------ .../doris/flink/datastream/ScalaValueReader.scala | 3 +-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java index 08ec5b0..85f5f6b 100644 --- a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java +++ b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java @@ -43,8 +43,8 @@ public class DorisSourceFunction extends RichSourceFunction<List<?>> implements private final DorisDeserializationSchema<List<?>> deserializer; private final DorisOptions options; private final DorisReadOptions readOptions; + private transient volatile boolean isRunning; private List<PartitionDefinition> dorisPartitions; - private ScalaValueReader scalaValueReader; public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) { this.deserializer = deserializer; @@ -55,25 +55,32 @@ public class DorisSourceFunction extends RichSourceFunction<List<?>> implements @Override public void open(Configuration parameters) throws Exception { super.open(parameters); + this.isRunning = true; this.dorisPartitions = RestService.findPartitions(options, readOptions, logger); } @Override public void run(SourceContext<List<?>> sourceContext) { for (PartitionDefinition partitions : dorisPartitions) { - scalaValueReader = new ScalaValueReader(partitions, options, readOptions); - while (scalaValueReader.hasNext()) { - List<?> next = scalaValueReader.next(); - sourceContext.collect(next); + try (ScalaValueReader scalaValueReader = new ScalaValueReader(partitions, options, readOptions)) { + while (isRunning && scalaValueReader.hasNext()) { + List<?> next = scalaValueReader.next(); + sourceContext.collect(next); + } } } } @Override public void cancel() { - scalaValueReader.close(); + isRunning = false; } + @Override + public void close() throws Exception { + super.close(); + isRunning = false; + } @Override public TypeInformation<List<?>> getProducedType() { diff --git a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala index 093390d..06df2ef 100644 --- a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala @@ -19,7 +19,6 @@ package org.apache.doris.flink.datastream import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean - import org.apache.doris.flink.backend.BackendClient import org.apache.doris.flink.cfg.ConfigurationOptions._ import org.apache.doris.flink.cfg.{DorisOptions, DorisReadOptions} @@ -41,7 +40,7 @@ import scala.util.control.Breaks * @param partition Doris RDD partition * @param options request configuration */ -class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, readOptions: DorisReadOptions) { +class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, readOptions: DorisReadOptions) extends AutoCloseable { protected val logger = Logger.getLogger(classOf[ScalaValueReader]) protected val client = new BackendClient(new Routing(partition.getBeAddress), readOptions) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org