Fokko commented on code in PR #3565:
URL: https://github.com/apache/parquet-java/pull/3565#discussion_r3291176269


##########
parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java:
##########
@@ -19,44 +19,44 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+/**
+ * Plain encoding reader for BINARY values.
+ *
+ * <p>Reads directly from a {@link ByteBuffer} with {@link 
ByteOrder#LITTLE_ENDIAN} byte order,
+ * using {@link ByteBuffer#getInt()} for the 4-byte length prefix instead of 4 
individual
+ * {@code InputStream.read()} calls through {@link 
org.apache.parquet.bytes.BytesUtils#readIntLittleEndian}.
+ */
 public class BinaryPlainValuesReader extends ValuesReader {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BinaryPlainValuesReader.class);
-  private ByteBufferInputStream in;
+  private ByteBuffer buffer;
 
   @Override
   public Binary readBytes() {
-    try {
-      int length = BytesUtils.readIntLittleEndian(in);
-      return Binary.fromConstantByteBuffer(in.slice(length));
-    } catch (IOException | RuntimeException e) {
-      throw new ParquetDecodingException("could not read bytes at offset " + 
in.position(), e);

Review Comment:
   Should we keep the `ParquetDecodingException`? Otherwise we're throwing the 
raw `{IOException,RuntimeException}` which is a behavioral change.



##########
parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java:
##########
@@ -19,120 +19,91 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Plain encoding for float, double, int, long
+ * Plain encoding for float, double, int, long.
+ *
+ * <p>Reads directly from a {@link ByteBuffer} with {@link 
ByteOrder#LITTLE_ENDIAN} byte order,
+ * bypassing the deprecated {@code LittleEndianDataInputStream} wrapper to 
avoid per-value virtual
+ * dispatch overhead. The underlying page data is obtained as a single 
contiguous {@link ByteBuffer}
+ * via {@link ByteBufferInputStream#slice(int)}.
  */
 public abstract class PlainValuesReader extends ValuesReader {
   private static final Logger LOG = 
LoggerFactory.getLogger(PlainValuesReader.class);
 
-  protected LittleEndianDataInputStream in;
+  protected ByteBuffer buffer;
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream stream) 
throws IOException {
     LOG.debug("init from page at offset {} for length {}", stream.position(), 
stream.available());
-    this.in = new LittleEndianDataInputStream(stream.remainingStream());
+    int available = stream.available();
+    if (available > 0) {
+      this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
+    } else {
+      this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);
+    }
   }
 
   @Override
   public void skip() {
     skip(1);
   }
 
-  void skipBytesFully(int n) throws IOException {
-    int skipped = 0;
-    while (skipped < n) {
-      skipped += in.skipBytes(n - skipped);
-    }
-  }
-
   public static class DoublePlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        skipBytesFully(n * 8);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " double 
values", e);

Review Comment:
   Same here, do we want to keep the `ParquetDecodingException`?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java:
##########
@@ -19,120 +19,91 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Plain encoding for float, double, int, long
+ * Plain encoding for float, double, int, long.
+ *
+ * <p>Reads directly from a {@link ByteBuffer} with {@link 
ByteOrder#LITTLE_ENDIAN} byte order,
+ * bypassing the deprecated {@code LittleEndianDataInputStream} wrapper to 
avoid per-value virtual
+ * dispatch overhead. The underlying page data is obtained as a single 
contiguous {@link ByteBuffer}
+ * via {@link ByteBufferInputStream#slice(int)}.
  */
 public abstract class PlainValuesReader extends ValuesReader {
   private static final Logger LOG = 
LoggerFactory.getLogger(PlainValuesReader.class);
 
-  protected LittleEndianDataInputStream in;
+  protected ByteBuffer buffer;
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream stream) 
throws IOException {
     LOG.debug("init from page at offset {} for length {}", stream.position(), 
stream.available());
-    this.in = new LittleEndianDataInputStream(stream.remainingStream());
+    int available = stream.available();
+    if (available > 0) {
+      this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
+    } else {
+      this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);
+    }
   }
 
   @Override
   public void skip() {
     skip(1);
   }
 
-  void skipBytesFully(int n) throws IOException {
-    int skipped = 0;
-    while (skipped < n) {
-      skipped += in.skipBytes(n - skipped);
-    }
-  }
-
   public static class DoublePlainValuesReader extends PlainValuesReader {
 
     @Override
     public void skip(int n) {
-      try {
-        skipBytesFully(n * 8);
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip " + n + " double 
values", e);
-      }
+      buffer.position(buffer.position() + n * 8);

Review Comment:
   Should we use `Math.multiplyExact` here and below?



##########
parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java:
##########
@@ -643,6 +673,20 @@ public ByteBuffer toByteBuffer() throws IOException {
       return java.nio.ByteBuffer.wrap(in, offset, length);
     }
 
+    /**
+     * Zero-copy override: returns the backing array directly when fully used,
+     * skipping the base-class BAOS allocation + copy on every decompressor 
call.
+     * Returning the mutable array is safe — the base class already exposes a
+     * mutable {@code BAOS.getBuf()}.
+     */
+    @Override
+    public byte[] toByteArray() {

Review Comment:
   This overrides a deprecated API, as a follow-up we probably should move the 
internal calls to the new API: 
   
   ```java
   @deprecated Use {@link #toByteBuffer(ByteBufferAllocator, Consumer)}
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java:
##########
@@ -19,44 +19,44 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+/**
+ * Plain encoding reader for BINARY values.
+ *
+ * <p>Reads directly from a {@link ByteBuffer} with {@link 
ByteOrder#LITTLE_ENDIAN} byte order,
+ * using {@link ByteBuffer#getInt()} for the 4-byte length prefix instead of 4 
individual
+ * {@code InputStream.read()} calls through {@link 
org.apache.parquet.bytes.BytesUtils#readIntLittleEndian}.
+ */
 public class BinaryPlainValuesReader extends ValuesReader {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BinaryPlainValuesReader.class);
-  private ByteBufferInputStream in;
+  private ByteBuffer buffer;
 
   @Override
   public Binary readBytes() {
-    try {
-      int length = BytesUtils.readIntLittleEndian(in);
-      return Binary.fromConstantByteBuffer(in.slice(length));
-    } catch (IOException | RuntimeException e) {
-      throw new ParquetDecodingException("could not read bytes at offset " + 
in.position(), e);
-    }
+    int length = buffer.getInt();
+    ByteBuffer valueSlice = buffer.slice();
+    valueSlice.limit(length);
+    buffer.position(buffer.position() + length);
+    return Binary.fromConstantByteBuffer(valueSlice);
   }
 
   @Override
   public void skip() {
-    try {
-      int length = BytesUtils.readIntLittleEndian(in);
-      in.skipFully(length);
-    } catch (IOException | RuntimeException e) {
-      throw new ParquetDecodingException("could not skip bytes at offset " + 
in.position(), e);
-    }
+    int length = buffer.getInt();
+    buffer.position(buffer.position() + length);
   }
 
   @Override
   public void initFromPage(int valueCount, ByteBufferInputStream stream) 
throws IOException {
-    LOG.debug(
-        "init from page at offset {} for length {}",
-        stream.position(),
-        (stream.available() - stream.position()));
-    this.in = stream.remainingStream();
+    int available = stream.available();
+    if (available > 0) {
+      this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN);
+    } else {
+      this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);

Review Comment:
   Should we create a constant for the 
`ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);`?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java:
##########
@@ -19,120 +19,91 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Plain encoding for float, double, int, long
+ * Plain encoding for float, double, int, long.
+ *
+ * <p>Reads directly from a {@link ByteBuffer} with {@link 
ByteOrder#LITTLE_ENDIAN} byte order,
+ * bypassing the deprecated {@code LittleEndianDataInputStream} wrapper to 
avoid per-value virtual
+ * dispatch overhead. The underlying page data is obtained as a single 
contiguous {@link ByteBuffer}
+ * via {@link ByteBufferInputStream#slice(int)}.
  */
 public abstract class PlainValuesReader extends ValuesReader {
   private static final Logger LOG = 
LoggerFactory.getLogger(PlainValuesReader.class);
 
-  protected LittleEndianDataInputStream in;

Review Comment:
   We should go through the deprecation cycle here, but is anything using this 
outside of the project itself?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java:
##########
@@ -19,52 +19,56 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * ValuesReader for FIXED_LEN_BYTE_ARRAY.
+ *
+ * <p>Reads directly from a {@link ByteBuffer}, bypassing the {@link 
ByteBufferInputStream}
+ * wrapper to avoid per-value stream overhead (remaining checks, IOException 
wrapping,
+ * virtual dispatch). The underlying page data is obtained as a single 
contiguous
+ * {@link ByteBuffer} via {@link ByteBufferInputStream#slice(int)}.
  */
 public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
   private static final Logger LOG = 
LoggerFactory.getLogger(FixedLenByteArrayPlainValuesReader.class);
 
   private final int length;
-  private ByteBufferInputStream in;
+  private ByteBuffer buffer;
 
   public FixedLenByteArrayPlainValuesReader(int length) {
     this.length = length;
   }
 
   @Override
   public Binary readBytes() {
-    try {
-      return Binary.fromConstantByteBuffer(in.slice(length));
-    } catch (IOException | RuntimeException e) {
-      throw new ParquetDecodingException("could not read bytes at offset " + 
in.position(), e);

Review Comment:
   Same as above, should we keep the wrapped `ParquetDecodingException`?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java:
##########
@@ -18,57 +18,65 @@
  */
 package org.apache.parquet.column.values.plain;
 
-import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
-
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * encodes boolean for the plain encoding: one bit at a time (0 = false)
+ * Decodes PLAIN-encoded booleans: one bit per value, packed 8 per byte, 
little-endian
+ * bit order (bit 0 of each byte is the first value).
+ *
+ * <p>Direct bit extraction from the page ByteBuffer avoids the overhead of 
the generic
+ * bit-packing machinery ({@code ByteBitPackingValuesReader}) and intermediate
+ * {@code int[8]} buffers.
  */
 public class BooleanPlainValuesReader extends ValuesReader {
   private static final Logger LOG = 
LoggerFactory.getLogger(BooleanPlainValuesReader.class);
 
-  private ByteBitPackingValuesReader in = new ByteBitPackingValuesReader(1, 
LITTLE_ENDIAN);
+  private byte[] pageData;
+  private int pageOffset;
+  private int bitIndex;
 
-  /**
-   * {@inheritDoc}
-   *
-   * @see org.apache.parquet.column.values.ValuesReader#readBoolean()
-   */
   @Override
-  public boolean readBoolean() {
-    return in.readInteger() == 0 ? false : true;
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) 
throws IOException {
+    LOG.debug("init from page at offset {} for length {}", stream.position(), 
stream.available());
+    int effectiveBitLength = valueCount; // bitWidth = 1
+    int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
+    length = Math.min(length, stream.available());
+    ByteBuffer buf = stream.slice(length);
+
+    // Bulk access: use backing array directly if available, otherwise copy 
once.
+    if (buf.hasArray()) {
+      pageData = buf.array();
+      pageOffset = buf.arrayOffset() + buf.position();
+    } else {
+      pageData = new byte[length];
+      buf.get(pageData);
+      pageOffset = 0;
+    }
+    bitIndex = 0;
+    updateNextOffset(length);
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @see org.apache.parquet.column.values.ValuesReader#skip()
-   */
   @Override
-  public void skip() {
-    in.readInteger();
+  public boolean readBoolean() {
+    int byteIdx = pageOffset + (bitIndex >>> 3);
+    int bitPos = bitIndex & 7;
+    bitIndex++;
+    return ((pageData[byteIdx] >>> bitPos) & 1) != 0;
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, 
ByteBufferInputStream)
-   */
   @Override
-  public void initFromPage(int valueCount, ByteBufferInputStream stream) 
throws IOException {
-    LOG.debug("init from page at offset {} for length {}", stream.position(), 
stream.available());
-    this.in.initFromPage(valueCount, stream);
+  public void skip() {
+    bitIndex++;
   }
 
-  @Deprecated
   @Override
-  public int getNextOffset() {
-    return in.getNextOffset();
+  public void skip(int n) {
+    bitIndex += n;

Review Comment:
   Should we check for bounds, and throw a `ParquetDecodingException` in case 
of out of bounds?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to