Hi,

This happens because the StreamingFileSink does not support finite input
streams. It's mentioned in the docs under "Important Considerations":

[image: screenshot of the "Important Considerations" section of the
StreamingFileSink documentation]

This behaviour often surprises users... There's a FLIP
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
and an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
fixing this. I'm not sure what the status is though, maybe Kostas can share.
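In the meantime, the usual workaround is to make sure checkpointing is
enabled, since the sink only rolls and commits part files when a checkpoint
completes. A minimal sketch (the interval is arbitrary, and note this alone
may not help a bounded job that finishes before its last checkpoint):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class EnableCheckpointing {
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // The StreamingFileSink finalizes part files only when a checkpoint
    // completes; without checkpointing, output stays in-progress forever.
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);

    // ... build the rest of the pipeline (sources, the StreamingFileSink)
    // and then run it:
    // env.execute("job with checkpointing enabled");
  }
}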
Thanks,
Rafi

On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi Dawid and Kostas,
>
> Sorry for the late reply + thank you for the troubleshooting. I put
> together an example repo that reproduces the issue [1], because I did have
> checkpointing enabled in my previous case -- still must be doing something
> wrong with that config though.
>
> Thanks!
> Austin
>
> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>
>
> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org>
> wrote:
>
>> Hi Austin,
>>
>> Dawid is correct in that you need to enable checkpointing for the
>> StreamingFileSink to work.
>>
>> I hope this solves the problem,
>> Kostas
>>
>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>> <dwysakow...@apache.org> wrote:
>> >
>> > Hi Austin,
>> >
>> > If I am not mistaken, the StreamingFileSink by default flushes on
>> > checkpoints. If you don't have checkpoints enabled, it might happen
>> > that not all data is flushed.
>> >
>> > I think you can also adjust that behavior with:
>> >
>> > forBulkFormat(...)
>> >     .withRollingPolicy(/* your custom logic */)
>> >
>> > I also cc Kostas, who should be able to correct me if I am wrong.
>> >
>> > Best,
>> >
>> > Dawid
>> >
>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>> >
>> > Hi there,
>> >
>> > Using Flink 1.9.1, I'm trying to write .tgz files with the
>> > StreamingFileSink#BulkWriter. It seems like flushing the output stream
>> > doesn't flush all the data written. I've verified I can create valid
>> > files using the same APIs and data on their own, so I'm thinking it
>> > must be something I'm doing wrong with the bulk format. I'm writing to
>> > the local filesystem, with the `file://` protocol.
>> >
>> > For tar/gzipping, I'm using the Apache Commons Compress library,
>> > version 1.20.
>> >
>> > Here's a runnable example of the issue:
>> >
>> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
>> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
>> > import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>> > import org.apache.flink.api.common.serialization.BulkWriter;
>> > import org.apache.flink.core.fs.FSDataOutputStream;
>> > import org.apache.flink.core.fs.Path;
>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> > import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>> >
>> > import java.io.FileOutputStream;
>> > import java.io.IOException;
>> > import java.io.Serializable;
>> > import java.nio.charset.StandardCharsets;
>> >
>> > class Scratch {
>> >   public static class Record implements Serializable {
>> >     private static final long serialVersionUID = 1L;
>> >
>> >     String id;
>> >
>> >     public Record() {}
>> >
>> >     public Record(String id) {
>> >       this.id = id;
>> >     }
>> >
>> >     public String getId() {
>> >       return id;
>> >     }
>> >
>> >     public void setId(String id) {
>> >       this.id = id;
>> >     }
>> >   }
>> >
>> >   public static void main(String[] args) throws Exception {
>> >     final StreamExecutionEnvironment env =
>> >         StreamExecutionEnvironment.getExecutionEnvironment();
>> >
>> >     // Writing the same data directly to a local file with the same
>> >     // APIs produces a valid archive:
>> >     TarArchiveOutputStream taos = new TarArchiveOutputStream(
>> >         new GzipCompressorOutputStream(
>> >             new FileOutputStream("/home/austin/Downloads/test.tgz")));
>> >     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
>> >     String fullText = "hey\nyou\nwork";
>> >     byte[] fullTextData = fullText.getBytes();
>> >     fileEntry.setSize(fullTextData.length);
>> >     taos.putArchiveEntry(fileEntry);
>> >     taos.write(fullTextData, 0, fullTextData.length);
>> >     taos.closeArchiveEntry();
>> >     taos.flush();
>> >     taos.close();
>> >
>> >     // The same writes through the StreamingFileSink's BulkWriter:
>> >     StreamingFileSink<Record> textSink = StreamingFileSink
>> >         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
>> >             new BulkWriter.Factory<Record>() {
>> >               @Override
>> >               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
>> >                 final TarArchiveOutputStream compressedOutputStream =
>> >                     new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>> >
>> >                 return new BulkWriter<Record>() {
>> >                   @Override
>> >                   public void addElement(Record record) throws IOException {
>> >                     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
>> >                     byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
>> >                     fileEntry.setSize(fullTextData.length);
>> >                     compressedOutputStream.putArchiveEntry(fileEntry);
>> >                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
>> >                     compressedOutputStream.closeArchiveEntry();
>> >                   }
>> >
>> >                   @Override
>> >                   public void flush() throws IOException {
>> >                     compressedOutputStream.flush();
>> >                   }
>> >
>> >                   @Override
>> >                   public void finish() throws IOException {
>> >                     this.flush();
>> >                   }
>> >                 };
>> >               }
>> >             })
>> >         .withBucketCheckInterval(1000)
>> >         .build();
>> >
>> >     env
>> >         .fromElements(new Record("1"), new Record("2"))
>> >         .addSink(textSink)
>> >         .name("Streaming File Sink")
>> >         .uid("streaming-file-sink");
>> >     env.execute("streaming file sink test");
>> >   }
>> > }
>> >
>> > From the stat/hex dumps, you can see that the first bits are there,
>> > but are then cut off:
>> >
>> > ~/Downloads » stat test.tgz
>> >   File: test.tgz
>> >   Size: 114       Blocks: 8          IO Block: 4096   regular file
>> > Device: 801h/2049d  Inode: 30041077  Links: 1
>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>> > Access: 2020-02-21 19:30:06.009028283 -0500
>> > Modify: 2020-02-21 19:30:44.509424406 -0500
>> > Change: 2020-02-21 19:30:44.509424406 -0500
>> >  Birth: -
>> >
>> > ~/Downloads » tar -tvf test.tgz
>> > -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
>> >
>> > ~/Downloads » hd test.tgz
>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
>> > 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
>> > 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
>> > 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
>> > 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
>> > 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
>> > 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
>> > 00000070  00 00                                             |..|
>> > 00000072
>> >
>> > text-output/37 » tar -xzf part-0-0
>> >
>> > gzip: stdin: unexpected end of file
>> > tar: Child returned status 1
>> > tar: Error is not recoverable: exiting now
>> >
>> > text-output/37 » stat part-0-0
>> >   File: part-0-0
>> >   Size: 10        Blocks: 8          IO Block: 4096   regular file
>> > Device: 801h/2049d  Inode: 4590487  Links: 1
>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>> > Access: 2020-02-21 19:33:06.258888702 -0500
>> > Modify: 2020-02-21 19:33:04.466870139 -0500
>> > Change: 2020-02-21 19:33:05.294878716 -0500
>> >  Birth: -
>> >
>> > text-output/37 » hd part-0-0
>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
>> > 0000000a
>> >
>> > Is there anything simple I'm missing?
>> >
>> > Best,
>> > Austin
>>
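
A note on the example itself: the BulkWriter's finish() above only calls
flush(), so TarArchiveOutputStream.finish() is never invoked and the gzip
deflater is never finalized. That means the tar EOF records and the gzip
trailer are never written, which would match a 10-byte part file containing
only the gzip header. Below is a sketch of a writer factory that finalizes
both layers; the names gzipOut/tarOut are illustrative, addElement is elided,
and this is a guess at a fix rather than a confirmed resolution from the
thread:

public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
  // Keep a handle on the gzip layer so finish() can finalize it too.
  final GzipCompressorOutputStream gzipOut = new GzipCompressorOutputStream(out);
  final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(gzipOut);

  return new BulkWriter<Record>() {
    @Override
    public void addElement(Record record) throws IOException {
      // ... unchanged from the example above ...
    }

    @Override
    public void flush() throws IOException {
      tarOut.flush();
    }

    @Override
    public void finish() throws IOException {
      // finish(), not close(): write the tar EOF records and the gzip
      // trailer, but leave Flink's FSDataOutputStream open; the sink
      // closes it itself when the part file is finalized.
      tarOut.finish();
      gzipOut.finish();
    }
  };
}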