This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 1c21bc4 [improve] Reduce the performance loss of additional buffer expansion. (#143) 1c21bc4 is described below commit 1c21bc46d971eaed7df24e0382329bbbae4ed4c3 Author: Chuang Li <64473732+codecooke...@users.noreply.github.com> AuthorDate: Wed Oct 25 15:02:38 2023 +0800 [improve] Reduce the performance loss of additional buffer expansion. (#143) --- .../doris/spark/load/RecordBatchInputStream.java | 88 +++++++--------------- 1 file changed, 29 insertions(+), 59 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java index f70809b..a361c39 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java @@ -40,8 +40,6 @@ public class RecordBatchInputStream extends InputStream { public static final Logger LOG = LoggerFactory.getLogger(RecordBatchInputStream.class); - private static final int DEFAULT_BUF_SIZE = 4096; - /** * Load record batch */ @@ -55,7 +53,12 @@ public class RecordBatchInputStream extends InputStream { /** * record buffer */ - private ByteBuffer buffer = ByteBuffer.allocate(0); + + private ByteBuffer lineBuf = ByteBuffer.allocate(0);; + + private ByteBuffer delimBuf = ByteBuffer.allocate(0); + + private final byte[] delim; /** * record count has been read @@ -70,31 +73,42 @@ public class RecordBatchInputStream extends InputStream { public RecordBatchInputStream(RecordBatch recordBatch, boolean passThrough) { this.recordBatch = recordBatch; this.passThrough = passThrough; + this.delim = recordBatch.getDelim(); } @Override public int read() throws IOException { try { - if (buffer.remaining() == 0 && endOfBatch()) { - return -1; // End of stream + if (lineBuf.remaining() == 0 && endOfBatch()) { + return -1; + } + + if (delimBuf != null && delimBuf.remaining() > 0) { + return delimBuf.get() & 0xff; } } catch (DorisException e) { throw new IOException(e); } - return buffer.get() & 0xFF; + return lineBuf.get() & 0xFF; } @Override public int read(byte[] b, int off, int len) throws IOException { try { - if (buffer.remaining() == 0 && endOfBatch()) { - return -1; // End of stream + if (lineBuf.remaining() == 0 && endOfBatch()) { + return -1; + } + + if (delimBuf != null && delimBuf.remaining() > 0) { + int bytesRead = Math.min(len, delimBuf.remaining()); + delimBuf.get(b, off, bytesRead); + return bytesRead; } } catch (DorisException e) { throw new IOException(e); } - int bytesRead = Math.min(len, buffer.remaining()); - buffer.get(b, off, bytesRead); + int bytesRead = Math.min(len, lineBuf.remaining()); + lineBuf.get(b, off, bytesRead); return bytesRead; } @@ -109,6 +123,7 @@ public class RecordBatchInputStream extends InputStream { public boolean endOfBatch() throws DorisException { Iterator<InternalRow> iterator = recordBatch.getIterator(); if (readCount >= recordBatch.getBatchSize() || !iterator.hasNext()) { + delimBuf = null; return true; } readNext(iterator); @@ -125,62 +140,18 @@ public class RecordBatchInputStream extends InputStream { if (!iterator.hasNext()) { throw new ShouldNeverHappenException(); } - byte[] delim = recordBatch.getDelim(); byte[] rowBytes = rowToByte(iterator.next()); if (isFirst) { - ensureCapacity(rowBytes.length); - buffer.put(rowBytes); - buffer.flip(); + delimBuf = null; + lineBuf = ByteBuffer.wrap(rowBytes); isFirst = false; } else { - ensureCapacity(delim.length + rowBytes.length); - buffer.put(delim); - buffer.put(rowBytes); - buffer.flip(); + delimBuf = ByteBuffer.wrap(delim); + lineBuf = ByteBuffer.wrap(rowBytes); } readCount++; } - /** - * Check if the buffer has enough capacity. - * - * @param need required buffer space - */ - private void ensureCapacity(int need) { - - int capacity = buffer.capacity(); - - if (need <= capacity) { - buffer.clear(); - return; - } - - // need to extend - int newCapacity = calculateNewCapacity(capacity, need); - LOG.info("expand buffer, min cap: {}, now cap: {}, new cap: {}", need, capacity, newCapacity); - buffer = ByteBuffer.allocate(newCapacity); - - } - - /** - * Calculate new capacity for buffer expansion. - * - * @param capacity current buffer capacity - * @param minCapacity required min buffer space - * @return new capacity - */ - private int calculateNewCapacity(int capacity, int minCapacity) { - int newCapacity = 0; - if (capacity == 0) { - newCapacity = DEFAULT_BUF_SIZE; - - } - while (newCapacity < minCapacity) { - newCapacity = newCapacity << 1; - } - return newCapacity; - } - /** * Convert Spark row data to byte array * @@ -220,5 +191,4 @@ public class RecordBatchInputStream extends InputStream { } - } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org