dmvk commented on code in PR #22481:
URL: https://github.com/apache/flink/pull/22481#discussion_r1183474482


##########
flink-formats/flink-orc-nohive/pom.xml:
##########
@@ -90,6 +90,37 @@ under the License.
                        </exclusions>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+                       <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>ch.qos.reload4j</groupId>
+                                       <artifactId>reload4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-reload4j</artifactId>
+                               </exclusion>
+                       </exclusions>

Review Comment:
   Why do we need these excludes?



##########
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java:
##########
@@ -43,50 +43,51 @@ public class OrcBulkWriterTestUtil {
     public static final String USER_METADATA_KEY = "userKey";
     public static final ByteBuffer USER_METADATA_VALUE = 
ByteBuffer.wrap("hello".getBytes());
 
-    public static void validate(File files, List<Record> expected) throws 
IOException {
-        final File[] buckets = files.listFiles();
-        assertThat(buckets).isNotNull();
-        assertThat(buckets).hasSize(1);
+    public static void validate(File files, List<Record> expected, 
CompressionKind compressionKind)
+            throws IOException {
+        assertThat(files).isNotNull();
+        assertThat(files.exists()).isTrue();
 
+        assertThat(expected).isNotNull();
+        assertThat(expected).isNotEmpty();
+        validateBucketAndFileSize(files, 1);
+
+        final File[] buckets = files.listFiles();
         final File[] partFiles = buckets[0].listFiles();
-        assertThat(partFiles).isNotNull();
 
         for (File partFile : partFiles) {
-            assertThat(partFile.length()).isGreaterThan(0);
+            final Reader reader = getOrcReader(partFile);

Review Comment:
   ```suggestion
               try (final Reader reader = getOrcReader(partFile)) {
   ```



##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/NoCloseFSDataOutputStream.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.writer;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A slightly customised clone of {@link 
org.apache.hadoop.fs.FSDataOutputStream}.
+ *
+ * <p>This implementation does not close the underlying flink stream to avoid 
exceptions when
+ * checkpointing.
+ */
+@Internal
+public class NoCloseFSDataOutputStream extends FSDataOutputStream {
+
+    public NoCloseFSDataOutputStream(OutputStream out) throws IOException {
+        super(out);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Don't close the internal stream here to avoid
+        // Stream Closed or ClosedChannelException when Flink performs 
checkpoint.

Review Comment:
   is this tested in any way? how do we close files then to avoid leaking 
resources?



##########
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/util/OrcBulkWriterTestUtil.java:
##########
@@ -95,4 +96,21 @@ private static List<Record> getResults(Reader reader) throws 
IOException {
 
         return results;
     }
+
+    private static void validateBucketAndFileSize(File outputDir, int 
bucketCount) {
+        final File[] buckets = outputDir.listFiles();
+        assertThat(buckets).isNotNull();
+        assertThat(buckets.length).isEqualTo(bucketCount);
+
+        final File[] partFiles = buckets[0].listFiles();
+        assertThat(partFiles.length).isNotNull();
+    }
+
+    private static Reader getOrcReader(File orcFile) throws IOException {

Review Comment:
   nit
   ```suggestion
       private static Reader createOrcReader(File orcFile) throws IOException {
   ```



##########
flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java:
##########
@@ -105,7 +105,9 @@ protected OrcColumnarRowSplitReader createReader(
             throws IOException {
         return OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
                 new Configuration(),
-                IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + 
i).toArray(String[]::new),
+                IntStream.range(0, fullTypes.length)
+                        .mapToObj(i -> "_col" + i)
+                        .toArray(String[]::new),

Review Comment:
   Why do we need this change? Does the test fail without it?



##########
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java:
##########
@@ -154,11 +154,11 @@ void testReadFileWithSelectFields() throws IOException {
         long totalF0 = 0;
 
         Map<String, Object> partSpec = new HashMap<>();
-        partSpec.put("f1", 1);
-        partSpec.put("f3", 3L);
-        partSpec.put("f5", "f5");
-        partSpec.put("f8", BigDecimal.valueOf(5.333));
-        partSpec.put("f13", "f13");
+        partSpec.put("_col1", 1);

Review Comment:
   Do we need this change? There seem to be a lot of these renames that 
generate unnecessary noise ("hiding what's important in the PR").
   
   Consider removing them or at least moving them into a separate commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to