Hi Dawid and Kostas,

Sorry for the late reply, and thank you for the troubleshooting. I've put
together an example repo that reproduces the issue[1], since I did have
checkpointing enabled in my previous case -- I must still be doing
something wrong with that config, though.
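
For reference, the checkpointing setup in the repro looks roughly like
this (the interval and mode here are just example values, not a
recommendation):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint every second so the sink gets a chance to roll and commit
// its in-progress part files
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);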

Thanks!
Austin

[1]: https://github.com/austince/flink-streaming-file-sink-compression


On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <kklou...@apache.org> wrote:

> Hi Austin,
>
> Dawid is correct in that you need to enable checkpointing for the
> StreamingFileSink to work.
>
> I hope this solves the problem,
> Kostas
>
> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
> >
> > Hi Austin,
> >
> > If I am not mistaken, the StreamingFileSink by default flushes on checkpoints. If you don't have checkpointing enabled, it might happen that not all data is flushed.
> >
> > I think you can also adjust that behavior with:
> >
> > forBulkFormat(...)
> >     .withRollingPolicy(/* your custom logic */)
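> >
> > For instance, something like this (just a sketch -- assuming the bulk format builder exposes withRollingPolicy in your Flink version; by default, bulk formats roll on every checkpoint via OnCheckpointRollingPolicy):
> >
> > import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
> >
> > StreamingFileSink
> >     .forBulkFormat(outputPath, writerFactory) // the Path and BulkWriter.Factory from your job
> >     .withRollingPolicy(OnCheckpointRollingPolicy.build()) // part files are finalized on each checkpoint
> >     .build();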
> >
> > I also cc Kostas who should be able to correct me if I am wrong.
> >
> > Best,
> >
> > Dawid
> >
> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
> >
> > Hi there,
> >
> > I'm using Flink 1.9.1 and trying to write .tgz files with the StreamingFileSink and a BulkWriter. It seems like flushing the output stream doesn't flush all the data written. I've verified that I can create valid files using the same APIs and data on their own, so I'm thinking it must be something I'm doing wrong with the bulk format. I'm writing to the local filesystem, with the `file://` protocol.
> >
> > For the tar/gzip handling, I'm using the Apache Commons Compress library, version 1.20.
> >
> > Here's a runnable example of the issue:
> >
> > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> > import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> > import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> > import org.apache.flink.api.common.serialization.BulkWriter;
> > import org.apache.flink.core.fs.FSDataOutputStream;
> > import org.apache.flink.core.fs.Path;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> >
> > import java.io.FileOutputStream;
> > import java.io.IOException;
> > import java.io.Serializable;
> > import java.nio.charset.StandardCharsets;
> >
> > class Scratch {
> >   public static class Record implements Serializable {
> >     private static final long serialVersionUID = 1L;
> >
> >     String id;
> >
> >     public Record() {}
> >
> >     public Record(String id) {
> >       this.id = id;
> >     }
> >
> >     public String getId() {
> >       return id;
> >     }
> >
> >     public void setId(String id) {
> >       this.id = id;
> >     }
> >   }
> >
> >   public static void main(String[] args) throws Exception {
> >     final StreamExecutionEnvironment env =
> >         StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >     // Standalone sanity check: the same tar/gzip calls produce a valid .tgz outside Flink
> >     TarArchiveOutputStream taos = new TarArchiveOutputStream(
> >         new GzipCompressorOutputStream(
> >             new FileOutputStream("/home/austin/Downloads/test.tgz")));
> >     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
> >     String fullText = "hey\nyou\nwork";
> >     byte[] fullTextData = fullText.getBytes();
> >     fileEntry.setSize(fullTextData.length);
> >     taos.putArchiveEntry(fileEntry);
> >     taos.write(fullTextData, 0, fullTextData.length);
> >     taos.closeArchiveEntry();
> >     taos.flush();
> >     taos.close();
> >
> >     // The same writer logic, wired into the StreamingFileSink as a bulk format
> >     StreamingFileSink<Record> textSink = StreamingFileSink
> >         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
> >             new BulkWriter.Factory<Record>() {
> >               @Override
> >               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
> >                 final TarArchiveOutputStream compressedOutputStream =
> >                     new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
> >
> >                 return new BulkWriter<Record>() {
> >                   @Override
> >                   public void addElement(Record record) throws IOException {
> >                     TarArchiveEntry fileEntry =
> >                         new TarArchiveEntry(String.format("%s.txt", record.id));
> >                     byte[] fullTextData =
> >                         "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
> >                     fileEntry.setSize(fullTextData.length);
> >                     compressedOutputStream.putArchiveEntry(fileEntry);
> >                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
> >                     compressedOutputStream.closeArchiveEntry();
> >                   }
> >
> >                   @Override
> >                   public void flush() throws IOException {
> >                     compressedOutputStream.flush();
> >                   }
> >
> >                   @Override
> >                   public void finish() throws IOException {
> >                     this.flush();
> >                   }
> >                 };
> >               }
> >             })
> >         .withBucketCheckInterval(1000)
> >         .build();
> >
> >     env
> >         .fromElements(new Record("1"), new Record("2"))
> >         .addSink(textSink)
> >         .name("Streaming File Sink")
> >         .uid("streaming-file-sink");
> >     env.execute("streaming file sink test");
> >   }
> > }
> >
> >
> > From the stat/hex dumps, you can see that the first bytes are there, but the rest is cut off:
> >
> > ~/Downloads » stat test.tgz
> >   File: test.tgz
> >   Size: 114       Blocks: 8          IO Block: 4096   regular file
> > Device: 801h/2049d Inode: 30041077    Links: 1
> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> > Access: 2020-02-21 19:30:06.009028283 -0500
> > Modify: 2020-02-21 19:30:44.509424406 -0500
> > Change: 2020-02-21 19:30:44.509424406 -0500
> >  Birth: -
> >
> > ~/Downloads » tar -tvf test.tgz
> > -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
> >
> > ~/Downloads » hd test.tgz
> > 00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
> > 00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
> > 00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
> > 00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
> > 00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
> > 00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
> > 00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
> > 00000070  00 00                                             |..|
> > 00000072
> >
> >
> >
> > text-output/37 » tar -xzf part-0-0
> >
> > gzip: stdin: unexpected end of file
> > tar: Child returned status 1
> > tar: Error is not recoverable: exiting now
> >
> > text-output/37 » stat part-0-0
> >   File: part-0-0
> >   Size: 10              Blocks: 8          IO Block: 4096   regular file
> > Device: 801h/2049d      Inode: 4590487     Links: 1
> > Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> > Access: 2020-02-21 19:33:06.258888702 -0500
> > Modify: 2020-02-21 19:33:04.466870139 -0500
> > Change: 2020-02-21 19:33:05.294878716 -0500
> >  Birth: -
> >
> > text-output/37 » hd part-0-0
> > 00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
> > 0000000a
> >
> > Those 10 bytes are exactly a bare gzip header, so it looks like nothing past the header ever makes it to the file. Is there anything simple I'm missing?
> >
> > Best,
> > Austin
>
