Hi Kostas,

Yes I have.
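For reference, this is roughly how it is enabled (a minimal sketch, not from the original reply; the 60-second interval is just an example value):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // With bulk formats, the StreamingFileSink rolls part files on every checkpoint,
    // so no files are finalized until at least one checkpoint completes.
    env.enableCheckpointing(60_000);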
Rafi

On Thu, Mar 21, 2019, 20:47 Kostas Kloudas <kklou...@gmail.com> wrote:

Hi Rafi,

Have you enabled checkpointing for your job?

Cheers,
Kostas

On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi Piotr and Kostas,

Thanks for your reply.

The issue is that I don't see any committed files, only in-progress ones.
I tried to debug the code for more details. I see that in *BulkPartWriter* I do reach the *write* methods and see events getting written, but I never reach *closeForCommit*. I go straight to the *close* function, where all parts are disposed.

In my job I have a finite stream (the source reads from Parquet file(s)), does some windowed aggregation and writes back to a Parquet file.
As far as I know, it should commit files during checkpoints and when the stream has finished. I did enable checkpointing.
I did verify that if I connect to other sinks, I see the events.

Let me know if I can provide any further information that could be helpful.

Would appreciate your help.

Thanks,
Rafi

On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com> wrote:

Hi Rafi,

Piotr is correct. In-progress files are not necessarily readable.
The valid files are the ones that are "committed" or finalized.

Cheers,
Kostas

On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

I'm not sure, but shouldn't you be reading just the committed files and ignoring the in-progress ones? Maybe Kostas could add more insight on this topic.

Piotr Nowojski

On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi,

I'm trying to stream events in Protobuf format into a Parquet file.
I looked into both streaming-file options: BucketingSink & StreamingFileSink.
I first tried using the newer *StreamingFileSink* with the *forBulkFormat* API. I noticed there's currently support only for the Avro format with the *ParquetAvroWriters*.
I followed the same convention as Avro and wrote a *ParquetProtoWriters* builder class:

    public class ParquetProtoWriters {

        private static final int pageSize = 64 * 1024;

        // Creates a Flink ParquetWriterFactory that writes Protobuf messages of the given class.
        public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
            final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
            return new ParquetWriterFactory<>(builder);
        }

        private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
                Class<T> type,
                OutputFile out) throws IOException {

            return ProtoParquetWriter.<T>builder(out)
                    .withPageSize(pageSize)
                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .withProtoClass(type)
                    .build();
        }
    }

And then I use it as follows:

    StreamingFileSink
            .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
            .build();

I ran tests on the *ParquetProtoWriters* itself and it writes everything properly and I'm able to read the files.
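For context, such a standalone check would exercise the factory outside of a job roughly like this (a minimal sketch, assuming the ParquetProtoWriters class above and the SomeProtoType placeholder type; the class name and local file path are illustrative):

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public class ParquetProtoWritersSmokeTest {

        public static void main(String[] args) throws Exception {
            // Write directly through the factory, without a Flink job.
            final Path target = new Path("file:///tmp/proto-smoke-test.parquet");
            final FileSystem fs = target.getFileSystem();

            try (FSDataOutputStream out = fs.create(target, FileSystem.WriteMode.OVERWRITE)) {
                final BulkWriter<SomeProtoType> writer =
                        ParquetProtoWriters.forType(SomeProtoType.class).create(out);

                writer.addElement(SomeProtoType.newBuilder().build());
                writer.finish(); // writes the Parquet footer, making the file readable
            }
        }
    }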
When I use the sink as part of a job, *I see illegal Parquet files created*:

    # parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
    .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)

Can anyone suggest what I am missing here?

When trying to use the *BucketingSink*, I wrote a Writer class for Protobuf and everything worked perfectly:

    public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {

        private static final long serialVersionUID = -975302556515811398L;

        private Path path;
        private Class<? extends Message> protoClass;
        private transient ParquetWriter<T> writer;

        private int position;
        private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
        private final int pageSize = 64 * 1024;

        public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
            this.protoClass = protoClass;
        }

        @Override
        public void open(FileSystem fs, Path path) throws IOException {
            this.position = 0;
            this.path = path;

            if (writer != null) {
                writer.close();
            }

            writer = createWriter();
        }

        @Override
        public long flush() throws IOException {
            Preconditions.checkNotNull(writer);
            // Parquet buffers rows in memory; closing the writer forces the data (and footer)
            // to disk, then a fresh writer is opened for subsequent records.
            position += writer.getDataSize();
            writer.close();
            writer = createWriter();

            return position;
        }

        @Override
        public long getPos() {
            Preconditions.checkNotNull(writer);
            return position + writer.getDataSize();
        }

        @Override
        public void close() throws IOException {
            if (writer != null) {
                writer.close();
                writer = null;
            }
        }

        @Override
        public void write(T element) throws IOException {
            Preconditions.checkNotNull(writer);
            writer.write(element);
        }

        @Override
        public Writer<T> duplicate() {
            return new FlinkProtoParquetWriter<>(protoClass);
        }

        private ParquetWriter<T> createWriter() throws IOException {
            return ProtoParquetWriter
                    .<T>builder(path)
                    .withPageSize(pageSize)
                    .withCompressionCodec(compressionCodecName)
                    .withProtoClass(protoClass)
                    .build();
        }
    }

Rafi
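For completeness, this Writer would be attached to a BucketingSink roughly like this (a minimal sketch, not part of the original message; the wrapper class name is illustrative, and "some-path" and SomeProtoType are the same placeholders used above):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    public class ProtoBucketingSinkWiring {

        // Attaches the FlinkProtoParquetWriter above to a BucketingSink.
        public static void attachSink(DataStream<SomeProtoType> events) {
            BucketingSink<SomeProtoType> sink = new BucketingSink<SomeProtoType>("some-path")
                    .setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));

            events.addSink(sink);
        }
    }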