Hi Austin,

If I am not mistaken, the StreamingFileSink by default flushes on
checkpoints. If you don't have checkpointing enabled, it might happen
that not all data is flushed.

I think you can also adjust that behavior with:

    forBulkFormat(...)
        .withRollingPolicy(/* your custom logic */)

I also cc Kostas who should be able to correct me if I am wrong.
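
One more thing that might be worth checking, independent of Flink: the
part file in your hex dump is exactly 10 bytes, which is the size of a
bare gzip header. Below is a minimal sketch of how a compression layer
can hold data back on flush(). It uses the JDK's
java.util.zip.GZIPOutputStream as a stand-in (an assumption on my side;
I have not verified that Commons Compress behaves identically), and the
class and method names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipFlushSketch {

    // Returns {bytes in the sink after flush(), bytes in the sink after finish()}.
    static int[] sizes() throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Default constructor: syncFlush is false, so flush() does not
        // force the deflater to emit the data it has buffered.
        GZIPOutputStream gzip = new GZIPOutputStream(sink);

        gzip.write("hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8));
        gzip.flush();
        int afterFlush = sink.size();   // only the gzip header so far

        // finish() writes the remaining compressed data and the gzip trailer.
        gzip.finish();
        int afterFinish = sink.size();

        return new int[] {afterFlush, afterFinish};
    }

    public static void main(String[] args) throws IOException {
        int[] s = sizes();
        System.out.println("after flush():  " + s[0] + " bytes");
        System.out.println("after finish(): " + s[1] + " bytes");
    }
}
```

If the streams in your BulkWriter behave the same way, calling only
flush() from finish() would leave the compressed payload and the trailer
unwritten, which would match the "unexpected end of file" from gzip.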

Best,

Dawid

On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
> Hi there,
>
> Using Flink 1.9.1, trying to write .tgz files with the
> StreamingFileSink#BulkWriter. It seems like flushing the output stream
> doesn't flush all the data written. I've verified I can create valid
> files using the same APIs and data on their own, so thinking it must
> be something I'm doing wrong with the bulk format. I'm writing to the
> local filesystem, with the `file://` protocol.
>
> For Tar/Gzipping, I'm using the Apache Commons Compression library,
> version 1.20.
>
> Here's a runnable example of the issue:
>
> import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> import org.apache.flink.api.common.serialization.BulkWriter;
> import org.apache.flink.core.fs.FSDataOutputStream;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.Serializable;
> import java.nio.charset.StandardCharsets;
>
> class Scratch {
>   public static class Record implements Serializable {
>     private static final long serialVersionUID = 1L;
>
>     String id;
>
>     public Record() {}
>
>     public Record(String id) {
>       this.id = id;
>     }
>
>     public String getId() {
>       return id;
>     }
>
>     public void setId(String id) {
>       this.id = id;
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     TarArchiveOutputStream taos = new TarArchiveOutputStream(
>         new GzipCompressorOutputStream(
>             new FileOutputStream("/home/austin/Downloads/test.tgz")));
>     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
>     String fullText = "hey\nyou\nwork";
>     byte[] fullTextData = fullText.getBytes();
>     fileEntry.setSize(fullTextData.length);
>     taos.putArchiveEntry(fileEntry);
>     taos.write(fullTextData, 0, fullTextData.length);
>     taos.closeArchiveEntry();
>     taos.flush();
>     taos.close();
>
>     StreamingFileSink<Record> textSink = StreamingFileSink
>         .forBulkFormat(
>             new Path("file:///home/austin/Downloads/text-output"),
>             new BulkWriter.Factory<Record>() {
>               @Override
>               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
>                 final TarArchiveOutputStream compressedOutputStream =
>                     new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>
>                 return new BulkWriter<Record>() {
>                   @Override
>                   public void addElement(Record record) throws IOException {
>                     TarArchiveEntry fileEntry =
>                         new TarArchiveEntry(String.format("%s.txt", record.id));
>                     byte[] fullTextData =
>                         "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
>                     fileEntry.setSize(fullTextData.length);
>                     compressedOutputStream.putArchiveEntry(fileEntry);
>                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
>                     compressedOutputStream.closeArchiveEntry();
>                   }
>
>                   @Override
>                   public void flush() throws IOException {
>                     compressedOutputStream.flush();
>                   }
>
>                   @Override
>                   public void finish() throws IOException {
>                     this.flush();
>                   }
>                 };
>               }
>             })
>         .withBucketCheckInterval(1000)
>         .build();
>
>     env
>         .fromElements(new Record("1"), new Record("2"))
>         .addSink(textSink)
>         .name("Streaming File Sink")
>         .uid("streaming-file-sink");
>
>     env.execute("streaming file sink test");
>   }
> }
>
> From the stat/hex dumps, you can see that the first bits are there,
> but are then cut off:
>
> ~/Downloads » stat test.tgz
>   File: test.tgz
>   Size: 114       Blocks: 8          IO Block: 4096   regular file
> Device: 801h/2049d Inode: 30041077    Links: 1
> Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> Access: 2020-02-21 19:30:06.009028283 -0500
> Modify: 2020-02-21 19:30:44.509424406 -0500
> Change: 2020-02-21 19:30:44.509424406 -0500
>  Birth: -
>
> ~/Downloads » tar -tvf test.tgz
> -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
>
> ~/Downloads » hd test.tgz
> 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
> 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
> 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
> 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
> 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
> 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
> 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
> 00000070  00 00                                             |..|
> 00000072
>
>
>
> text-output/37 » tar -xzf part-0-0
>
> gzip: stdin: unexpected end of file
> tar: Child returned status 1
> tar: Error is not recoverable: exiting now
>
> text-output/37 » stat part-0-0
>   File: part-0-0
>   Size: 10              Blocks: 8          IO Block: 4096   regular file
> Device: 801h/2049d      Inode: 4590487     Links: 1
> Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> Access: 2020-02-21 19:33:06.258888702 -0500
> Modify: 2020-02-21 19:33:04.466870139 -0500
> Change: 2020-02-21 19:33:05.294878716 -0500
>  Birth: -
>
> text-output/37 » hd part-0-0
> 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
> 0000000a
>
> Is there anything simple I'm missing?
>
> Best,
> Austin
