This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c21bc4  [improve] Reduce the performance loss of additional buffer expansion. (#143)
1c21bc4 is described below

commit 1c21bc46d971eaed7df24e0382329bbbae4ed4c3
Author: Chuang Li <64473732+codecooke...@users.noreply.github.com>
AuthorDate: Wed Oct 25 15:02:38 2023 +0800

    [improve] Reduce the performance loss of additional buffer expansion. (#143)
---
 .../doris/spark/load/RecordBatchInputStream.java   | 88 +++++++---------------
 1 file changed, 29 insertions(+), 59 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
index f70809b..a361c39 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
@@ -40,8 +40,6 @@ public class RecordBatchInputStream extends InputStream {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(RecordBatchInputStream.class);
 
-    private static final int DEFAULT_BUF_SIZE = 4096;
-
     /**
      * Load record batch
      */
@@ -55,7 +53,12 @@ public class RecordBatchInputStream extends InputStream {
     /**
      * record buffer
      */
-    private ByteBuffer buffer = ByteBuffer.allocate(0);
+
+    private ByteBuffer lineBuf = ByteBuffer.allocate(0);;
+
+    private ByteBuffer delimBuf = ByteBuffer.allocate(0);
+
+    private final byte[] delim;
 
     /**
      * record count has been read
@@ -70,31 +73,42 @@ public class RecordBatchInputStream extends InputStream {
     public RecordBatchInputStream(RecordBatch recordBatch, boolean 
passThrough) {
         this.recordBatch = recordBatch;
         this.passThrough = passThrough;
+        this.delim = recordBatch.getDelim();
     }
 
     @Override
     public int read() throws IOException {
         try {
-            if (buffer.remaining() == 0 && endOfBatch()) {
-                return -1; // End of stream
+            if (lineBuf.remaining() == 0 && endOfBatch()) {
+                return -1;
+            }
+
+            if (delimBuf != null && delimBuf.remaining() > 0) {
+                return delimBuf.get() & 0xff;
             }
         } catch (DorisException e) {
             throw new IOException(e);
         }
-        return buffer.get() & 0xFF;
+        return lineBuf.get() & 0xFF;
     }
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
         try {
-            if (buffer.remaining() == 0 && endOfBatch()) {
-                return -1; // End of stream
+            if (lineBuf.remaining() == 0 && endOfBatch()) {
+                return -1;
+            }
+
+            if (delimBuf != null && delimBuf.remaining() > 0) {
+                int bytesRead = Math.min(len, delimBuf.remaining());
+                delimBuf.get(b, off, bytesRead);
+                return bytesRead;
             }
         } catch (DorisException e) {
             throw new IOException(e);
         }
-        int bytesRead = Math.min(len, buffer.remaining());
-        buffer.get(b, off, bytesRead);
+        int bytesRead = Math.min(len, lineBuf.remaining());
+        lineBuf.get(b, off, bytesRead);
         return bytesRead;
     }
 
@@ -109,6 +123,7 @@ public class RecordBatchInputStream extends InputStream {
     public boolean endOfBatch() throws DorisException {
         Iterator<InternalRow> iterator = recordBatch.getIterator();
         if (readCount >= recordBatch.getBatchSize() || !iterator.hasNext()) {
+            delimBuf = null;
             return true;
         }
         readNext(iterator);
@@ -125,62 +140,18 @@ public class RecordBatchInputStream extends InputStream {
         if (!iterator.hasNext()) {
             throw new ShouldNeverHappenException();
         }
-        byte[] delim = recordBatch.getDelim();
         byte[] rowBytes = rowToByte(iterator.next());
         if (isFirst) {
-            ensureCapacity(rowBytes.length);
-            buffer.put(rowBytes);
-            buffer.flip();
+            delimBuf = null;
+            lineBuf = ByteBuffer.wrap(rowBytes);
             isFirst = false;
         } else {
-            ensureCapacity(delim.length + rowBytes.length);
-            buffer.put(delim);
-            buffer.put(rowBytes);
-            buffer.flip();
+            delimBuf =  ByteBuffer.wrap(delim);
+            lineBuf = ByteBuffer.wrap(rowBytes);
         }
         readCount++;
     }
 
-    /**
-     * Check if the buffer has enough capacity.
-     *
-     * @param need required buffer space
-     */
-    private void ensureCapacity(int need) {
-
-        int capacity = buffer.capacity();
-
-        if (need <= capacity) {
-            buffer.clear();
-            return;
-        }
-
-        // need to extend
-        int newCapacity = calculateNewCapacity(capacity, need);
-        LOG.info("expand buffer, min cap: {}, now cap: {}, new cap: {}", need, 
capacity, newCapacity);
-        buffer = ByteBuffer.allocate(newCapacity);
-
-    }
-
-    /**
-     * Calculate new capacity for buffer expansion.
-     *
-     * @param capacity current buffer capacity
-     * @param minCapacity required min buffer space
-     * @return new capacity
-     */
-    private int calculateNewCapacity(int capacity, int minCapacity) {
-        int newCapacity = 0;
-        if (capacity == 0) {
-            newCapacity = DEFAULT_BUF_SIZE;
-
-        }
-        while (newCapacity < minCapacity) {
-            newCapacity = newCapacity << 1;
-        }
-        return newCapacity;
-    }
-
     /**
      * Convert Spark row data to byte array
      *
@@ -220,5 +191,4 @@ public class RecordBatchInputStream extends InputStream {
 
     }
 
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to