Hi Austin and Rafi,

@Rafi Thanks for providing the pointers! Unfortunately there has been no
progress on the FLIP (or the issue).
@Austin In the meantime, assuming that your input is bounded, you could
simply keep the job running after the whole input is processed, wait until
the output is committed, and then cancel the job. [A sketch of this idea
appears at the end of this thread.] I agree that this is not an elegant
solution, but it is a temporary workaround. Hopefully the FLIP and the
related issue will be prioritised soon.

Cheers,
Kostas

On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi,
>
> This happens because StreamingFileSink does not support a finite input
> stream.
> In the docs it's mentioned under "Important Considerations":
>
> [image: screenshot of the "Important Considerations" note in the
> StreamingFileSink docs]
>
> This behaviour often surprises users...
>
> There's a FLIP
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
> and an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
> fixing this. I'm not sure what the status is, though; maybe Kostas can
> share.
>
> Thanks,
> Rafi
>
>
> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Dawid and Kostas,
>>
>> Sorry for the late reply + thank you for the troubleshooting. I put
>> together an example repo that reproduces the issue [1], because I did
>> have checkpointing enabled in my previous case -- I still must be doing
>> something wrong with that config, though.
>>
>> Thanks!
>> Austin
>>
>> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>>
>>
>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org>
>> wrote:
>>
>>> Hi Austin,
>>>
>>> Dawid is correct in that you need to enable checkpointing for the
>>> StreamingFileSink to work.
>>>
>>> I hope this solves the problem,
>>> Kostas
>>>
>>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>> <dwysakow...@apache.org> wrote:
>>> >
>>> > Hi Austin,
>>> >
>>> > If I am not mistaken, the StreamingFileSink by default flushes on
>>> > checkpoints. If you don't have checkpointing enabled, it might happen
>>> > that not all data is flushed.
>>> >
>>> > I think you can also adjust that behaviour with:
>>> >
>>> >     forBulkFormat(...)
>>> >         .withRollingPolicy(/* your custom logic */)
>>> >
>>> > I also cc Kostas, who should be able to correct me if I am wrong.
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
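[Note: the checkpointing requirement Dawid and Kostas describe above is
easy to miss. A minimal sketch of wiring it in -- the 10-second interval
here is an arbitrary choice, not something from the thread:

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // StreamingFileSink only moves part files from in-progress/pending to
    // finished when a checkpoint completes, so checkpointing must be on.
    env.enableCheckpointing(10_000L); // interval in milliseconds

Without this, bulk-format part files stay in the in-progress state and are
never finalized.]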
>>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>>> >
>>> > Hi there,
>>> >
>>> > Using Flink 1.9.1, I'm trying to write .tgz files with the
>>> > StreamingFileSink#BulkWriter. It seems like flushing the output
>>> > stream doesn't flush all the data written. I've verified that I can
>>> > create valid files using the same APIs and data on their own, so I'm
>>> > thinking it must be something I'm doing wrong with the bulk format.
>>> > I'm writing to the local filesystem, with the `file://` protocol.
>>> >
>>> > For tar/gzipping, I'm using the Apache Commons Compress library,
>>> > version 1.20.
>>> >
>>> > Here's a runnable example of the issue:
>>> >
>>> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
>>> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
>>> > import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>>> > import org.apache.flink.api.common.serialization.BulkWriter;
>>> > import org.apache.flink.core.fs.FSDataOutputStream;
>>> > import org.apache.flink.core.fs.Path;
>>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> > import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>> >
>>> > import java.io.FileOutputStream;
>>> > import java.io.IOException;
>>> > import java.io.Serializable;
>>> > import java.nio.charset.StandardCharsets;
>>> >
>>> > class Scratch {
>>> >   public static class Record implements Serializable {
>>> >     private static final long serialVersionUID = 1L;
>>> >
>>> >     String id;
>>> >
>>> >     public Record() {}
>>> >
>>> >     public Record(String id) {
>>> >       this.id = id;
>>> >     }
>>> >
>>> >     public String getId() {
>>> >       return id;
>>> >     }
>>> >
>>> >     public void setId(String id) {
>>> >       this.id = id;
>>> >     }
>>> >   }
>>> >
>>> >   public static void main(String[] args) throws Exception {
>>> >     final StreamExecutionEnvironment env =
>>> >         StreamExecutionEnvironment.getExecutionEnvironment();
>>> >
>>> >     // Control case: writing the same data directly produces a valid .tgz.
>>> >     TarArchiveOutputStream taos = new TarArchiveOutputStream(
>>> >         new GzipCompressorOutputStream(
>>> >             new FileOutputStream("/home/austin/Downloads/test.tgz")));
>>> >     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
>>> >     String fullText = "hey\nyou\nwork";
>>> >     byte[] fullTextData = fullText.getBytes();
>>> >     fileEntry.setSize(fullTextData.length);
>>> >     taos.putArchiveEntry(fileEntry);
>>> >     taos.write(fullTextData, 0, fullTextData.length);
>>> >     taos.closeArchiveEntry();
>>> >     taos.flush();
>>> >     taos.close();
>>> >
>>> >     StreamingFileSink<Record> textSink = StreamingFileSink
>>> >         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
>>> >             new BulkWriter.Factory<Record>() {
>>> >               @Override
>>> >               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
>>> >                 final TarArchiveOutputStream compressedOutputStream =
>>> >                     new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>>> >
>>> >                 return new BulkWriter<Record>() {
>>> >                   @Override
>>> >                   public void addElement(Record record) throws IOException {
>>> >                     TarArchiveEntry fileEntry =
>>> >                         new TarArchiveEntry(String.format("%s.txt", record.id));
>>> >                     byte[] fullTextData =
>>> >                         "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
>>> >                     fileEntry.setSize(fullTextData.length);
>>> >                     compressedOutputStream.putArchiveEntry(fileEntry);
>>> >                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
>>> >                     compressedOutputStream.closeArchiveEntry();
>>> >                   }
>>> >
>>> >                   @Override
>>> >                   public void flush() throws IOException {
>>> >                     compressedOutputStream.flush();
>>> >                   }
>>> >
>>> >                   @Override
>>> >                   public void finish() throws IOException {
>>> >                     this.flush();
>>> >                   }
>>> >                 };
>>> >               }
>>> >             })
>>> >         .withBucketCheckInterval(1000)
>>> >         .build();
>>> >
>>> >     env
>>> >         .fromElements(new Record("1"), new Record("2"))
>>> >         .addSink(textSink)
>>> >         .name("Streaming File Sink")
>>> >         .uid("streaming-file-sink");
>>> >     env.execute("streaming file sink test");
>>> >   }
>>> > }
>>> >
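[Note: a side issue in the example above, separate from the checkpointing
question the thread settles on: `finish()` only calls `flush()`, which as
far as I can tell never writes the tar end-of-archive records or the gzip
trailer, so even a committed part file would be a truncated archive. A
sketch of a `finish()` that closes out both formats, assuming
commons-compress 1.20 and assuming the GzipCompressorOutputStream is kept
in its own variable (say `gzipOut`) when the writer is created:

    @Override
    public void finish() throws IOException {
      // Writes the tar end-of-archive records (two 512-byte zero blocks).
      compressedOutputStream.finish();
      // Drains the Deflater and writes the gzip trailer (CRC32 + size)
      // without closing the underlying Flink-owned stream.
      gzipOut.finish();
    }

Per the BulkWriter contract, finish() must not close the
FSDataOutputStream itself; Flink owns that stream.]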
>>> > From the stat/hex dumps, you can see that the first bits are there,
>>> > but are then cut off:
>>> >
>>> > ~/Downloads » stat test.tgz
>>> >   File: test.tgz
>>> >   Size: 114         Blocks: 8          IO Block: 4096   regular file
>>> > Device: 801h/2049d  Inode: 30041077    Links: 1
>>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>>> > Access: 2020-02-21 19:30:06.009028283 -0500
>>> > Modify: 2020-02-21 19:30:44.509424406 -0500
>>> > Change: 2020-02-21 19:30:44.509424406 -0500
>>> >  Birth: -
>>> >
>>> > ~/Downloads » tar -tvf test.tgz
>>> > -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
>>> >
>>> > ~/Downloads » hd test.tgz
>>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
>>> > 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
>>> > 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
>>> > 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
>>> > 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
>>> > 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
>>> > 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
>>> > 00000070  00 00                                             |..|
>>> > 00000072
>>> >
>>> > text-output/37 » tar -xzf part-0-0
>>> >
>>> > gzip: stdin: unexpected end of file
>>> > tar: Child returned status 1
>>> > tar: Error is not recoverable: exiting now
>>> >
>>> > text-output/37 » stat part-0-0
>>> >   File: part-0-0
>>> >   Size: 10          Blocks: 8          IO Block: 4096   regular file
>>> > Device: 801h/2049d  Inode: 4590487     Links: 1
>>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>>> > Access: 2020-02-21 19:33:06.258888702 -0500
>>> > Modify: 2020-02-21 19:33:04.466870139 -0500
>>> > Change: 2020-02-21 19:33:05.294878716 -0500
>>> >  Birth: -
>>> >
>>> > text-output/37 » hd part-0-0
>>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
>>> > 0000000a
>>> >
>>> > Is there anything simple I'm missing?
>>> >
>>> > Best,
>>> > Austin
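[Note: a minimal sketch of the workaround Kostas describes at the top of
the thread -- keep the job alive after the bounded input is exhausted so
that checkpoints keep firing and the sink can commit its output, then
cancel the job by hand. The KeepAliveSource below is an illustration, not
code from the thread:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Emits a fixed set of records, then idles instead of returning, so
    // that checkpoints continue and StreamingFileSink can move its pending
    // part files to finished. Cancel the job once the output is committed.
    public class KeepAliveSource implements SourceFunction<Scratch.Record> {
      private static final long serialVersionUID = 1L;
      private volatile boolean running = true;

      @Override
      public void run(SourceContext<Scratch.Record> ctx) throws Exception {
        for (String id : new String[] {"1", "2"}) {
          synchronized (ctx.getCheckpointLock()) {
            ctx.collect(new Scratch.Record(id));
          }
        }
        // Do not return here: returning finishes the job before the sink's
        // in-progress files are committed on a checkpoint.
        while (running) {
          Thread.sleep(1_000L);
        }
      }

      @Override
      public void cancel() {
        running = false;
      }
    }

Wired in as env.addSource(new KeepAliveSource()).addSink(textSink) in
place of env.fromElements(...).]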