This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new d5df8477b GH-3327: Bug fix incorrect compressed size reported by
DataPageV1 (#3326)
d5df8477b is described below
commit d5df8477bfb2aaab2f8825864baf6038072b9ae4
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Sep 25 16:45:43 2025 +0530
GH-3327: Bug fix incorrect compressed size reported by DataPageV1 (#3326)
---
.../parquet/cli/commands/ShowPagesCommand.java | 54 ++++++++++++++++++++--
1 file changed, 49 insertions(+), 5 deletions(-)
diff --git
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
index f8a5b0007..8ca531c6d 100644
---
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
+++
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -36,6 +36,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.text.TextStringBuilder;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.cli.rawpages.RawPagesReader;
import org.apache.parquet.column.ColumnDescriptor;
@@ -46,9 +47,14 @@ import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.Page;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.slf4j.Logger;
@@ -107,7 +113,7 @@ public class ShowPagesCommand extends BaseCommand {
reader.getRowGroups().get(0).getColumns().get(0).getCodec();
// accumulate formatted lines to print by column
Map<String, List<String>> formatted = Maps.newLinkedHashMap();
- PageFormatter formatter = new PageFormatter();
+ PageFormatter formatter = new PageFormatter(reader);
PageReadStore pageStore;
int rowGroupNum = 0;
while ((pageStore = reader.readNextRowGroup()) != null) {
@@ -118,7 +124,7 @@ public class ShowPagesCommand extends BaseCommand {
formatted.put(columnName(descriptor), lines);
}
- formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
+ formatter.setContext(rowGroupNum, columns.get(descriptor), codec,
descriptor);
PageReader pages = pageStore.getPageReader(descriptor);
DictionaryPage dict = pages.readDictionaryPage();
@@ -155,10 +161,16 @@ public class ShowPagesCommand extends BaseCommand {
}
private class PageFormatter implements DataPage.Visitor<String> {
+ private final ParquetFileReader reader;
private int rowGroupNum;
private int pageNum;
private PrimitiveType type;
private String shortCodec;
+ private ColumnDescriptor currentColumn;
+
+ PageFormatter(ParquetFileReader reader) {
+ this.reader = reader;
+ }
String getHeader() {
return String.format(
@@ -166,11 +178,12 @@ public class ShowPagesCommand extends BaseCommand {
"page", "type", "enc", "count", "avg size", "size", "rows", "nulls",
"min / max");
}
- void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName
codec) {
+ void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName
codec, ColumnDescriptor column) {
this.rowGroupNum = rowGroupNum;
this.pageNum = 0;
this.type = type;
this.shortCodec = shortCodec(codec);
+ this.currentColumn = column;
}
String format(Page page) {
@@ -184,6 +197,37 @@ public class ShowPagesCommand extends BaseCommand {
return formatted;
}
+ private long getPageCompressedSize() {
+ long compressedSize = 0;
+ try (SeekableInputStream input = HadoopInputFile.fromPath(new
Path(reader.getFile()), getConf())
+ .newStream()) {
+ BlockMetaData rowGroup = reader.getRowGroups().get(rowGroupNum);
+ int columnIndex =
+
reader.getFileMetaData().getSchema().getColumns().indexOf(currentColumn);
+ ColumnChunkMetaData columnChunk =
rowGroup.getColumns().get(columnIndex);
+
+ long startOffset = columnChunk.hasDictionaryPage()
+ ? columnChunk.getDictionaryPageOffset()
+ : columnChunk.getFirstDataPageOffset();
+ input.seek(startOffset);
+ long endPos = startOffset + columnChunk.getTotalSize();
+ int currentPageIndex = 0;
+
+ while (input.getPos() < endPos) {
+ PageHeader pageHeader = Util.readPageHeader(input);
+ if (currentPageIndex == pageNum) {
+ compressedSize = pageHeader.getCompressed_page_size();
+ break;
+ }
+ input.skip(pageHeader.getCompressed_page_size());
+ currentPageIndex++;
+ }
+ return compressedSize;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private String printDictionaryPage(DictionaryPage dict) {
// TODO: the compressed size of a dictionary page is lost in Parquet
dict.getUncompressedSize();
@@ -212,7 +256,7 @@ public class ShowPagesCommand extends BaseCommand {
@Override
public String visit(DataPageV1 page) {
String enc = encodingAsString(page.getValueEncoding(), false);
- long totalSize = page.getCompressedSize();
+ long totalSize = getPageCompressedSize();
int count = page.getValueCount();
String numNulls = page.getStatistics().isNumNullsSet()
? Long.toString(page.getStatistics().getNumNulls())
@@ -237,7 +281,7 @@ public class ShowPagesCommand extends BaseCommand {
@Override
public String visit(DataPageV2 page) {
String enc = encodingAsString(page.getDataEncoding(), false);
- long totalSize = page.getCompressedSize();
+ long totalSize = getPageCompressedSize();
int count = page.getValueCount();
int numRows = page.getRowCount();
int numNulls = page.getNullCount();