[ https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941797#comment-15941797 ]
Luke Hutchison commented on FLINK-6185:
---------------------------------------

Here's my simple gzip OutputFormat though, in case anyone else is looking for a quick solution:

{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
    private static final long serialVersionUID = 1L;

    private File file;
    // PrintWriter is not Serializable; it is only created in open()
    private transient PrintWriter writer;
    private boolean gzip;

    public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean gzip) {
        if (writeMode != WriteMode.OVERWRITE) {
            // Make this explicit, since we're about to overwrite the file
            throw new IllegalArgumentException("writeMode must be WriteMode.OVERWRITE");
        }
        // Append ".gz" only when compressing and the extension is missing
        this.file = gzip && !file.getPath().endsWith(".gz")
                ? new File(file.getPath() + ".gz")
                : file;
        this.gzip = gzip;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0) {
            throw new IllegalArgumentException("Invalid task number");
        }
        if (numTasks != 1) {
            // All records go to a single local file, so only one task may write
            throw new IllegalArgumentException(
                    "must call setParallelism(1) to use "
                            + GZippableTextOutputFormat.class.getName());
        }
        try {
            writer = gzip
                    ? new PrintWriter(new GZIPOutputStream(new FileOutputStream(file)))
                    : new PrintWriter(new FileOutputStream(file));
        } catch (Exception e) {
            close();
            throw new RuntimeException(e);
        }
    }

    public File getFile() {
        return file;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.println(record.toString());
    }

    @Override
    public void close() throws IOException {
        // writer is still null if open() failed before creating it
        if (writer != null) {
            writer.close();
            // PrintWriter swallows IOExceptions, so surface them here
            if (writer.checkError()) {
                throw new IOException("Error writing to " + file);
            }
        }
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + file + ")";
    }
}
{code}

> Output writers and OutputFormats need to support compression
> ------------------------------------------------------------
>
>                 Key: FLINK-6185
>                 URL: https://issues.apache.org/jira/browse/FLINK-6185
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>            Priority: Minor
>
> File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such
> as {{FileOutputFormat}} and its subclasses, and methods such as
> {{DataSet#writeAsText()}}) need the ability to transparently decompress and
> compress files. Primarily gzip would be useful, but it would be nice if this
> were pluggable to support bzip2, xz, etc.
> There could be options for autodetect (based on file extension and/or file
> content), which could be the default, as well as no compression or a selected
> compression method.
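
The autodetect idea in the issue description could look roughly like the sketch below. It is only an illustration, not Flink API: {{CompressionSketch}}, {{Codec}}, {{register}}, {{openForWrite}} and {{openForRead}} are hypothetical names. Only gzip is wired in, because it is the only one of the mentioned formats that ships with the JDK; bzip2 or xz would have to be registered through {{register}} using a third-party library.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper (not part of Flink): picks compression from a file name or from content. */
final class CompressionSketch {

    /** Hypothetical plug-in point: wraps a raw stream in a compressing stream. */
    interface Codec {
        OutputStream wrap(OutputStream raw) throws IOException;
    }

    // Extension -> codec registry; gzip is the only codec available in the JDK itself.
    private static final Map<String, Codec> CODECS = new LinkedHashMap<>();

    static {
        register(".gz", GZIPOutputStream::new);
    }

    /** Registers a codec for a file extension such as ".bz2" (assumed to come from a library). */
    static void register(String extension, Codec codec) {
        CODECS.put(extension, codec);
    }

    /** Autodetect by extension; unknown extensions fall back to no compression. */
    static OutputStream openForWrite(String fileName, OutputStream raw) throws IOException {
        for (Map.Entry<String, Codec> entry : CODECS.entrySet()) {
            if (fileName.endsWith(entry.getKey())) {
                return entry.getValue().wrap(raw);
            }
        }
        return raw;
    }

    /** Autodetect by content: a gzip stream starts with the magic bytes 0x1f 0x8b. */
    static InputStream openForRead(InputStream raw) throws IOException {
        BufferedInputStream in = new BufferedInputStream(raw);
        in.mark(2);
        int b0 = in.read();
        int b1 = in.read();
        in.reset(); // rewind so the caller (or GZIPInputStream) sees the full stream
        boolean isGzip = b0 == 0x1f && b1 == 0x8b;
        return isGzip ? new GZIPInputStream(in) : in;
    }

    public static void main(String[] args) throws IOException {
        // Write through the extension-based path, then read back via content sniffing.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (OutputStream out = openForWrite("part-0.txt.gz", buf)) {
            out.write("hello\n".getBytes("UTF-8"));
        }
        try (InputStream in = openForRead(new ByteArrayInputStream(buf.toByteArray()))) {
            int c;
            while ((c = in.read()) != -1) {
                System.out.print((char) c);
            }
        }
    }
}
```

Keeping the registry keyed by extension means "no compression" is just the fallback case, and an explicitly selected method could bypass the lookup by calling a {{Codec}} directly.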