pyckle commented on code in PR #1273:
URL: https://github.com/apache/parquet-java/pull/1273#discussion_r2258246902
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
+ if (rowGroupIdxIn != rowGroupIdx) {
+ rowGroupIdxIn = rowGroupIdx;
+ flushWriters();
+ initWriters();
+ }
+ while (rowsToWrite > 0) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ BlockMetaData block = blocks.get(rowGroupIdxOut);
+ List<ColumnChunkMetaData> chunks = block.getColumns();
+ long leftInBlock = block.getRowCount() - writtenFromBlock;
+ long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+ for (ColumnChunkMetaData chunk : chunks) {
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() +
" is encrypted");
+ }
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ copyValues(descriptor, writeFromBlock);
+ }
+ rowsToWrite -= writeFromBlock;
+ writtenFromBlock += writeFromBlock;
+ if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+ rowGroupIdxOut++;
+ if (rowGroupIdxOut == blocks.size()) {
+ inputFiles.poll();
+ rowGroupIdxOut = 0;
+ }
+ writtenFromBlock = 0;
+ // this is called after all rows are processed
+ initReaders();
+ }
+ }
+ flushWriters();
+ }
+
+ private void flushWriters() throws IOException {
+ cStores.values().forEach(cStore -> {
+ cStore.flush();
+ cStore.close();
+ });
+ cWriters.values().forEach(ColumnWriter::close);
+ for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+ if (cPageStores.containsKey(descriptor))
+ cPageStores.get(descriptor).flushToFileWriter(writer);
Review Comment:
I ran into this same question while writing a Parquet driver
(https://github.com/Earnix/parquetforge), so I'll share what I dug up.
The parquet-format spec requires the ColumnChunk order in the footer to
match the schema order. That's documented here:
https://github.com/apache/parquet-format/blob/1dbc814b97c9307687a2e4bee55545ab6a2ef106/src/main/thrift/parquet.thrift#L1002
In the JIRA ticket linked from the commit message (now migrated to GitHub:
https://github.com/apache/parquet-java/issues/1734), it seems that at least
Impala accepts columns written in arbitrary order, and ordering the
Dict/DataPages of a column chunk is not required.
That said, be aware that writing columns in an arbitrary order can adversely
affect performance. See:
https://dl.acm.org/doi/10.1145/3035918.3035930
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]