Fokko commented on code in PR #3569:
URL: https://github.com/apache/parquet-java/pull/3569#discussion_r3291380959
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java:
##########
@@ -76,27 +91,83 @@ public Encoding getEncoding() {
@Override
public void reset() {
+ batchCount = 0;
for (CapacityByteArrayOutputStream stream : this.byteStreams) {
stream.reset();
}
}
@Override
public void close() {
+ batchCount = 0;
for (CapacityByteArrayOutputStream stream : byteStreams) {
stream.close();
}
}
- protected void scatterBytes(byte[] bytes) {
- if (bytes.length != this.numStreams) {
- throw new ParquetEncodingException(String.format(
- "Number of bytes doesn't match the number of streams. Num butes: %d, Num streams: %d",
- bytes.length, this.numStreams));
+ /**
+ * Buffer a 4-byte integer value for batched scatter to the byte streams.
+ * Values are accumulated until the batch is full, then flushed as bulk
+ * {@code write(byte[], off, len)} calls -- one per stream.
+ */
+ protected void bufferInt(int v) {
+ if (intBatch == null) {
+ intBatch = new int[BATCH_SIZE];
+ scatterBuf = new byte[BATCH_SIZE];
+ }
+ intBatch[batchCount++] = v;
+ if (batchCount == BATCH_SIZE) {
+ flushIntBatch();
+ }
+ }
+
+ /**
+ * Buffer an 8-byte long value for batched scatter to the byte streams.
+ */
+ protected void bufferLong(long v) {
+ if (longBatch == null) {
+ longBatch = new long[BATCH_SIZE];
+ scatterBuf = new byte[BATCH_SIZE];
}
- for (int i = 0; i < bytes.length; ++i) {
- this.byteStreams[i].write(bytes[i]);
+ longBatch[batchCount++] = v;
+ if (batchCount == BATCH_SIZE) {
+ flushLongBatch();
+ }
+ }
+
+ private void flushBatch() {
+ if (batchCount == 0) return;
+ if (intBatch != null) {
+ flushIntBatch();
+ } else if (longBatch != null) {
+ flushLongBatch();
+ }
Review Comment:
Should we add an else case that throws?
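For illustration, a rough sketch of what the extra branch could look like. From the hunk above it looks like `batchCount` only grows after one of the batch arrays has been allocated, so the branch should be unreachable today; the throw would just turn a silent drop into a loud failure if that ever changes. `FlushBatchSketch` and `flushedValues` are hypothetical stand-ins, not names from the PR, and `IllegalStateException` is only an assumption (the writer may prefer `ParquetEncodingException`).

```java
/** Hypothetical stand-in for the writer's batching state; not the PR's class. */
final class FlushBatchSketch {
  static final int BATCH_SIZE = 64;

  int[] intBatch;
  long[] longBatch;
  int batchCount;
  int flushedValues; // stands in for the bytes handed to the streams

  void bufferInt(int v) {
    if (intBatch == null) {
      intBatch = new int[BATCH_SIZE];
    }
    intBatch[batchCount++] = v;
    if (batchCount == BATCH_SIZE) {
      flushBatch();
    }
  }

  void flushBatch() {
    if (batchCount == 0) return;
    if (intBatch != null) {
      flushedValues += batchCount; // placeholder for flushIntBatch()
    } else if (longBatch != null) {
      flushedValues += batchCount; // placeholder for flushLongBatch()
    } else {
      // Buffered values exist but no batch array was ever allocated.
      throw new IllegalStateException(
          "batchCount is " + batchCount + " but no batch buffer is allocated");
    }
    batchCount = 0;
  }
}
```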
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java:
##########
@@ -29,9 +28,23 @@
public abstract class ByteStreamSplitValuesWriter extends ValuesWriter {
+ /**
+ * Batch size for buffered scatter writes. Values are accumulated in a batch buffer
+ * and flushed as bulk {@code write(byte[], off, len)} calls to each stream, replacing
+ * N individual single-byte writes with one bulk write per stream per flush.
+ */
+ private static final int BATCH_SIZE = 64;
+
protected final int numStreams;
protected final int elementSizeInBytes;
- private final CapacityByteArrayOutputStream[] byteStreams;
+ protected final CapacityByteArrayOutputStream[] byteStreams;
Review Comment:
Why not keep this one private?
```suggestion
private final CapacityByteArrayOutputStream[] byteStreams;
```
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesWriter.java:
##########
@@ -187,7 +260,49 @@ public FixedLenByteArrayByteStreamSplitValuesWriter(
public final void writeBytes(Binary v) {
assert (v.length() == length)
: ("Fixed Binary size " + v.length() + " does not match field type length " + length);
- super.scatterBytes(v.getBytesUnsafe());
+ if (batchBufs == null) {
+ batchBufs = new byte[length][BATCH_SIZE];
+ }
+ byte[] bytes = v.getBytesUnsafe();
+ for (int stream = 0; stream < length; stream++) {
+ batchBufs[stream][flbaBatchCount] = bytes[stream];
+ }
+ flbaBatchCount++;
+ if (flbaBatchCount == BATCH_SIZE) {
+ flushFlbaBatch();
+ }
+ }
+
+ private void flushFlbaBatch() {
+ if (flbaBatchCount == 0) return;
+ final int count = flbaBatchCount;
+ for (int stream = 0; stream < length; stream++) {
+ byteStreams[stream].write(batchBufs[stream], 0, count);
Review Comment:
I see, probably we want to make a method for this one:
```
writeToStream(stream, batchBufs[stream], 0, count);
```
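A minimal sketch of how that could look, which would also let `byteStreams` stay private in the base class. This is an assumption about the shape, not the PR's code: `SplitWriterSketch`, `FlbaWriterSketch`, and `streamBytes` are hypothetical names, and `java.io.ByteArrayOutputStream` stands in for `CapacityByteArrayOutputStream` so the example is self-contained.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical base class: the stream array stays private. */
abstract class SplitWriterSketch {
  private final ByteArrayOutputStream[] byteStreams;

  protected SplitWriterSketch(int numStreams) {
    byteStreams = new ByteArrayOutputStream[numStreams];
    for (int i = 0; i < numStreams; i++) {
      byteStreams[i] = new ByteArrayOutputStream();
    }
  }

  /** Single access point for subclasses that need a bulk write. */
  protected final void writeToStream(int stream, byte[] buf, int off, int len) {
    byteStreams[stream].write(buf, off, len);
  }

  /** Test-only accessor in this sketch. */
  final byte[] streamBytes(int stream) {
    return byteStreams[stream].toByteArray();
  }
}

/** Hypothetical fixed-length writer that batches per stream. */
final class FlbaWriterSketch extends SplitWriterSketch {
  private static final int BATCH_SIZE = 64;
  private final int length;
  private final byte[][] batchBufs;
  private int count;

  FlbaWriterSketch(int length) {
    super(length);
    this.length = length;
    this.batchBufs = new byte[length][BATCH_SIZE];
  }

  void writeBytes(byte[] value) {
    // Byte i of every value goes into the batch buffer of stream i.
    for (int stream = 0; stream < length; stream++) {
      batchBufs[stream][count] = value[stream];
    }
    if (++count == BATCH_SIZE) {
      flush();
    }
  }

  void flush() {
    for (int stream = 0; stream < length; stream++) {
      writeToStream(stream, batchBufs[stream], 0, count);
    }
    count = 0;
  }
}
```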
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bytestreamsplit/ByteStreamSplitValuesReader.java:
##########
@@ -49,17 +49,138 @@ protected int nextElementByteOffset() {
return offset;
}
- // Decode an entire data page
+ // Decode an entire data page by transposing from stream-split layout to interleaved layout.
private byte[] decodeData(ByteBuffer encoded, int valuesCount) {
- assert encoded.limit() == valuesCount * elementSizeInBytes;
- byte[] decoded = new byte[encoded.limit()];
- int destByteIndex = 0;
- for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) {
- for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) {
- decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount);
+ int totalBytes = valuesCount * elementSizeInBytes;
+ assert encoded.remaining() >= totalBytes;
Review Comment:
```suggestion
assert encoded.remaining() == totalBytes;
```
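To make the intent concrete, here is a small standalone sketch of the transpose with the strict size check. It assumes the caller hands over a buffer sliced to exactly one page; if that does not hold in the reader, `>=` may be the right check after all. `ByteStreamSplitDecodeSketch` is a hypothetical name, and the sketch uses an explicit exception rather than `assert` only so the check also runs without `-ea`.

```java
import java.nio.ByteBuffer;

/** Hypothetical standalone version of the decode step; not the PR's code. */
final class ByteStreamSplitDecodeSketch {

  static byte[] decodeData(ByteBuffer encoded, int valuesCount, int elementSizeInBytes) {
    int totalBytes = valuesCount * elementSizeInBytes;
    // Strict check: leftover bytes would point at a mis-sliced page.
    if (encoded.remaining() != totalBytes) {
      throw new IllegalArgumentException(
          "Expected " + totalBytes + " bytes but buffer has " + encoded.remaining());
    }
    byte[] decoded = new byte[totalBytes];
    int base = encoded.position();
    int dest = 0;
    for (int value = 0; value < valuesCount; value++) {
      for (int stream = 0; stream < elementSizeInBytes; stream++) {
        // Stream s holds byte s of every value, stored back to back.
        decoded[dest++] = encoded.get(base + value + stream * valuesCount);
      }
    }
    return decoded;
  }
}
```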
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]