Thanks Piotr & Kostas. Really looking forward to this :)
Rafi

On Wed, Mar 27, 2019 at 10:58 AM Piotr Nowojski <pi...@ververica.com> wrote:

Hi Rafi,

There is also an ongoing effort to support bounded streams in the DataStream API [1], which might provide the backbone for the functionality that you need.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11875

On 25 Mar 2019, at 10:00, Kostas Kloudas <k.klou...@ververica.com> wrote:

Hi Rafi,

One solution, although not the most elegant, could be to write your program using the file source in PROCESS_CONTINUOUSLY mode, as described here:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html

and, when you are sure that the processing of your file is done, cancel the job.

As I said, this is not the most elegant solution, but it will do the job.

Cheers,
Kostas
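A minimal sketch of that setup (the input path, the TextInputFormat stand-in source, the checkpoint interval and the 1-second scan interval are all illustrative, not details from this thread):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L); // part files only roll on completed checkpoints

// PROCESS_CONTINUOUSLY keeps the job alive, so checkpoints keep firing
// and the sink can finalize its in-progress files.
TextInputFormat format = new TextInputFormat(new Path("s3://bucket/input"));
DataStream<String> lines = env.readFile(
        format,
        "s3://bucket/input",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1_000L); // directory re-scan interval, in milliseconds

// ...transformations and sink go here...
// Cancel the job manually once the input is known to be fully processed.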
On Mon, Mar 25, 2019 at 9:49 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi Kostas,

Thank you.
I'm currently testing my job against a small file, so it finishes before checkpointing starts.
But even with a larger file, where a checkpoint did happen, there would always be trailing events arriving after the last checkpoint and before the source finishes. Would these events be lost?
In that case, any flow of (bounded stream) => (StreamingFileSink) would not give the expected results...

The other alternative would be using BucketingSink, but it would not guarantee exactly-once into S3, which is not preferable.

Can you suggest any workaround? Somehow making sure checkpointing is triggered at the end?

Rafi

On Thu, Mar 21, 2019 at 9:40 PM Kostas Kloudas <k.klou...@ververica.com> wrote:

Sorry Rafi,

I just read your previous response, where you say that you have already activated checkpointing.
My bad for not paying attention.

Unfortunately, in-progress files currently only roll (or get finalized) on checkpoint barriers and NOT when calling close().
This is due to the fact that, at the function level, Flink does not differentiate between failures and normal termination.
But there are plans to fix it: https://issues.apache.org/jira/browse/FLINK-2646

So, given the above, you should check whether checkpoints go through your pipeline before your source stream reaches its end. If there are no checkpoints, your in-progress files will not be finalized, and Parquet, for example, will not write the footer that is needed to properly read the file.

Kostas

On Thu, Mar 21, 2019 at 8:03 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi Kostas,

Yes I have.

Rafi

On Thu, Mar 21, 2019, 20:47 Kostas Kloudas <kklou...@gmail.com> wrote:

Hi Rafi,

Have you enabled checkpointing for your job?

Cheers,
Kostas

On Thu, Mar 21, 2019 at 5:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi Piotr and Kostas,

Thanks for your reply.

The issue is that I don't see any committed files, only in-progress ones.
I tried to debug the code for more details. I see that in *BulkPartWriter* I do reach the *write* methods and see events getting written, but I never reach *closeForCommit*. I go straight to the *close* function, where all parts are disposed.

In my job I have a finite stream (the source is reading from Parquet file/s), doing some windowed aggregation and writing back to a Parquet file.
As far as I know, it should commit files during checkpoints and when the stream has finished. I did enable checkpointing.
I did verify that if I connect to other sinks, I see the events.

Let me know if I can provide any further information that could be helpful.

Would appreciate your help.

Thanks,
Rafi

On Thu, Mar 21, 2019 at 5:20 PM Kostas Kloudas <kklou...@gmail.com> wrote:

Hi Rafi,

Piotr is correct. In-progress files are not necessarily readable.
The valid files are the ones that are "committed" or finalized.

Cheers,
Kostas

On Thu, Mar 21, 2019 at 2:53 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

I'm not sure, but shouldn't you just be reading committed files and ignoring in-progress ones? Maybe Kostas could add more insight on this topic.

Piotr Nowojski

On 20 Mar 2019, at 12:23, Rafi Aroch <rafi.ar...@gmail.com> wrote:

Hi,

I'm trying to stream events in Protobuf format into a Parquet file.
I looked into both streaming-file options: BucketingSink & StreamingFileSink.
I first tried using the newer *StreamingFileSink* with the *forBulkFormat* API. I noticed there's currently support only for the Avro format, via *ParquetAvroWriters*.
I followed the same convention as Avro and wrote a *ParquetProtoWriters* builder class:

import com.google.protobuf.Message;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.proto.ProtoParquetWriter;

import java.io.IOException;

public class ParquetProtoWriters {

    private static final int PAGE_SIZE = 64 * 1024;

    /** Creates a Flink ParquetWriterFactory for the given protobuf message class. */
    public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
        final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
        return new ParquetWriterFactory<>(builder);
    }

    private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
            Class<T> type,
            OutputFile out) throws IOException {

        return ProtoParquetWriter.<T>builder(out)
                .withPageSize(PAGE_SIZE)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withProtoClass(type)
                .build();
    }
}

And then I use it as follows:

StreamingFileSink
        .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
        .build();
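For context, the surrounding job wiring is roughly as follows (a sketch; the stand-in source, the 10-second checkpoint interval and the output path are illustrative, not confirmed details from this thread):

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L); // without completed checkpoints, parts stay in-progress

// Stand-in for the real (finite) source.
DataStream<SomeProtoType> events = env.fromElements(SomeProtoType.getDefaultInstance());

events.addSink(
        StreamingFileSink
                .forBulkFormat(new Path("s3://bucket/output"),
                        ParquetProtoWriters.forType(SomeProtoType.class))
                .build());

env.execute("proto-to-parquet");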
I ran tests on the *ParquetProtoWriters* itself, and it writes everything properly; I'm able to read the files.

When I use the sink as part of a job, *I see illegal Parquet files created*:

# parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
.part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)

Can anyone suggest what I'm missing here?

When trying to use the *BucketingSink*, I wrote a Writer class for Protobuf and everything worked perfectly:

import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.proto.ProtoParquetWriter;

import java.io.IOException;

public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private Path path;
    private Class<? extends Message> protoClass;
    private transient ParquetWriter<T> writer;

    private int position;
    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
        this.protoClass = protoClass;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        // Close the current file to flush it, then continue with a fresh writer.
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new FlinkProtoParquetWriter<>(protoClass);
    }

    private ParquetWriter<T> createWriter() throws IOException {
        return ProtoParquetWriter
                .<T>builder(path)
                .withPageSize(pageSize)
                .withCompressionCodec(compressionCodecName)
                .withProtoClass(protoClass)
                .build();
    }
}

Rafi
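For completeness, a minimal sketch of how such a Writer plugs into a BucketingSink (the base path and batch size are illustrative, and *events* stands for the job's DataStream<SomeProtoType>):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

BucketingSink<SomeProtoType> sink =
        new BucketingSink<SomeProtoType>("s3://bucket/output")
                .setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class))
                .setBatchSize(128 * 1024 * 1024); // roll a new part file at ~128 MB

events.addSink(sink);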