gemini-code-assist[bot] commented on code in PR #37740:
URL: https://github.com/apache/beam/pull/37740#discussion_r2879906837
##########
sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java:
##########
@@ -518,6 +523,64 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() {
     readPipeline.run().waitUntilFinish();
   }
+  @Test
+  public void testWriteWithWriterProperties() throws Exception {
+    int customPageSize = 256 * 1024; // 256 KB
+    int customMinRowCount = 5;
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(
+                    ParquetIO.sink(SCHEMA)
+                        .withPageSize(customPageSize)
+                        .withDictionaryEncoding(false)
+                        .withBloomFilterEnabled(true)
+                        .withMinRowCountForPageSizeCheck(customMinRowCount))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    // Read back the file metadata and verify the settings took effect.
+    File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
+    assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0);
+
+    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
+      ParquetMetadata footer = reader.getFooter();
+
+      // Verify bloom filters were written: at least one column should have a bloom filter.
+      boolean hasBloomFilter = false;
+      for (BlockMetaData block : footer.getBlocks()) {
+        for (ColumnChunkMetaData col : block.getColumns()) {
+          if (col.getBloomFilterOffset() >= 0) {
+            hasBloomFilter = true;
+          }
+        }
+      }
+      assertTrue("Expected bloom filters to be present", hasBloomFilter);
+
+      // Verify dictionary encoding was disabled: no columns should use dictionary pages.
+      for (BlockMetaData block : footer.getBlocks()) {
+        for (ColumnChunkMetaData col : block.getColumns()) {
+          assertEquals(
+              "Expected no dictionary page when dictionary encoding is disabled",
+              0,
+              col.getDictionaryPageOffset());
+        }
+      }
Review Comment:

This check for dictionary pages has a small bug and can be made more
readable. When no dictionary page is present, `getDictionaryPageOffset()`
returns `-1`, not `0`. Additionally, using Java Streams can make this check
more concise.
```java
boolean allHaveNoDictionary =
    footer.getBlocks().stream()
        .flatMap(block -> block.getColumns().stream())
        .allMatch(col -> col.getDictionaryPageOffset() == -1L);
assertTrue(
    "Expected no dictionary pages when dictionary encoding is disabled",
    allHaveNoDictionary);
```
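
A further hedged alternative, not part of the suggestion above: if the `-1` sentinel of
`getDictionaryPageOffset()` is a concern across parquet-mr versions, the same assertion
can be made against the encodings each column chunk reports via
`ColumnChunkMetaData.getEncodings()` (this sketch assumes
`org.apache.parquet.column.Encoding` is imported into the test):
```java
// Hedged sketch: assert on the encodings the column chunks report rather than
// the dictionary page offset sentinel.
// Assumed added import: org.apache.parquet.column.Encoding
boolean usesDictionary =
    footer.getBlocks().stream()
        .flatMap(block -> block.getColumns().stream())
        .flatMap(col -> col.getEncodings().stream())
        .anyMatch(e -> e == Encoding.PLAIN_DICTIONARY || e == Encoding.RLE_DICTIONARY);
assertFalse(
    "Expected no dictionary encodings when dictionary encoding is disabled", usesDictionary);
```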
##########
sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java:
##########
@@ -518,6 +523,64 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() {
     readPipeline.run().waitUntilFinish();
   }
+  @Test
+  public void testWriteWithWriterProperties() throws Exception {
+    int customPageSize = 256 * 1024; // 256 KB
+    int customMinRowCount = 5;
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(
+                    ParquetIO.sink(SCHEMA)
+                        .withPageSize(customPageSize)
+                        .withDictionaryEncoding(false)
+                        .withBloomFilterEnabled(true)
+                        .withMinRowCountForPageSizeCheck(customMinRowCount))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    // Read back the file metadata and verify the settings took effect.
+    File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
+    assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0);
+
+    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
+      ParquetMetadata footer = reader.getFooter();
+
+      // Verify bloom filters were written: at least one column should have a bloom filter.
+      boolean hasBloomFilter = false;
+      for (BlockMetaData block : footer.getBlocks()) {
+        for (ColumnChunkMetaData col : block.getColumns()) {
+          if (col.getBloomFilterOffset() >= 0) {
+            hasBloomFilter = true;
+          }
+        }
+      }
Review Comment:

This loop to check for bloom filters can be simplified using Java Streams,
which would make the code more concise and readable.
```java
boolean hasBloomFilter =
    footer.getBlocks().stream()
        .flatMap(block -> block.getColumns().stream())
        .anyMatch(col -> col.getBloomFilterOffset() >= 0);
```
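
A stricter follow-up check, offered as a sketch only: deserialize the bloom filter
instead of relying on the offset alone. This assumes the parquet-hadoop
`BloomFilterReader` API (`ParquetFileReader.getBloomFilterDataReader(BlockMetaData)`,
present in recent parquet-mr releases), where `readBloomFilter` returns `null` for
columns without a filter:
```java
// Hedged sketch, assuming parquet-hadoop's BloomFilterReader API is available.
// Assumed added import: org.apache.parquet.hadoop.BloomFilterReader
boolean hasReadableBloomFilter = false;
for (BlockMetaData block : footer.getBlocks()) {
  BloomFilterReader bloomReader = reader.getBloomFilterDataReader(block);
  for (ColumnChunkMetaData col : block.getColumns()) {
    // readBloomFilter returns null when no bloom filter was written for the column.
    if (bloomReader.readBloomFilter(col) != null) {
      hasReadableBloomFilter = true;
    }
  }
}
assertTrue("Expected at least one readable bloom filter", hasReadableBloomFilter);
```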
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]