This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit dc9203a6582ffdc5b4b13cff877dedc607dbfc30
Author: wei zhao <zhaowei_3...@163.com>
AuthorDate: Fri Oct 29 17:06:05 2021 +0800

    [Optimize] Spark connector supports multiple spark versions:2.1.x/2.3.x/2.4.x/3.x (#6956)
    
    * Spark connector supports multiple spark versions:2.1.x/2.3.x/2.4.x/3.x
    Co-authored-by: wei.zhao <wei.z...@aispeech.com>
---
 .../org/apache/doris/spark/DorisStreamLoad.java    |   6 +-
 .../doris/spark/rdd/AbstractDorisRDDIterator.scala |  12 +-
 .../apache/doris/spark/rdd/ScalaValueReader.scala  |   2 +-
 .../doris/spark/sql/DorisSourceProvider.scala      |  26 +++--
 .../doris/spark/sql/DorisStreamLoadSink.scala      |  98 +++++++++++++++++
 .../apache/doris/spark/sql/DorisStreamWriter.scala | 122 ---------------------
 .../doris/spark/sql/ScalaDorisRowValueReader.scala |  10 +-
 .../scala/org/apache/doris/spark/sql/Utils.scala   |  38 +++++--
 8 files changed, 158 insertions(+), 156 deletions(-)
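
For context, the user-facing write path that this change keeps working across Spark versions looks roughly like the sketch below. The "doris" short name and the doris.* option keys are illustrative assumptions here, not something this patch defines; the actual values live in DorisSourceProvider.SHORT_NAME and ConfigurationOptions.

    // Minimal sketch of a structured-streaming write through the connector.
    import org.apache.spark.sql.SparkSession

    object DorisStreamWriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("doris-stream-write").getOrCreate()
        // Any streaming source will do; "rate" just produces test rows.
        val source = spark.readStream.format("rate").load()
        val query = source.selectExpr("CAST(value AS STRING) AS value")
          .writeStream
          .format("doris")                               // assumed short name
          .option("doris.fenodes", "fe_host:8030")       // assumed option key
          .option("doris.table.identifier", "db.table")  // assumed option key
          .option("user", "root")                        // assumed option key
          .option("password", "")                        // assumed option key
          .option("checkpointLocation", "/tmp/doris-ckpt")
          .start()
        query.awaitTermination()
      }
    }

With the change below, starting such a query goes through StreamSinkProvider.createSink instead of the DataSourceV2 StreamWriter path.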

diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index dcf569f..ccf3a5e 100644
--- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -150,6 +150,7 @@ public class DorisStreamLoad implements Serializable{
     }
 
     public void load(String value) throws StreamLoadException {
+        LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
         LoadResponse loadResponse = loadBatch(value);
         LOG.info("Streamload Response:{}",loadResponse);
         if(loadResponse.status != 200){
@@ -169,7 +170,7 @@ public class DorisStreamLoad implements Serializable{
 
     private LoadResponse loadBatch(String value) {
         Calendar calendar = Calendar.getInstance();
-        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
+        String label = String.format("spark_streamload_%s%02d%02d_%02d%02d%02d_%s",
                 calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                 calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                 UUID.randomUUID().toString().replaceAll("-", ""));
@@ -194,12 +195,11 @@ public class DorisStreamLoad implements Serializable{
             while ((line = br.readLine()) != null) {
                 response.append(line);
             }
-//            log.info("AuditLoader plugin load with label: {}, response code: {}, msg: {}, content: {}",label, status, respMsg, response.toString());
             return new LoadResponse(status, respMsg, response.toString());
 
         } catch (Exception e) {
             e.printStackTrace();
-            String err = "failed to load audit via AuditLoader plugin with label: " + label;
+            String err = "failed to execute spark streamload with label: " + label;
             LOG.warn(err, e);
             return new LoadResponse(-1, e.getMessage(), err);
         } finally {
diff --git a/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala b/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
index dc39773..5b2b36f 100644
--- a/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
+++ b/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
@@ -20,15 +20,15 @@ package org.apache.doris.spark.rdd
 import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_VALUE_READER_CLASS
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.rest.PartitionDefinition
-
 import org.apache.spark.util.TaskCompletionListener
-import org.apache.spark.internal.Logging
 import org.apache.spark.{TaskContext, TaskKilledException}
+import org.slf4j.{Logger, LoggerFactory}
 
 private[spark] abstract class AbstractDorisRDDIterator[T](
     context: TaskContext,
-    partition: PartitionDefinition) extends Iterator[T] with Logging {
+    partition: PartitionDefinition) extends Iterator[T] {
 
+  private val logger: Logger = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
   private var initialized = false
   private var closed = false
 
@@ -38,7 +38,7 @@ private[spark] abstract class AbstractDorisRDDIterator[T](
     val settings = partition.settings()
     initReader(settings)
     val valueReaderName = settings.getProperty(DORIS_VALUE_READER_CLASS)
-    logDebug(s"Use value reader '$valueReaderName'.")
+    logger.debug(s"Use value reader '$valueReaderName'.")
     val cons = Class.forName(valueReaderName).getDeclaredConstructor(classOf[PartitionDefinition], classOf[Settings])
     cons.newInstance(partition, settings).asInstanceOf[ScalaValueReader]
   }
@@ -65,7 +65,7 @@ private[spark] abstract class AbstractDorisRDDIterator[T](
   }
 
   def closeIfNeeded(): Unit = {
-    logTrace(s"Close status is '$closed' when close Doris RDD Iterator")
+    logger.trace(s"Close status is '$closed' when close Doris RDD Iterator")
     if (!closed) {
       close()
       closed = true
@@ -73,7 +73,7 @@ private[spark] abstract class AbstractDorisRDDIterator[T](
   }
 
   protected def close(): Unit = {
-    logTrace(s"Initialize status is '$initialized' when close Doris RDD Iterator")
+    logger.trace(s"Initialize status is '$initialized' when close Doris RDD Iterator")
     if (initialized) {
       reader.close()
     }
diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index a1b26e4..03643b2 100644
--- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -44,7 +44,7 @@ import scala.util.control.Breaks
  * @param settings request configuration
  */
 class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
-  protected val logger = Logger.getLogger(classOf[ScalaValueReader])
+  private val logger = Logger.getLogger(classOf[ScalaValueReader])
 
   protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
   protected val clientLock =
diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 3ac087d..ee77ce6 100644
--- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -21,25 +21,29 @@ import org.apache.doris.spark.DorisStreamLoad
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
 import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
-import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter, RelationProvider}
+import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.IOException
 import java.util
-import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
 
-private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with StreamWriteSupport with Logging {
+private[sql] class DorisSourceProvider extends DataSourceRegister
+  with RelationProvider
+  with CreatableRelationProvider
+  with StreamSinkProvider {
+
+  private val logger: Logger = LoggerFactory.getLogger(classOf[DorisSourceProvider].getName)
+
   override def shortName(): String = SHORT_NAME
 
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
-    new DorisRelation(sqlContext, Utils.params(parameters, log))
+    new DorisRelation(sqlContext, Utils.params(parameters, logger))
   }
 
 
@@ -51,7 +55,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
                               data: DataFrame): BaseRelation = {
 
     val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
-    sparkSettings.merge(Utils.params(parameters, log).asJava)
+    sparkSettings.merge(Utils.params(parameters, logger).asJava)
     // init stream loader
     val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
 
@@ -124,10 +128,10 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
     }
   }
 
-  override def createStreamWriter(queryId: String, structType: StructType, outputMode: OutputMode, dataSourceOptions: DataSourceOptions): StreamWriter = {
+  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
     val sparkSettings = new SparkSettings(new SparkConf())
-    sparkSettings.merge(Utils.params(dataSourceOptions.asMap().toMap, log).asJava)
-    new DorisStreamWriter(sparkSettings)
+    sparkSettings.merge(Utils.params(parameters, logger).asJava)
+    new DorisStreamLoadSink(sqlContext, sparkSettings)
   }
 }
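
The switch from createStreamWriter to createSink above is the core of the multi-version support: org.apache.spark.sql.execution.streaming.Sink has kept the same shape from Spark 2.x through 3.x, while the sources.v2 writer API was reworked in Spark 3.0. A minimal sketch of the contract createSink now has to satisfy (the class below is illustrative, not part of this patch):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Sink

    // Spark calls addBatch once per micro-batch with that batch's data.
    class CountingSink extends Sink {
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        // Work on the batch's internal rows, as DorisStreamLoadSink does,
        // rather than re-triggering the query on the passed DataFrame.
        val rows = data.queryExecution.toRdd.count()
        println(s"batch $batchId: $rows rows")
      }
    }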
 
diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
new file mode 100644
index 0000000..409325d
--- /dev/null
+++ b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.slf4j.{Logger, LoggerFactory}
+
+import java.io.IOException
+import java.util
+import scala.util.control.Breaks
+
+private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
+
+  private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
+  @volatile private var latestBatchId = -1L
+  val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
+  val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
+  val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    if (batchId <= latestBatchId) {
+      logger.info(s"Skipping already committed batch $batchId")
+    } else {
+      write(data.queryExecution)
+      latestBatchId = batchId
+    }
+  }
+
+  def write(queryExecution: QueryExecution): Unit = {
+    queryExecution.toRdd.foreachPartition(iter => {
+      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]]()
+      iter.foreach(row => {
+        val line: util.List[Object] = new util.ArrayList[Object](maxRowCount)
+        for (i <- 0 until row.numFields) {
+          val field = row.copy().getUTF8String(i)
+          line.add(field.asInstanceOf[AnyRef])
+        }
+        rowsBuffer.add(line)
+        if (rowsBuffer.size > maxRowCount - 1) {
+          flush
+        }
+      })
+      // flush buffer
+      if (!rowsBuffer.isEmpty) {
+        flush
+      }
+
+      /**
+       * flush data to Doris and do retry when flush error
+       *
+       */
+      def flush = {
+        val loop = new Breaks
+        loop.breakable {
+
+          for (i <- 1 to maxRetryTimes) {
+            try {
+              dorisStreamLoader.load(rowsBuffer)
+              rowsBuffer.clear()
+              loop.break()
+            }
+            catch {
+              case e: Exception =>
+                try {
+                  Thread.sleep(1000 * i)
+                } catch {
+                  case ex: InterruptedException =>
+                    Thread.currentThread.interrupt()
+                    throw new IOException("unable to flush; interrupted while doing another attempt", e)
+                }
+            }
+          }
+        }
+      }
+    })
+  }
+
+  override def toString: String = "DorisStreamLoadSink"
+}
diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala b/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala
deleted file mode 100644
index 60d2c78..0000000
--- a/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.sql
-
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
-
-import java.io.IOException
-import java.util
-import scala.util.control.Breaks
-
-/**
- * A [[StreamWriter]] for Apache Doris streaming writing.
- *
- * @param settings params for writing doris table
- */
-class DorisStreamWriter(settings: SparkSettings) extends StreamWriter {
-  override def createWriterFactory(): DorisStreamWriterFactory = DorisStreamWriterFactory(settings)
-
-  override def commit(l: Long, writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
-
-  override def abort(l: Long, writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
-
-}
-
-/**
- * A [[DataWriterFactory]] for Apache Doris streaming writing. Will be serialized and sent to executors to generate
- * the per-task data writers.
- *
- * @param settings params for writing doris table
- */
-case class DorisStreamWriterFactory(settings: SparkSettings) extends DataWriterFactory[Row] {
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
-    new DorisStreamDataWriter(settings)
-  }
-}
-
-/**
- * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
- * don't need to really send one.
- */
-case object DorisWriterCommitMessage extends WriterCommitMessage
-
-/**
- * A [[DataWriter]] for Apache Doris streaming writing. One data writer will be created in each partition to
- * process incoming rows.
- *
- * @param settings params for writing doris table
- */
-class DorisStreamDataWriter(settings: SparkSettings) extends DataWriter[Row] {
-  val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
-  val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
-  val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
-  val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
-
-  override def write(row: Row): Unit = {
-    val line: util.List[Object] = new util.ArrayList[Object]()
-    for (i <- 0 until row.size) {
-      val field = row.get(i)
-      line.add(field.asInstanceOf[AnyRef])
-    }
-    if (!line.isEmpty) {
-      rowsBuffer.add(line)
-    }
-    if (rowsBuffer.size >= maxRowCount) {
-      // commit when buffer is full
-      commit()
-    }
-  }
-
-  override def commit(): WriterCommitMessage = {
-    // we don't commit request until rows-buffer received some rows
-    val loop = new Breaks
-    loop.breakable {
-      for (i <- 1 to maxRetryTimes) {
-        try {
-          if (!rowsBuffer.isEmpty) {
-            dorisStreamLoader.load(rowsBuffer)
-          }
-          rowsBuffer.clear()
-          loop.break()
-        }
-        catch {
-          case e: Exception =>
-            try {
-              Thread.sleep(1000 * i)
-              if (!rowsBuffer.isEmpty) {
-                dorisStreamLoader.load(rowsBuffer)
-              }
-              rowsBuffer.clear()
-            } catch {
-              case ex: InterruptedException =>
-                Thread.currentThread.interrupt()
-                throw new IOException("unable to flush; interrupted while doing another attempt", e)
-            }
-        }
-      }
-    }
-    DorisWriterCommitMessage
-  }
-
-  override def abort(): Unit = {
-  }
-}
\ No newline at end of file
diff --git a/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowValueReader.scala b/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowValueReader.scala
index 7825fcf..5b01854 100644
--- a/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowValueReader.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowValueReader.scala
@@ -18,26 +18,26 @@
 package org.apache.doris.spark.sql
 
 import scala.collection.JavaConverters._
-
 import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.ShouldNeverHappenException
 import org.apache.doris.spark.rdd.ScalaValueReader
 import org.apache.doris.spark.rest.PartitionDefinition
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
-
-import org.apache.spark.internal.Logging
+import org.slf4j.{Logger, LoggerFactory}
 
 class ScalaDorisRowValueReader(
   partition: PartitionDefinition,
   settings: Settings)
-  extends ScalaValueReader(partition, settings) with Logging {
+  extends ScalaValueReader(partition, settings) {
+
+  private val logger: Logger = LoggerFactory.getLogger(classOf[ScalaDorisRowValueReader].getName)
 
   val rowOrder: Seq[String] = settings.getProperty(DORIS_READ_FIELD).split(",")
 
   override def next: AnyRef = {
     if (!hasNext) {
-      logError(SHOULD_NOT_HAPPEN_MESSAGE)
+      logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
       throw new ShouldNeverHappenException
     }
     val row: ScalaDorisRow = new ScalaDorisRow(rowOrder)
diff --git a/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index f5b5af1..6b66646 100644
--- a/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -17,14 +17,15 @@
 
 package org.apache.doris.spark.sql
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.doris.spark.cfg.ConfigurationOptions
 import org.apache.doris.spark.exception.DorisException
-
 import org.apache.spark.sql.jdbc.JdbcDialect
 import org.apache.spark.sql.sources._
-
 import org.slf4j.Logger
 
+import java.sql.{Date, Timestamp}
+
 private[sql] object Utils {
   /**
    * quote column name
@@ -42,16 +43,16 @@ private[sql] object Utils {
    */
   def compileFilter(filter: Filter, dialect: JdbcDialect, inValueLengthLimit: Int): Option[String] = {
     Option(filter match {
-      case EqualTo(attribute, value) => s"${quote(attribute)} = ${dialect.compileValue(value)}"
-      case GreaterThan(attribute, value) => s"${quote(attribute)} > ${dialect.compileValue(value)}"
-      case GreaterThanOrEqual(attribute, value) => s"${quote(attribute)} >= ${dialect.compileValue(value)}"
-      case LessThan(attribute, value) => s"${quote(attribute)} < ${dialect.compileValue(value)}"
-      case LessThanOrEqual(attribute, value) => s"${quote(attribute)} <= ${dialect.compileValue(value)}"
+      case EqualTo(attribute, value) => s"${quote(attribute)} = ${compileValue(value)}"
+      case GreaterThan(attribute, value) => s"${quote(attribute)} > ${compileValue(value)}"
+      case GreaterThanOrEqual(attribute, value) => s"${quote(attribute)} >= ${compileValue(value)}"
+      case LessThan(attribute, value) => s"${quote(attribute)} < ${compileValue(value)}"
+      case LessThanOrEqual(attribute, value) => s"${quote(attribute)} <= ${compileValue(value)}"
       case In(attribute, values) =>
         if (values.isEmpty || values.length >= inValueLengthLimit) {
           null
         } else {
-          s"${quote(attribute)} in (${dialect.compileValue(values)})"
+          s"${quote(attribute)} in (${compileValue(values)})"
         }
       case IsNull(attribute) => s"${quote(attribute)} is null"
       case IsNotNull(attribute) => s"${quote(attribute)} is not null"
@@ -74,6 +75,27 @@ private[sql] object Utils {
   }
 
   /**
+   * Escape special characters in SQL string literals.
+   * @param value The string to be escaped.
+   * @return Escaped string.
+   */
+  private def escapeSql(value: String): String =
+    if (value == null) null else StringUtils.replace(value, "'", "''")
+
+  /**
+   * Converts value to SQL expression.
+   * @param value The value to be converted.
+   * @return Converted value.
+   */
+  private def compileValue(value: Any): Any = value match {
+    case stringValue: String => s"'${escapeSql(stringValue)}'"
+    case timestampValue: Timestamp => "'" + timestampValue + "'"
+    case dateValue: Date => "'" + dateValue + "'"
+    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
+    case _ => value
+  }
+
+  /**
    * check parameters validation and process it.
    * @param parameters parameters from rdd and spark conf
    * @param logger slf4j logger
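
The new escapeSql/compileValue pair inlines the value-to-SQL rendering used for filter pushdown instead of relying on the JdbcDialect helper. A rough standalone restatement of the added rules, for experimentation only (these are not the connector's private methods):

    import java.sql.{Date, Timestamp}

    object CompileValueSketch {
      // Double single quotes inside string literals, as the patch does.
      private def escapeSql(value: String): String =
        if (value == null) null else value.replace("'", "''")

      // Render a Scala value as a SQL literal for pushdown predicates.
      def compileValue(value: Any): Any = value match {
        case s: String     => s"'${escapeSql(s)}'"
        case t: Timestamp  => "'" + t + "'"
        case d: Date       => "'" + d + "'"
        case a: Array[Any] => a.map(compileValue).mkString(", ")
        case _             => value // numbers and other literals pass through
      }

      def main(args: Array[String]): Unit = {
        println(compileValue("O'Brien"))                                 // 'O''Brien'
        println(compileValue(Timestamp.valueOf("2021-10-29 17:06:05")))  // '2021-10-29 17:06:05'
        println(compileValue(Array[Any](1, 2, 3)))                       // 1, 2, 3
      }
    }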
