kl0u commented on a change in pull request #11307: [FLINK-16371] [BulkWriter] Fix Hadoop Compression BulkWriter URL: https://github.com/apache/flink/pull/11307#discussion_r389638907
########## File path: flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java ########## @@ -59,62 +56,87 @@ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + private static Configuration confWithCustomCodec; + + @BeforeClass + public static void before() { + confWithCustomCodec = new Configuration(); + confWithCustomCodec.set("io.compression.codecs", "org.apache.flink.formats.compress.CustomCompressionCodec"); + } @Test - public void testBzip2CompressByName() throws Exception { + public void testBzip2CompressByAlias() throws Exception { testCompressByName("Bzip2"); } @Test - public void testBzip2CompressCodec() throws Exception { - BZip2Codec codec = new BZip2Codec(); - codec.setConf(new Configuration()); - testCompressCodec(codec); + public void testBzip2CompressByName() throws Exception { + testCompressByName("Bzip2Codec"); } @Test - public void testGzipCompressByName() throws Exception { + public void testGzipCompressByAlias() throws Exception { testCompressByName("Gzip"); } @Test - public void testGzipCompressCodec() throws Exception { - GzipCodec codec = new GzipCodec(); - codec.setConf(new Configuration()); - testCompressCodec(codec); + public void testGzipCompressByName() throws Exception { + testCompressByName("GzipCodec"); + } + + @Test + public void testDeflateCompressByAlias() throws Exception { + testCompressByName("deflate"); } @Test - public void testDeflateCompressByName() throws Exception { - DeflateCodec codec = new DeflateCodec(); - codec.setConf(new Configuration()); - testCompressCodec(codec); + public void testDeflateCompressByClassName() throws Exception { + testCompressByName("org.apache.hadoop.io.compress.DeflateCodec"); } @Test public void testDefaultCompressByName() throws Exception { - DefaultCodec codec = new DefaultCodec(); - codec.setConf(new Configuration()); - testCompressCodec(codec); + testCompressByName("DefaultCodec"); } - private void 
testCompressByName(String codec) throws Exception { - CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(codec); - List<String> lines = Arrays.asList("line1", "line2", "line3"); + @Test + public void testDefaultCompressByClassName() throws Exception { + testCompressByName("org.apache.hadoop.io.compress.DefaultCodec"); + } - File directory = prepareCompressedFile(writer, lines); + @Test(expected = NullPointerException.class) + public void testCompressFailureWithUnknownCodec() throws Exception { + testCompressByName("com.bla.bla.UnknownCodec"); + } + + @Test + public void testCustomCompressionCodecByClassName() throws Exception { + testCompressByName("org.apache.flink.formats.compress.CustomCompressionCodec", confWithCustomCodec); + } - validateResults(directory, lines, new CompressionCodecFactory(new Configuration()).getCodecByName(codec)); + @Test + public void testCustomCompressionCodecByAlias() throws Exception { + testCompressByName("CustomCompressionCodec", confWithCustomCodec); + } + + @Test + public void testCustomCompressionCodecByName() throws Exception { + testCompressByName("CustomCompression", confWithCustomCodec); } - private void testCompressCodec(CompressionCodec codec) throws Exception { + private void testCompressByName(String codec) throws Exception { + testCompressByName(codec, new Configuration()); + } - CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(codec); + private void testCompressByName(String codec, Configuration configuration) throws Exception { + CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()) + .withHadoopCompression(codec, configuration); List<String> lines = Arrays.asList("line1", "line2", "line3"); File directory = prepareCompressedFile(writer, lines); + Configuration conf = (configuration != null) ? 
configuration : new Configuration(); Review comment: This is not `null` in any test, right? So why not just use the `configuration` argument directly? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services