gemini-code-assist[bot] commented on code in PR #37740:
URL: https://github.com/apache/beam/pull/37740#discussion_r2879906837


##########
sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java:
##########
@@ -518,6 +523,64 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testWriteWithWriterProperties() throws Exception {
+    int customPageSize = 256 * 1024; // 256 KB
+    int customMinRowCount = 5;
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(
+                    ParquetIO.sink(SCHEMA)
+                        .withPageSize(customPageSize)
+                        .withDictionaryEncoding(false)
+                        .withBloomFilterEnabled(true)
+                        .withMinRowCountForPageSizeCheck(customMinRowCount))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    // Read back the file metadata and verify the settings took effect.
+    File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
+    assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0);
+
+    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
+      ParquetMetadata footer = reader.getFooter();
+
+      // Verify bloom filters were written: at least one column should have a bloom filter.
+      boolean hasBloomFilter = false;
+      for (BlockMetaData block : footer.getBlocks()) {
+        for (ColumnChunkMetaData col : block.getColumns()) {
+          if (col.getBloomFilterOffset() >= 0) {
+            hasBloomFilter = true;
+          }
+        }
+      }
+      assertTrue("Expected bloom filters to be present", hasBloomFilter);
+
+      // Verify dictionary encoding was disabled: no columns should use dictionary pages.
+      for (BlockMetaData block : footer.getBlocks()) {
+        for (ColumnChunkMetaData col : block.getColumns()) {
+          assertEquals(
+              "Expected no dictionary page when dictionary encoding is disabled",
+              0,
+              col.getDictionaryPageOffset());
+        }
+      }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   This check for dictionary pages has a small bug and can be made more 
readable. When no dictionary page is present, `getDictionaryPageOffset()` 
returns `-1`, not `0`. Additionally, using Java Streams can make this check 
more concise.
   
   ```java
         boolean allHaveNoDictionary =
             footer.getBlocks().stream()
                 .flatMap(block -> block.getColumns().stream())
                 .allMatch(col -> col.getDictionaryPageOffset() == -1L);
        assertTrue(
            "Expected no dictionary pages when dictionary encoding is disabled", allHaveNoDictionary);
   ```



##########
sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java:
##########
@@ -518,6 +523,64 @@ public void testWriteAndReadFilesAsJsonForUnknownSchemaWithConfiguration() {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testWriteWithWriterProperties() throws Exception {
+    int customPageSize = 256 * 1024; // 256 KB
+    int customMinRowCount = 5;
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(
+                    ParquetIO.sink(SCHEMA)
+                        .withPageSize(customPageSize)
+                        .withDictionaryEncoding(false)
+                        .withBloomFilterEnabled(true)
+                        .withMinRowCountForPageSizeCheck(customMinRowCount))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+
+    // Read back the file metadata and verify the settings took effect.
+    File[] outputFiles = temporaryFolder.getRoot().listFiles((dir, name) -> !name.startsWith("."));
+    assertTrue("Expected at least one output file", outputFiles != null && outputFiles.length > 0);
+
+    org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(outputFiles[0].toURI());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(hadoopPath, new Configuration()))) {
+      ParquetMetadata footer = reader.getFooter();
+
+      // Verify bloom filters were written: at least one column should have a bloom filter.
+      boolean hasBloomFilter = false;
+      for (BlockMetaData block : footer.getBlocks()) {
+        for (ColumnChunkMetaData col : block.getColumns()) {
+          if (col.getBloomFilterOffset() >= 0) {
+            hasBloomFilter = true;
+          }
+        }
+      }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This loop to check for bloom filters can be simplified using Java Streams, 
which would make the code more concise and readable.
   
   ```java
         boolean hasBloomFilter =
             footer.getBlocks().stream()
                 .flatMap(block -> block.getColumns().stream())
                 .anyMatch(col -> col.getBloomFilterOffset() >= 0);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to