Hi Dawid and Kostas,

Sorry for the late reply, and thank you for the troubleshooting. I've put together an example repo that reproduces the issue[1], because I did have checkpointing enabled in my previous case -- I must still be doing something wrong with that config though.
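For reference, the checkpointing setup in the repro looks roughly like this (the interval and mode here are placeholders, not necessarily the repo's exact values):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // Periodic checkpoints are what let the StreamingFileSink finalize
    // in-progress part files for bulk formats.
    env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

One more idea I want to try is sketched at the bottom of this mail, below the quoted thread.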
Thanks!
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression

On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org> wrote:
> Hi Austin,
>
> Dawid is correct in that you need to enable checkpointing for the
> StreamingFileSink to work.
>
> I hope this solves the problem,
> Kostas
>
> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
> >
> > Hi Austin,
> >
> > If I am not mistaken, the StreamingFileSink by default flushes on
> > checkpoints. If you don't have checkpoints enabled, it might happen
> > that not all data is flushed.
> >
> > I think you can also adjust that behavior with:
> >
> > forBulkFormat(...)
> >     .withRollingPolicy(/* your custom logic */)
> >
> > I also cc Kostas, who should be able to correct me if I am wrong.
> >
> > Best,
> >
> > Dawid
> >
> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
> >
> > Hi there,
> >
> > Using Flink 1.9.1, I'm trying to write .tgz files with the
> > StreamingFileSink#BulkWriter. It seems like flushing the output stream
> > doesn't flush all the data written. I've verified I can create valid
> > files using the same APIs and data on their own, so I'm thinking it
> > must be something I'm doing wrong with the bulk format. I'm writing to
> > the local filesystem, with the `file://` protocol.
> >
> > For tar/gzip, I'm using the Apache Commons Compress library, version 1.20.
> >
> > Here's a runnable example of the issue:
> >
> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> > import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> > import org.apache.flink.api.common.serialization.BulkWriter;
> > import org.apache.flink.core.fs.FSDataOutputStream;
> > import org.apache.flink.core.fs.Path;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> >
> > import java.io.FileOutputStream;
> > import java.io.IOException;
> > import java.io.Serializable;
> > import java.nio.charset.StandardCharsets;
> >
> > class Scratch {
> >   public static class Record implements Serializable {
> >     private static final long serialVersionUID = 1L;
> >
> >     String id;
> >
> >     public Record() {}
> >
> >     public Record(String id) {
> >       this.id = id;
> >     }
> >
> >     public String getId() {
> >       return id;
> >     }
> >
> >     public void setId(String id) {
> >       this.id = id;
> >     }
> >   }
> >
> >   public static void main(String[] args) throws Exception {
> >     final StreamExecutionEnvironment env =
> >         StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >     TarArchiveOutputStream taos = new TarArchiveOutputStream(
> >         new GzipCompressorOutputStream(
> >             new FileOutputStream("/home/austin/Downloads/test.tgz")));
> >     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
> >     String fullText = "hey\nyou\nwork";
> >     byte[] fullTextData = fullText.getBytes();
> >     fileEntry.setSize(fullTextData.length);
> >     taos.putArchiveEntry(fileEntry);
> >     taos.write(fullTextData, 0, fullTextData.length);
> >     taos.closeArchiveEntry();
> >     taos.flush();
> >     taos.close();
> >
> >     StreamingFileSink<Record> textSink = StreamingFileSink
> >         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
> >             new BulkWriter.Factory<Record>() {
> >               @Override
> >               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
> >                 final TarArchiveOutputStream compressedOutputStream =
> >                     new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
> >
> >                 return new BulkWriter<Record>() {
> >                   @Override
> >                   public void addElement(Record record) throws IOException {
> >                     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
> >                     byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
> >                     fileEntry.setSize(fullTextData.length);
> >                     compressedOutputStream.putArchiveEntry(fileEntry);
> >                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
> >                     compressedOutputStream.closeArchiveEntry();
> >                   }
> >
> >                   @Override
> >                   public void flush() throws IOException {
> >                     compressedOutputStream.flush();
> >                   }
> >
> >                   @Override
> >                   public void finish() throws IOException {
> >                     this.flush();
> >                   }
> >                 };
> >               }
> >             })
> >         .withBucketCheckInterval(1000)
> >         .build();
> >
> >     env
> >         .fromElements(new Record("1"), new Record("2"))
> >         .addSink(textSink)
> >         .name("Streaming File Sink")
> >         .uid("streaming-file-sink");
> >     env.execute("streaming file sink test");
> >   }
> > }
> >
> > From the stat/hex dumps, you can see that the first bits are there but
> > are then cut off:
> >
> > ~/Downloads » stat test.tgz
> >   File: test.tgz
> >   Size: 114        Blocks: 8          IO Block: 4096   regular file
> > Device: 801h/2049d Inode: 30041077    Links: 1
> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> > Access: 2020-02-21 19:30:06.009028283 -0500
> > Modify: 2020-02-21 19:30:44.509424406 -0500
> > Change: 2020-02-21 19:30:44.509424406 -0500
> >  Birth: -
> >
> > ~/Downloads » tar -tvf test.tgz
> > -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
> >
> > ~/Downloads » hd test.tgz
> > 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
> > 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
> > 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
> > 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
> > 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
> > 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
> > 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
> > 00000070  00 00                                             |..|
> > 00000072
> >
> > text-output/37 » tar -xzf part-0-0
> >
> > gzip: stdin: unexpected end of file
> > tar: Child returned status 1
> > tar: Error is not recoverable: exiting now
> >
> > text-output/37 » stat part-0-0
> >   File: part-0-0
> >   Size: 10         Blocks: 8          IO Block: 4096   regular file
> > Device: 801h/2049d Inode: 4590487     Links: 1
> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> > Access: 2020-02-21 19:33:06.258888702 -0500
> > Modify: 2020-02-21 19:33:04.466870139 -0500
> > Change: 2020-02-21 19:33:05.294878716 -0500
> >  Birth: -
> >
> > text-output/37 » hd part-0-0
> > 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
> > 0000000a
> >
> > Is there anything simple I'm missing?
> >
> > Best,
> > Austin
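P.S. One more thing I plan to try, in case the checkpoint config alone doesn't explain it: my BulkWriter#finish above only flushes, so the tar end-of-archive records and the gzip trailer never get written -- which would line up with the part file ending right after the 10-byte gzip header. A rough, untested sketch of what I mean, keeping references to both wrapper streams and finishing (not closing) them so Flink's underlying stream stays open:

    final GzipCompressorOutputStream gzipOut = new GzipCompressorOutputStream(out);
    final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(gzipOut);

    // addElement()/flush() stay as before, writing through tarOut ...

    @Override
    public void finish() throws IOException {
        tarOut.finish();  // write the tar end-of-archive records
        gzipOut.finish(); // write the gzip trailer without closing `out`
    }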