Hi,

I’m not sure, but shouldn’t you be reading only the committed files and ignoring the in-progress ones? Maybe Kostas could add more insight on this topic.
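
A minimal sketch of what I mean (an illustration only, not Flink API; the "some-path" directory and the hidden ".inprogress" naming are taken from your example below):

import java.io.File;

public class CommittedFilesOnly {

    // Flink keeps not-yet-committed part files hidden (leading "."),
    // as in ".part-0-0.inprogress.<uuid>", so skipping hidden files
    // leaves only the committed, readable Parquet files.
    public static File[] committedPartFiles(File dir) {
        return dir.listFiles(f -> f.isFile() && !f.getName().startsWith("."));
    }

    public static void main(String[] args) {
        for (File f : committedPartFiles(new File("some-path"))) {
            System.out.println(f.getName());
        }
    }
}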

Piotr Nowojski

> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
> 
> Hi,
> 
> I'm trying to stream events in Protobuf format into a Parquet file.
> I looked into both streaming-file options: BucketingSink & StreamingFileSink.
> I first tried using the newer StreamingFileSink with the forBulkFormat API. I 
> noticed there's currently support only for the Avro format with the 
> ParquetAvroWriters.
> I followed the same convention as Avro and wrote a ParquetProtoWriters 
> builder class:
> 
> public class ParquetProtoWriters {
> 
>     private static final int pageSize = 64 * 1024;
> 
>     public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
>         final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
>         return new ParquetWriterFactory<>(builder);
>     }
> 
>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>             Class<T> type,
>             OutputFile out) throws IOException {
> 
>         return ProtoParquetWriter.<T>builder(out)
>                 .withPageSize(pageSize)
>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>                 .withProtoClass(type)
>                 .build();
>     }
> }
> And then I use it as follows:
> 
> StreamingFileSink
>         .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
>         .build();
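> 
> (For context, my understanding is that with forBulkFormat part files are finalized only when a checkpoint completes, so checkpointing has to be enabled; a minimal sketch of such a setup, with an arbitrary interval:)
> 
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(60_000); // finalize part files roughly once a minute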
> I ran tests on ParquetProtoWriters itself; it writes everything properly and I'm able to read the files back.
> 
> When I use the sink as part of a job, I see invalid Parquet files created:
> 
> # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)
> 
> Can anyone suggest what I'm missing here?
> 
> When trying to use the BucketingSink, I wrote a Writer class for Protobuf and 
> everything worked perfectly:
> public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {
> 
>     private static final long serialVersionUID = -975302556515811398L;
> 
>     private Path path;
>     private Class<? extends Message> protoClass;
>     private transient ParquetWriter<T> writer;
> 
>     private int position;
>     private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
>     private final int pageSize = 64 * 1024;
> 
>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>         this.protoClass = protoClass;
>     }
> 
>     @Override
>     public void open(FileSystem fs, Path path) throws IOException {
>         this.position = 0;
>         this.path = path;
> 
>         if (writer != null) {
>             writer.close();
>         }
> 
>         writer = createWriter();
>     }
> 
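>     // ParquetWriter only produces a valid file once close() is called, so
>     // flush() below closes the current writer (adding its size to
>     // `position`) and immediately opens a fresh one.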
>     @Override
>     public long flush() throws IOException {
>         Preconditions.checkNotNull(writer);
>         position += writer.getDataSize();
>         writer.close();
>         writer = createWriter();
> 
>         return position;
>     }
> 
>     @Override
>     public long getPos() {
>         Preconditions.checkNotNull(writer);
>         return position + writer.getDataSize();
>     }
> 
>     @Override
>     public void close() throws IOException {
>         if (writer != null) {
>             writer.close();
>             writer = null;
>         }
>     }
> 
>     @Override
>     public void write(T element) throws IOException {
>         Preconditions.checkNotNull(writer);
>         writer.write(element);
>     }
> 
>     @Override
>     public Writer<T> duplicate() {
>         return new FlinkProtoParquetWriter<>(protoClass);
>     }
> 
>     private ParquetWriter<T> createWriter() throws IOException {
>         return ProtoParquetWriter
>                 .<T>builder(path)
>                 .withPageSize(pageSize)
>                 .withCompressionCodec(compressionCodecName)
>                 .withProtoClass(protoClass)
>                 .build();
>     }
> }
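> 
> A minimal usage sketch for the writer above (stream, SomeProtoType and the path are placeholders):
> 
> BucketingSink<SomeProtoType> sink = new BucketingSink<>("some-path");
> sink.setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));
> stream.addSink(sink);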
> 
> Rafi
