Hi all,

Thanks for the docs pointer and the FLIP link, Rafi, and for the workaround strategy, Kostas. Strange, though, as I wasn't using a bounded source when I first ran into this issue. I have updated the example repo to use an unbounded source [1], and the same file corruption problems remain.
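For reference, the unbounded version just swaps the bounded fromElements source for a simple SourceFunction that keeps emitting records. Roughly along these lines (a trimmed-down sketch rather than the exact code in the repo; it reuses the Record class and textSink from the example further down the thread, and the emitted values / sleep interval are arbitrary):

    // org.apache.flink.streaming.api.functions.source.SourceFunction
    env
        .addSource(new SourceFunction<Record>() {
          private volatile boolean running = true;

          @Override
          public void run(SourceContext<Record> ctx) throws Exception {
            long count = 0;
            while (running) {
              // emit under the checkpoint lock, as recommended for SourceFunctions
              synchronized (ctx.getCheckpointLock()) {
                ctx.collect(new Record(String.valueOf(count++)));
              }
              Thread.sleep(100);
            }
          }

          @Override
          public void cancel() {
            running = false;
          }
        })
        .addSink(textSink); // same bulk-format sink as in the original example below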
Anything else I could be doing wrong with the compression stream?

Thanks again,
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded

On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas <kklou...@apache.org> wrote:
> Hi Austin and Rafi,
>
> @Rafi Thanks for providing the pointers!
> Unfortunately there is no progress on the FLIP (or the issue).
>
> @Austin In the meantime, what you could do -- assuming that your input is
> bounded -- is simply not stop the job after the whole input is processed,
> wait until the output is committed, and then cancel the job. I know and I
> agree that this is not an elegant solution, but it is a temporary workaround.
>
> Hopefully the FLIP and the related issue are going to be prioritised soon.
>
> Cheers,
> Kostas
>
> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>> Hi,
>>
>> This happens because StreamingFileSink does not support a finite input
>> stream. In the docs it's mentioned under "Important Considerations":
>>
>> [image: image.png]
>>
>> This behaviour often surprises users...
>>
>> There's a FLIP
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
>> and an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
>> fixing this. I'm not sure what the status is, though; maybe Kostas can share.
>>
>> Thanks,
>> Rafi
>>
>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>> Hi Dawid and Kostas,
>>>
>>> Sorry for the late reply, and thank you for the troubleshooting. I put
>>> together an example repo that reproduces the issue [1], because I did
>>> have checkpointing enabled in my previous case -- I must still be doing
>>> something wrong with that config, though.
>>>
>>> Thanks!
>>> Austin
>>>
>>> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>>>
>>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org>
>>> wrote:
>>>> Hi Austin,
>>>>
>>>> Dawid is correct in that you need to enable checkpointing for the
>>>> StreamingFileSink to work.
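>>>> For example, something along these lines in the job setup (the interval
>>>> below is only an illustration):
>>>>
>>>>     // periodic checkpoints let the sink finalize and commit part files
>>>>     env.enableCheckpointing(10_000); // e.g. every 10 seconds
>>>>     // or, with an explicit mode:
>>>>     // env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);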
>>>> I hope this solves the problem,
>>>> Kostas
>>>>
>>>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>>> <dwysakow...@apache.org> wrote:
>>>> >
>>>> > Hi Austin,
>>>> >
>>>> > If I am not mistaken, the StreamingFileSink by default flushes on
>>>> > checkpoints. If you don't have checkpoints enabled, it might happen
>>>> > that not all data is flushed.
>>>> >
>>>> > I think you can also adjust that behavior with:
>>>> >
>>>> > forBulkFormat(...)
>>>> >     .withRollingPolicy(/* your custom logic */)
>>>> >
>>>> > I also cc Kostas, who should be able to correct me if I am wrong.
>>>> >
>>>> > Best,
>>>> >
>>>> > Dawid
>>>> >
>>>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>>>> >
>>>> > Hi there,
>>>> >
>>>> > Using Flink 1.9.1, I'm trying to write .tgz files with the
>>>> > StreamingFileSink and a custom BulkWriter. It seems like flushing the
>>>> > output stream doesn't flush all the data written. I've verified I can
>>>> > create valid files using the same APIs and data on their own, so I'm
>>>> > thinking it must be something I'm doing wrong with the bulk format.
>>>> > I'm writing to the local filesystem, with the `file://` protocol.
>>>> >
>>>> > For tar/gzip handling, I'm using the Apache Commons Compress library,
>>>> > version 1.20.
>>>> >
>>>> > Here's a runnable example of the issue:
>>>> >
>>>> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
>>>> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
>>>> > import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>>>> > import org.apache.flink.api.common.serialization.BulkWriter;
>>>> > import org.apache.flink.core.fs.FSDataOutputStream;
>>>> > import org.apache.flink.core.fs.Path;
>>>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> > import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>>> >
>>>> > import java.io.FileOutputStream;
>>>> > import java.io.IOException;
>>>> > import java.io.Serializable;
>>>> > import java.nio.charset.StandardCharsets;
>>>> >
>>>> > class Scratch {
>>>> >   public static class Record implements Serializable {
>>>> >     private static final long serialVersionUID = 1L;
>>>> >
>>>> >     String id;
>>>> >
>>>> >     public Record() {}
>>>> >
>>>> >     public Record(String id) {
>>>> >       this.id = id;
>>>> >     }
>>>> >
>>>> >     public String getId() {
>>>> >       return id;
>>>> >     }
>>>> >
>>>> >     public void setId(String id) {
>>>> >       this.id = id;
>>>> >     }
>>>> >   }
>>>> >
>>>> >   public static void main(String[] args) throws Exception {
>>>> >     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> >
>>>> >     // Sanity check: write a .tgz directly with the same APIs, outside of Flink
>>>> >     TarArchiveOutputStream taos = new TarArchiveOutputStream(
>>>> >         new GzipCompressorOutputStream(new FileOutputStream("/home/austin/Downloads/test.tgz")));
>>>> >     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
>>>> >     String fullText = "hey\nyou\nwork";
>>>> >     byte[] fullTextData = fullText.getBytes();
>>>> >     fileEntry.setSize(fullTextData.length);
>>>> >     taos.putArchiveEntry(fileEntry);
>>>> >     taos.write(fullTextData, 0, fullTextData.length);
>>>> >     taos.closeArchiveEntry();
>>>> >     taos.flush();
>>>> >     taos.close();
>>>> >
>>>> >     // The same tar/gzip APIs wrapped in a BulkWriter for the StreamingFileSink
>>>> >     StreamingFileSink<Record> textSink = StreamingFileSink
>>>> >         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
>>>> >             new BulkWriter.Factory<Record>() {
>>>> >               @Override
>>>> >               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
>>>> >                 final TarArchiveOutputStream compressedOutputStream =
>>>> >                     new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>>>> >
>>>> >                 return new BulkWriter<Record>() {
>>>> >                   @Override
>>>> >                   public void addElement(Record record) throws IOException {
>>>> >                     // one tar entry per record
>>>> >                     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
>>>> >                     byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
>>>> >                     fileEntry.setSize(fullTextData.length);
>>>> >                     compressedOutputStream.putArchiveEntry(fileEntry);
>>>> >                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
>>>> >                     compressedOutputStream.closeArchiveEntry();
>>>> >                   }
>>>> >
>>>> >                   @Override
>>>> >                   public void flush() throws IOException {
>>>> >                     compressedOutputStream.flush();
>>>> >                   }
>>>> >
>>>> >                   @Override
>>>> >                   public void finish() throws IOException {
>>>> >                     this.flush();
>>>> >                   }
>>>> >                 };
>>>> >               }
>>>> >             })
>>>> >         .withBucketCheckInterval(1000)
>>>> >         .build();
>>>> >
>>>> >     env
>>>> >         .fromElements(new Record("1"), new Record("2"))
>>>> >         .addSink(textSink)
>>>> >         .name("Streaming File Sink")
>>>> >         .uid("streaming-file-sink");
>>>> >     env.execute("streaming file sink test");
>>>> >   }
>>>> > }
>>>> >
>>>> > From the stat / hex dumps, you can see that the first bits are there,
>>>> > but are then cut off:
>>>> >
>>>> > ~/Downloads » stat test.tgz
>>>> >   File: test.tgz
>>>> >   Size: 114            Blocks: 8          IO Block: 4096   regular file
>>>> > Device: 801h/2049d      Inode: 30041077    Links: 1
>>>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>>>> > Access: 2020-02-21 19:30:06.009028283 -0500
>>>> > Modify: 2020-02-21 19:30:44.509424406 -0500
>>>> > Change: 2020-02-21 19:30:44.509424406 -0500
>>>> >  Birth: -
>>>> >
>>>> > ~/Downloads » tar -tvf test.tgz
>>>> > -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
>>>> >
>>>> > ~/Downloads » hd test.tgz
>>>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
>>>> > 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
>>>> > 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
>>>> > 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
>>>> > 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
>>>> > 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
>>>> > 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
>>>> > 00000070  00 00                                             |..|
>>>> > 00000072
>>>> >
>>>> > text-output/37 » tar -xzf part-0-0
>>>> >
>>>> > gzip: stdin: unexpected end of file
>>>> > tar: Child returned status 1
>>>> > tar: Error is not recoverable: exiting now
>>>> >
>>>> > text-output/37 » stat part-0-0
>>>> >   File: part-0-0
>>>> >   Size: 10             Blocks: 8          IO Block: 4096   regular file
>>>> > Device: 801h/2049d      Inode: 4590487     Links: 1
>>>> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
>>>> > Access: 2020-02-21 19:33:06.258888702 -0500
>>>> > Modify: 2020-02-21 19:33:04.466870139 -0500
>>>> > Change: 2020-02-21 19:33:05.294878716 -0500
>>>> >  Birth: -
>>>> >
>>>> > text-output/37 » hd part-0-0
>>>> > 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
>>>> > 0000000a
>>>> >
>>>> > Is there anything simple I'm missing?
>>>> >
>>>> > Best,
>>>> > Austin