kl0u commented on a change in pull request #11307: [FLINK-16371] [BulkWriter] 
Fix Hadoop Compression BulkWriter
URL: https://github.com/apache/flink/pull/11307#discussion_r389638907
 
 

 ##########
 File path: 
flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
 ##########
 @@ -59,62 +56,87 @@
 
        @ClassRule
        public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+       private static Configuration confWithCustomCodec;
+
+       @BeforeClass
+       public static void before() {
+               confWithCustomCodec = new Configuration();
+               confWithCustomCodec.set("io.compression.codecs", 
"org.apache.flink.formats.compress.CustomCompressionCodec");
+       }
 
        @Test
-       public void testBzip2CompressByName() throws Exception {
+       public void testBzip2CompressByAlias() throws Exception {
                testCompressByName("Bzip2");
        }
 
        @Test
-       public void testBzip2CompressCodec() throws Exception {
-               BZip2Codec codec = new BZip2Codec();
-               codec.setConf(new Configuration());
-               testCompressCodec(codec);
+       public void testBzip2CompressByName() throws Exception {
+               testCompressByName("Bzip2Codec");
        }
 
        @Test
-       public void testGzipCompressByName() throws Exception {
+       public void testGzipCompressByAlias() throws Exception {
                testCompressByName("Gzip");
        }
 
        @Test
-       public void testGzipCompressCodec() throws Exception {
-               GzipCodec codec = new GzipCodec();
-               codec.setConf(new Configuration());
-               testCompressCodec(codec);
+       public void testGzipCompressByName() throws Exception {
+               testCompressByName("GzipCodec");
+       }
+
+       @Test
+       public void testDeflateCompressByAlias() throws Exception {
+               testCompressByName("deflate");
        }
 
        @Test
-       public void testDeflateCompressByName() throws Exception {
-               DeflateCodec codec = new DeflateCodec();
-               codec.setConf(new Configuration());
-               testCompressCodec(codec);
+       public void testDeflateCompressByClassName() throws Exception {
+               
testCompressByName("org.apache.hadoop.io.compress.DeflateCodec");
        }
 
        @Test
        public void testDefaultCompressByName() throws Exception {
-               DefaultCodec codec = new DefaultCodec();
-               codec.setConf(new Configuration());
-               testCompressCodec(codec);
+               testCompressByName("DefaultCodec");
        }
 
-       private void testCompressByName(String codec) throws Exception {
-               CompressWriterFactory<String> writer = 
CompressWriters.forExtractor(new 
DefaultExtractor<String>()).withHadoopCompression(codec);
-               List<String> lines = Arrays.asList("line1", "line2", "line3");
+       @Test
+       public void testDefaultCompressByClassName() throws Exception {
+               
testCompressByName("org.apache.hadoop.io.compress.DefaultCodec");
+       }
 
-               File directory = prepareCompressedFile(writer, lines);
+       @Test(expected = NullPointerException.class)
+       public void testCompressFailureWithUnknownCodec() throws Exception {
+               testCompressByName("com.bla.bla.UnknownCodec");
+       }
+
+       @Test
+       public void testCustomCompressionCodecByClassName() throws Exception {
+               
testCompressByName("org.apache.flink.formats.compress.CustomCompressionCodec", 
confWithCustomCodec);
+       }
 
-               validateResults(directory, lines, new 
CompressionCodecFactory(new Configuration()).getCodecByName(codec));
+       @Test
+       public void testCustomCompressionCodecByAlias() throws Exception {
+               testCompressByName("CustomCompressionCodec", 
confWithCustomCodec);
+       }
+
+       @Test
+       public void testCustomCompressionCodecByName() throws Exception {
+               testCompressByName("CustomCompression", confWithCustomCodec);
        }
 
-       private void testCompressCodec(CompressionCodec codec) throws Exception 
{
+       private void testCompressByName(String codec) throws Exception {
+               testCompressByName(codec, new Configuration());
+       }
 
-               CompressWriterFactory<String> writer = 
CompressWriters.forExtractor(new 
DefaultExtractor<String>()).withHadoopCompression(codec);
+       private void testCompressByName(String codec, Configuration 
configuration) throws Exception {
+               CompressWriterFactory<String> writer = 
CompressWriters.forExtractor(new DefaultExtractor<String>())
+                       .withHadoopCompression(codec, configuration);
                List<String> lines = Arrays.asList("line1", "line2", "line3");
 
                File directory = prepareCompressedFile(writer, lines);
+               Configuration conf = (configuration != null) ? configuration : 
new Configuration();
 
 Review comment:
   This is not `null` in any test, right? So why not just use the 
`configuration` argument directly?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to