Hi Austin,

If I am not mistaken, the StreamingFileSink by default flushes on checkpoints. If you don't have checkpoints enabled, it might happen that not all data is flushed.
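As a minimal sketch of what I mean (the 10-second interval is just an example value, not a recommendation):

```java
final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 10 seconds; with a bulk format, each checkpoint
// is what finalizes and rolls the in-progress part files.
env.enableCheckpointing(10_000);
```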
I think you can also adjust that behavior with:

    forBulkFormat(...)
        .withRollingPolicy(/* your custom logic */)

I also cc Kostas, who should be able to correct me if I am wrong.

Best,
Dawid

On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
> Hi there,
>
> Using Flink 1.9.1, trying to write .tgz files with the
> StreamingFileSink#BulkWriter. It seems like flushing the output stream
> doesn't flush all the data written. I've verified I can create valid
> files using the same APIs and data on their own, so thinking it must
> be something I'm doing wrong with the bulk format. I'm writing to the
> local filesystem, with the `file://` protocol.
>
> For tar/gzipping, I'm using the Apache Commons Compress library,
> version 1.20.
>
> Here's a runnable example of the issue:
>
> import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> import org.apache.flink.api.common.serialization.BulkWriter;
> import org.apache.flink.core.fs.FSDataOutputStream;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.Serializable;
> import java.nio.charset.StandardCharsets;
>
> class Scratch {
>   public static class Record implements Serializable {
>     private static final long serialVersionUID = 1L;
>
>     String id;
>
>     public Record() {}
>
>     public Record(String id) {
>       this.id = id;
>     }
>
>     public String getId() {
>       return id;
>     }
>
>     public void setId(String id) {
>       this.id = id;
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     TarArchiveOutputStream taos = new TarArchiveOutputStream(
>         new GzipCompressorOutputStream(
>             new FileOutputStream("/home/austin/Downloads/test.tgz")));
>     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
>     String fullText = "hey\nyou\nwork";
>     byte[] fullTextData = fullText.getBytes();
>     fileEntry.setSize(fullTextData.length);
>     taos.putArchiveEntry(fileEntry);
>     taos.write(fullTextData, 0, fullTextData.length);
>     taos.closeArchiveEntry();
>     taos.flush();
>     taos.close();
>
>     StreamingFileSink<Record> textSink = StreamingFileSink
>         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
>             new BulkWriter.Factory<Record>() {
>               @Override
>               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
>                 final TarArchiveOutputStream compressedOutputStream =
>                     new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>
>                 return new BulkWriter<Record>() {
>                   @Override
>                   public void addElement(Record record) throws IOException {
>                     TarArchiveEntry fileEntry =
>                         new TarArchiveEntry(String.format("%s.txt", record.id));
>                     byte[] fullTextData =
>                         "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
>                     fileEntry.setSize(fullTextData.length);
>                     compressedOutputStream.putArchiveEntry(fileEntry);
>                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
>                     compressedOutputStream.closeArchiveEntry();
>                   }
>
>                   @Override
>                   public void flush() throws IOException {
>                     compressedOutputStream.flush();
>                   }
>
>                   @Override
>                   public void finish() throws IOException {
>                     this.flush();
>                   }
>                 };
>               }
>             })
>         .withBucketCheckInterval(1000)
>         .build();
>
>     env
>         .fromElements(new Record("1"), new Record("2"))
>         .addSink(textSink)
>         .name("Streaming File Sink")
>         .uid("streaming-file-sink");
>
>     env.execute("streaming file sink test");
>   }
> }
>
> From the stat/hex dumps, you can see that the first bits are there,
> but are then cut off:
>
> ~/Downloads » stat test.tgz
>   File: test.tgz
>   Size: 114            Blocks: 8          IO Block: 4096   regular file
> Device: 801h/2049d     Inode: 30041077    Links: 1
> Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> Access: 2020-02-21 19:30:06.009028283 -0500
> Modify: 2020-02-21 19:30:44.509424406 -0500
> Change: 2020-02-21 19:30:44.509424406 -0500
>  Birth: -
>
> ~/Downloads » tar -tvf test.tgz
> -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
>
> ~/Downloads » hd test.tgz
> 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
> 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
> 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
> 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
> 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
> 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
> 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
> 00000070  00 00                                             |..|
> 00000072
>
> text-output/37 » tar -xzf part-0-0
>
> gzip: stdin: unexpected end of file
> tar: Child returned status 1
> tar: Error is not recoverable: exiting now
>
> text-output/37 » stat part-0-0
>   File: part-0-0
>   Size: 10             Blocks: 8          IO Block: 4096   regular file
> Device: 801h/2049d     Inode: 4590487     Links: 1
> Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> Access: 2020-02-21 19:33:06.258888702 -0500
> Modify: 2020-02-21 19:33:04.466870139 -0500
> Change: 2020-02-21 19:33:05.294878716 -0500
>  Birth: -
>
> text-output/37 » hd part-0-0
> 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
> 0000000a
>
> Is there anything simple I'm missing?
>
> Best,
> Austin
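One more observation on the 10-byte part-0-0 file above: those ten bytes are exactly a gzip header with no compressed data or trailer behind it, which is what you get when a gzip stream is flushed but never finished. The quoted BulkWriter implements finish() as this.flush(), so that may be part of the issue. Here is a self-contained sketch of that behavior using only java.util.zip from the JDK (not Commons Compress, purely for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipFlushVsFinish {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(buf);
        gzip.write("hey\nyou\nwork".getBytes(StandardCharsets.UTF_8));

        gzip.flush();
        // flush() only flushes the underlying stream; the deflater still
        // holds the compressed data, so the buffer contains just the
        // 10-byte gzip header at this point
        int afterFlush = buf.size();
        System.out.println("after flush:  " + afterFlush + " bytes");

        gzip.finish();
        // finish() drains the deflater and writes the gzip trailer
        int afterFinish = buf.size();
        System.out.println("after finish: " + afterFinish + " bytes");

        // only the finished stream round-trips successfully
        GZIPInputStream in =
            new GZIPInputStream(new ByteArrayInputStream(buf.toByteArray()));
        String roundTripped = new String(in.readAllBytes(), StandardCharsets.UTF_8);
        System.out.println("round-trip ok: " + roundTripped.equals("hey\nyou\nwork"));
    }
}
```

So flushing alone never produces a readable archive; the compressor has to be finished before the file is closed.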