This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 13f2aa9 [fix](connector) fix performance degradation caused by
interval mistake (#294)
13f2aa9 is described below
commit 13f2aa9b57a1dc0d9317a37f0b1aad087adb6b2a
Author: gnehil <[email protected]>
AuthorDate: Fri Mar 21 14:09:41 2025 +0800
[fix](connector) fix performance degradation caused by interval mistake
(#294)
---
.../client/write/AbstractCopyIntoProcessor.java | 3 +-
.../client/write/AbstractStreamLoadProcessor.java | 6 +-
.../doris/spark/client/write/DorisWriter.java | 34 ++++++++++--
.../doris/spark/sql/DorisRowFlightSqlReader.scala | 4 +-
.../doris/spark/sql/DorisRowThriftReader.scala | 4 +-
.../scala/org/apache/doris/spark/util/Retry.scala | 3 +-
.../apache/doris/spark/write/DorisDataWriter.scala | 64 +++++++++++-----------
7 files changed, 76 insertions(+), 42 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
index 6cf3549..28f7258 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractCopyIntoProcessor.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-public abstract class AbstractCopyIntoProcessor<R> implements DorisWriter<R>,
DorisCommitter {
+public abstract class AbstractCopyIntoProcessor<R> extends DorisWriter<R>
implements DorisCommitter {
protected static final Logger LOG =
LoggerFactory.getLogger("CopyIntoProcessor");
@@ -97,6 +97,7 @@ public abstract class AbstractCopyIntoProcessor<R> implements
DorisWriter<R>, Do
private boolean isNewBatch = true;
public AbstractCopyIntoProcessor(DorisConfig config) throws Exception {
+ super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
this.config = config;
this.frontend = new DorisFrontendClient(config);
this.properties = config.getSinkProperties();
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 9bc3037..2a10ffa 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-public abstract class AbstractStreamLoadProcessor<R> implements
DorisWriter<R>, DorisCommitter {
+public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R>
implements DorisCommitter {
protected final Logger logger =
LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
@@ -112,6 +112,7 @@ public abstract class AbstractStreamLoadProcessor<R>
implements DorisWriter<R>,
private Future<CloseableHttpResponse> requestFuture = null;
public AbstractStreamLoadProcessor(DorisConfig config) throws Exception {
+ super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
this.config = config;
String tableIdentifier =
config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
String[] dbTableArr = tableIdentifier.split("\\.");
@@ -151,11 +152,13 @@ public abstract class AbstractStreamLoadProcessor<R>
implements DorisWriter<R>,
createNewBatch = false;
}
output.write(toFormat(row, format));
+ currentBatchCount++;
}
@Override
public String stop() throws Exception {
if (requestFuture != null) {
+ createNewBatch = true;
// arrow format need to send all buffer data before stop
if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
List<R> rs = new LinkedList<>(recordBuffer);
@@ -172,7 +175,6 @@ public abstract class AbstractStreamLoadProcessor<R>
implements DorisWriter<R>,
logger.info("stream load response: {}", resEntity);
StreamLoadResponse response = MAPPER.readValue(resEntity,
StreamLoadResponse.class);
if (response != null && response.isSuccess()) {
- createNewBatch = true;
return isTwoPhaseCommitEnabled ?
String.valueOf(response.getTxnId()) : null;
} else {
throw new StreamLoadException("stream load execute failed,
response: " + resEntity);
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
index 281a9b8..ada89c7 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
@@ -17,14 +17,40 @@
package org.apache.doris.spark.client.write;
+import org.apache.doris.spark.config.DorisOptions;
+
import java.io.IOException;
import java.io.Serializable;
-public interface DorisWriter<R> extends Serializable {
+public abstract class DorisWriter<R> implements Serializable {
+
+ protected int batchSize;
+
+ protected int currentBatchCount = 0;
+
+ public DorisWriter(int batchSize) {
+ if (batchSize <= 0) {
+ throw new
IllegalArgumentException(DorisOptions.DORIS_SINK_BATCH_SIZE.getName() + " must
be greater than 0");
+ }
+ this.batchSize = batchSize;
+ }
+
+ public abstract void load(R row) throws Exception;
+
+ public abstract String stop() throws Exception;
+
+ public abstract void close() throws IOException;
+
+ public boolean endOfBatch() {
+ return currentBatchCount >= batchSize;
+ }
- void load(R row) throws Exception;
+ public int getBatchCount() {
+ return currentBatchCount;
+ }
- String stop() throws Exception;
+ public void resetBatchCount() {
+ currentBatchCount = 0;
+ }
- void close() throws IOException;
}
\ No newline at end of file
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
index 3b5baed..03796c6 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowFlightSqlReader.scala
@@ -22,7 +22,7 @@ import org.apache.doris.spark.client.read.DorisFlightSqlReader
import org.apache.doris.spark.config.DorisOptions
import org.apache.doris.spark.exception.ShouldNeverHappenException
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters.{asScalaBufferConverter,
mapAsScalaMapConverter}
class DorisRowFlightSqlReader(partition: DorisReaderPartition) extends
DorisFlightSqlReader(partition) {
@@ -34,6 +34,8 @@ class DorisRowFlightSqlReader(partition:
DorisReaderPartition) extends DorisFlig
}
val row: DorisRow = new DorisRow(rowOrder)
rowBatch.next.asScala.zipWithIndex.foreach {
+ case (s, index) if index < row.values.size &&
s.isInstanceOf[java.util.HashMap[String, String]] =>
+ row.values.update(index, s.asInstanceOf[java.util.HashMap[String,
String]].asScala)
case (s, index) if index < row.values.size => row.values.update(index, s)
case _ => // nothing
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
index 07236b9..9c3cefa 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/DorisRowThriftReader.scala
@@ -22,7 +22,7 @@ import org.apache.doris.spark.client.read.DorisThriftReader
import org.apache.doris.spark.config.DorisOptions
import org.apache.doris.spark.exception.ShouldNeverHappenException
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters.{asScalaBufferConverter,
mapAsScalaMapConverter}
class DorisRowThriftReader(partition: DorisReaderPartition) extends
DorisThriftReader(partition) {
@@ -34,6 +34,8 @@ class DorisRowThriftReader(partition: DorisReaderPartition)
extends DorisThriftR
}
val row: DorisRow = new DorisRow(rowOrder)
rowBatch.next.asScala.zipWithIndex.foreach {
+ case (s, index) if index < row.values.size &&
s.isInstanceOf[java.util.HashMap[String, String]] =>
+ row.values.update(index, s.asInstanceOf[java.util.HashMap[String,
String]].asScala)
case (s, index) if index < row.values.size => row.values.update(index, s)
case _ => // nothing
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
index c41be51..24f8c13 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/Retry.scala
@@ -34,10 +34,9 @@ object Retry {
val result = Try(f)
result match {
case Success(result) =>
- LockSupport.parkNanos(interval.toNanos)
Success(result)
case Failure(exception: T) if retryTimes > 0 =>
- logger.warn("Execution failed caused by: ", exception)
+ logger.warn("Execution failed caused by: {}", exception.getMessage)
logger.warn(s"$retryTimes times retry remaining, the next attempt will
be in ${interval.toMillis} ms")
LockSupport.parkNanos(interval.toNanos)
h
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index 03c8617..f4ff49f 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -27,11 +27,14 @@ import org.apache.spark.sql.connector.write.{DataWriter,
WriterCommitMessage}
import org.apache.spark.sql.types.StructType
import java.time.Duration
+import java.util.concurrent.locks.LockSupport
import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Random, Success}
class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId:
Int, taskId: Long, epochId: Long = -1) extends DataWriter[InternalRow] with
Logging {
+ private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
+
private val (writer: DorisWriter[InternalRow], committer: DorisCommitter) =
config.getValue(DorisOptions.LOAD_MODE) match {
case "stream_load" => (new StreamLoadProcessor(config, schema), new
StreamLoadProcessor(config, schema))
@@ -39,43 +42,25 @@ class DorisDataWriter(config: DorisConfig, schema:
StructType, partitionId: Int,
case mode => throw new IllegalArgumentException("Unsupported load mode:
" + mode)
}
- private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
-
private val batchIntervalMs =
config.getValue(DorisOptions.DORIS_SINK_BATCH_INTERVAL_MS)
private val retries = config.getValue(DorisOptions.DORIS_SINK_MAX_RETRIES)
private val twoPhaseCommitEnabled =
config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC)
- private var currentBatchCount = 0
-
private val committedMessages = mutable.Buffer[String]()
private lazy val recordBuffer = mutable.Buffer[InternalRow]()
- override def write(record: InternalRow): Unit = {
- if (currentBatchCount >= batchSize) {
- val txnId = Some(writer.stop())
- if (txnId.isDefined) {
- committedMessages += txnId.get
- currentBatchCount = 0
- if (retries != 0) {
- recordBuffer.clear()
- }
- } else {
- throw new Exception()
- }
- }
- loadWithRetries(record)
- }
+ override def write(record: InternalRow): Unit = loadBatchWithRetries(record)
override def commit(): WriterCommitMessage = {
- val txnId = writer.stop()
+ val txnId = Option(writer.stop())
if (twoPhaseCommitEnabled) {
- if (StringUtils.isNotBlank(txnId)) {
- committedMessages += txnId
+ if (txnId.isDefined) {
+ committedMessages += txnId.get
} else {
- throw new Exception()
+ throw new Exception("Failed to commit batch")
}
}
DorisWriterCommitMessage(partitionId, taskId, epochId,
committedMessages.toArray)
@@ -95,26 +80,43 @@ class DorisDataWriter(config: DorisConfig, schema:
StructType, partitionId: Int,
}
@throws[Exception]
- private def loadWithRetries(record: InternalRow): Unit = {
+ private def loadBatchWithRetries(record: InternalRow): Unit = {
var isRetrying = false
Retry.exec[Unit, Exception](retries,
Duration.ofMillis(batchIntervalMs.toLong), log) {
if (isRetrying) {
+ // retrying, reload data from buffer
do {
- writer.load(recordBuffer(currentBatchCount))
- currentBatchCount += 1
- } while (currentBatchCount < recordBuffer.size)
+ val idx = writer.getBatchCount
+ writer.load(recordBuffer(idx))
+ } while (writer.getBatchCount < recordBuffer.size)
isRetrying = false
}
+ if (writer.endOfBatch()) {
+ // end of batch, stop batch write
+ val txnId = Option(writer.stop())
+ if (twoPhaseCommitEnabled) {
+ if (txnId.isDefined) {
+ committedMessages += txnId.get
+ } else {
+ throw new Exception("Failed to end batch write")
+ }
+ }
+ // clear buffer if retry is enabled
+ if (retries > 0) {
+ recordBuffer.clear()
+ }
+ writer.resetBatchCount()
+ LockSupport.parkNanos(batchIntervalMs.toLong)
+ }
writer.load(record)
- currentBatchCount += 1
} {
+ // batch write failed, set retry flag and reset batch count
isRetrying = true
- currentBatchCount = 0
+ writer.resetBatchCount()
} match {
case Success(_) => if (retries > 0) recordBuffer += record
case Failure(exception) => throw new Exception(exception)
}
-
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]