Hi Kostas,

Yes, I have.

Rafi

On Thu, Mar 21, 2019, 20:47 Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Rafi,
>
> Have you enabled checkpointing for your job?
>
> Cheers,
> Kostas
>
> On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Piotr and Kostas,
>>
>> Thanks for your reply.
>>
>> The issue is that I don't see any committed files, only in-progress
>> ones.
>> I tried to debug the code for more details. I see that in
>> *BulkPartWriter* I do reach the *write* methods and see events getting
>> written, but I never reach *closeForCommit*; I go straight to the
>> *close* function, where all parts are disposed.
>>
>> In my job I have a finite stream (the source reads from Parquet
>> file/s), does some windowed aggregation, and writes back to a Parquet
>> file.
>> As far as I know, the sink should commit files during checkpoints and
>> when the stream has finished. I did enable checkpointing.
>> I did verify that if I connect other sinks, I do see the events.
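>>
>> For reference, this is roughly how I enable checkpointing (a minimal
>> sketch; the interval value here is just an example):
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // with StreamingFileSink, part files are rolled/committed on each successful checkpoint
>> env.enableCheckpointing(60_000); // checkpoint every 60 seconds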
>>
>> Let me know if I can provide any further information that could be
>> helpful.
>>
>> Would appreciate your help.
>>
>> Thanks,
>> Rafi
>>
>>
>> On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com>
>> wrote:
>>
>>> Hi Rafi,
>>>
>>> Piotr is correct. In-progress files are not necessarily readable.
>>> The valid files are the ones that are "committed" or finalized.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I’m not sure, but shouldn’t you be reading just the committed files
>>>> and ignoring the in-progress ones? Maybe Kostas can add more insight
>>>> on this topic.
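>>>>
>>>> A minimal sketch of the kind of filtering I mean (assuming the output
>>>> is on a Hadoop-style file system and fs is an already-open FileSystem;
>>>> the path is just a placeholder):
>>>>
>>>> import org.apache.hadoop.fs.FileStatus;
>>>> import org.apache.hadoop.fs.Path;
>>>>
>>>> FileStatus[] files = fs.listStatus(new Path("some-path"));
>>>> for (FileStatus status : files) {
>>>>     // in-progress/pending part files are hidden (dot-prefixed); skip them
>>>>     if (!status.getPath().getName().startsWith(".")) {
>>>>         // this is a committed part file, safe to read
>>>>     }
>>>> }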
>>>>
>>>> Piotr Nowojski
>>>>
>>>> On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to stream events in Protobuf format into a Parquet file.
>>>> I looked into both streaming-file options: BucketingSink &
>>>> StreamingFileSink.
>>>> I first tried using the newer *StreamingFileSink* with the
>>>> *forBulkFormat* API. I noticed there's currently support only for the
>>>> Avro format, via *ParquetAvroWriters*.
>>>> I followed the same convention as Avro and wrote a
>>>> *ParquetProtoWriters* builder class:
>>>>
>>>> import com.google.protobuf.Message;
>>>> import java.io.IOException;
>>>> import org.apache.flink.formats.parquet.ParquetBuilder;
>>>> import org.apache.flink.formats.parquet.ParquetWriterFactory;
>>>> import org.apache.parquet.hadoop.ParquetFileWriter;
>>>> import org.apache.parquet.hadoop.ParquetWriter;
>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>>>> import org.apache.parquet.io.OutputFile;
>>>> import org.apache.parquet.proto.ProtoParquetWriter;
>>>>
>>>> public class ParquetProtoWriters {
>>>>
>>>>     private static final int PAGE_SIZE = 64 * 1024;
>>>>
>>>>     // Builds a Flink ParquetWriterFactory for the given Protobuf class
>>>>     public static <T extends Message> ParquetWriterFactory<T> forType(
>>>>             final Class<T> protoClass) {
>>>>         final ParquetBuilder<T> builder =
>>>>                 (out) -> createProtoParquetWriter(protoClass, out);
>>>>         return new ParquetWriterFactory<>(builder);
>>>>     }
>>>>
>>>>     private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
>>>>             Class<T> type,
>>>>             OutputFile out) throws IOException {
>>>>
>>>>         return ProtoParquetWriter.<T>builder(out)
>>>>                 .withPageSize(PAGE_SIZE)
>>>>                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>>>>                 .withProtoClass(type)
>>>>                 .build();
>>>>     }
>>>> }
>>>>
>>>> And then I use it as follows:
>>>>
>>>> StreamingFileSink<SomeProtoType> sink = StreamingFileSink
>>>>         .forBulkFormat(new Path("some-path"),
>>>>                 ParquetProtoWriters.forType(SomeProtoType.class))
>>>>         .build();
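>>>>
>>>> (and then attach it to the stream; the stream name here is just a
>>>> placeholder:)
>>>>
>>>> events.addSink(sink); // events is a DataStream<SomeProtoType>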
>>>>
>>>> I ran tests on *ParquetProtoWriters* itself and it writes everything
>>>> properly, and I'm able to read the files.
>>>>
>>>> When I use the sink as part of a job, *I see invalid Parquet files
>>>> being created*:
>>>>
>>>> # parquet-tools cat 
>>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
>>>> .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet 
>>>> file (too small length: 4)
>>>>
>>>>
>>>> The in-progress file is only 4 bytes long, which looks like just the
>>>> 4-byte Parquet magic header with no footer, as if the writer is never
>>>> properly closed. Can anyone suggest what I am missing here?
>>>>
>>>> When trying to use the *BucketingSink*, I wrote a Writer class for
>>>> Protobuf and everything worked perfectly:
>>>>
>>>> import com.google.protobuf.Message;
>>>> import com.google.protobuf.MessageOrBuilder;
>>>> import java.io.IOException;
>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>> import org.apache.flink.util.Preconditions;
>>>> import org.apache.hadoop.fs.FileSystem;
>>>> import org.apache.hadoop.fs.Path;
>>>> import org.apache.parquet.hadoop.ParquetWriter;
>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>>>> import org.apache.parquet.proto.ProtoParquetWriter;
>>>>
>>>> public class FlinkProtoParquetWriter<T extends MessageOrBuilder>
>>>>         implements Writer<T> {
>>>>
>>>>     private static final long serialVersionUID = -975302556515811398L;
>>>>
>>>>     private Path path;
>>>>     private Class<? extends Message> protoClass;
>>>>     private transient ParquetWriter<T> writer;
>>>>
>>>>     // bytes already flushed to disk by previously closed writers
>>>>     private int position;
>>>>     private final CompressionCodecName compressionCodecName =
>>>>             CompressionCodecName.SNAPPY;
>>>>     private final int pageSize = 64 * 1024;
>>>>
>>>>     public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
>>>>         this.protoClass = protoClass;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(FileSystem fs, Path path) throws IOException {
>>>>         this.position = 0;
>>>>         this.path = path;
>>>>
>>>>         if (writer != null) {
>>>>             writer.close();
>>>>         }
>>>>
>>>>         writer = createWriter();
>>>>     }
>>>>
>>>>     // Closing the Parquet writer is the only way to force its buffered
>>>>     // data to disk, so flush() closes it and immediately re-creates it.
>>>>     @Override
>>>>     public long flush() throws IOException {
>>>>         Preconditions.checkNotNull(writer);
>>>>         position += writer.getDataSize();
>>>>         writer.close();
>>>>         writer = createWriter();
>>>>
>>>>         return position;
>>>>     }
>>>>
>>>>     @Override
>>>>     public long getPos() {
>>>>         Preconditions.checkNotNull(writer);
>>>>         return position + writer.getDataSize();
>>>>     }
>>>>
>>>>     @Override
>>>>     public void close() throws IOException {
>>>>         if (writer != null) {
>>>>             writer.close();
>>>>             writer = null;
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public void write(T element) throws IOException {
>>>>         Preconditions.checkNotNull(writer);
>>>>         writer.write(element);
>>>>     }
>>>>
>>>>     @Override
>>>>     public Writer<T> duplicate() {
>>>>         return new FlinkProtoParquetWriter<>(protoClass);
>>>>     }
>>>>
>>>>     private ParquetWriter<T> createWriter() throws IOException {
>>>>         return ProtoParquetWriter
>>>>                 .<T>builder(path)
>>>>                 .withPageSize(pageSize)
>>>>                 .withCompressionCodec(compressionCodecName)
>>>>                 .withProtoClass(protoClass)
>>>>                 .build();
>>>>     }
>>>> }
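>>>>
>>>> For completeness, this is roughly how I wire it up (the path and
>>>> stream name are placeholders):
>>>>
>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>
>>>> BucketingSink<SomeProtoType> sink = new BucketingSink<>("some-path");
>>>> sink.setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));
>>>> events.addSink(sink);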
>>>>
>>>>
>>>> Rafi
>>>>
>>>>
>>>>
