Hi there,

Using Flink 1.9.1, I'm trying to write .tgz files with
StreamingFileSink#forBulkFormat and a custom BulkWriter. It seems like
flushing the output stream doesn't flush all the data written. I've verified
that I can create valid files using the same APIs and data on their own, so
I'm thinking it must be something I'm doing wrong with the bulk format. I'm
writing to the local filesystem, with the `file://` protocol.

For tarring/gzipping, I'm using the Apache Commons Compress library,
version 1.20.

Here's a runnable example of the issue:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class Scratch {
  public static class Record implements Serializable {
    private static final long serialVersionUID = 1L;

    String id;

    public Record() {}

    public Record(String id) {
      this.id = id;
    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }
  }

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

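    // Standalone check: the same APIs and data produce a valid archive
    // when writing straight to a local file (see stat/tar/hd output below).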
    TarArchiveOutputStream taos = new TarArchiveOutputStream(
        new GzipCompressorOutputStream(new FileOutputStream("/home/austin/Downloads/test.tgz")));
    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
    String fullText = "hey\nyou\nwork";
    byte[] fullTextData = fullText.getBytes();
    fileEntry.setSize(fullTextData.length);
    taos.putArchiveEntry(fileEntry);
    taos.write(fullTextData, 0, fullTextData.length);
    taos.closeArchiveEntry();
    taos.flush();
    taos.close();

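    // The same write path inside a BulkWriter yields a truncated part file.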
    StreamingFileSink<Record> textSink = StreamingFileSink
        .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
            new BulkWriter.Factory<Record>() {
              @Override
              public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
                final TarArchiveOutputStream compressedOutputStream =
                    new TarArchiveOutputStream(new GzipCompressorOutputStream(out));

                return new BulkWriter<Record>() {
                  @Override
                  public void addElement(Record record) throws IOException {
                    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
                    byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
                    fileEntry.setSize(fullTextData.length);
                    compressedOutputStream.putArchiveEntry(fileEntry);
                    compressedOutputStream.write(fullTextData, 0, fullTextData.length);
                    compressedOutputStream.closeArchiveEntry();
                  }

                  @Override
                  public void flush() throws IOException {
                    compressedOutputStream.flush();
                  }

                  @Override
                  public void finish() throws IOException {
                    this.flush();
                  }
                };
              }
            })
        .withBucketCheckInterval(1000)
        .build();

    env
        .fromElements(new Record("1"), new Record("2"))
        .addSink(textSink)
        .name("Streaming File Sink")
        .uid("streaming-file-sink");
    env.execute("streaming file sink test");
  }
}


From the stat/hex dumps, you can see that the first bytes are there, but the
stream is then cut off:

~/Downloads » stat test.tgz
  File: test.tgz
  Size: 114       Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d Inode: 30041077    Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:30:06.009028283 -0500
Modify: 2020-02-21 19:30:44.509424406 -0500
Change: 2020-02-21 19:30:44.509424406 -0500
 Birth: -

~/Downloads » tar -tvf test.tgz
-rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt

~/Downloads » hd test.tgz
00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
00000070  00 00                                             |..|
00000072



text-output/37 » tar -xzf part-0-0

gzip: stdin: unexpected end of file
tar: Child returned status 1
tar: Error is not recoverable: exiting now

text-output/37 » stat part-0-0
  File: part-0-0
  Size: 10              Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d      Inode: 4590487     Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:33:06.258888702 -0500
Modify: 2020-02-21 19:33:04.466870139 -0500
Change: 2020-02-21 19:33:05.294878716 -0500
 Birth: -

text-output/37 » hd part-0-0
00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
0000000a
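
One thing I notice: those 10 bytes in part-0-0 look like just the gzip stream
header (which GzipCompressorOutputStream writes up front when it's
constructed), so it seems none of the deflated data makes it out on flush.
Here's a quick standalone check of that theory (just a sketch; I'm assuming
the data sits buffered in the Deflater until the stream is finished, and the
class/variable names are throwaway):

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

class FlushCheck {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    GzipCompressorOutputStream gzos = new GzipCompressorOutputStream(bos);
    gzos.write("hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8));

    // flush() alone: only the 10-byte gzip header written by the
    // constructor shows up; the compressed data is still buffered.
    gzos.flush();
    System.out.println("after flush:  " + bos.size() + " bytes");

    // finish() deflates the remaining input and writes the gzip trailer.
    gzos.finish();
    System.out.println("after finish: " + bos.size() + " bytes");
  }
}

That matches what I'm seeing in the part file, but I may be misreading it.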

Is there anything simple I'm missing?

Best,
Austin
