Hi Niels,

Thanks for starting this discussion. Here are some thoughts on your
questions/proposals.

> Judge whether splittable for each individual file
Looks good to me.

> BZIP2 support splittable
Looks good to me.

> the Flink implementation is controlled by the number of splits
Can you check again? I think Flink is also affected by block size [1].
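
From my reading of [1], the per-split size is bounded by the block size even
when a number of splits is requested. A simplified sketch (not the exact
code) of the sizing logic, in case it helps the discussion:

// Simplified sketch of the sizing logic I read in [1] (not the exact code):
// the split size is capped by the file's block size, so the block size
// matters even though the caller passes a minimum number of splits.
static long computeSplitSize(long totalLength, int minNumSplits,
                             long minSplitSize, long blockSize) {
    long maxSplitSize = totalLength / minNumSplits
            + (totalLength % minNumSplits == 0 ? 0 : 1);
    return Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
}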

> Looking at how Hadoop does this I see that the FileInputFormat has a
> method isSplitable
Currently Flink does this once in FileInputFormat, but it could also decide
it per file, like Hadoop does.
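
Just to illustrate what I mean, a hypothetical sketch (such a per-file hook
does not exist in Flink today, the names are made up):

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileStatus;

// Hypothetical sketch only: a per-file hook modeled on Hadoop's isSplitable(),
// instead of the single global "unsplittable" flag we have now.
public abstract class PerFileSplittableInputFormat<OT> extends FileInputFormat<OT> {

    /**
     * Concrete formats (text, Avro, Parquet, ...) would override this, e.g.
     * based on the file extension or the compression codec in use.
     */
    protected boolean isSplittable(FileStatus file) {
        return true;
    }
}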

I can see that the split strategies in Hadoop's ORC and Parquet are quite
complex. I haven't studied them in depth, but I think those implementations
should be instructive.

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L644

Best,
Jingsong Lee

On Thu, Apr 23, 2020 at 4:13 PM Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> I wanted to spend some time on this idea I asked about a year ago.
>
> So I had a look at the actual code and found this where the file splits are
> calculated
>
>
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L601
>
> What I realized is that apparently the current system for determining the
> FileInputSplits works like this:
> 1) List the collection of input files.
> 2) If any file is NOT splittable, set a global flag to false.
> 3) ONLY if all of them are splittable will they be split.
>
> So (if I understand correctly) if I have an input directory with a 100GB
> uncompressed text file and a 1 KB gzipped file then BOTH will NOT be split.
> Why is that done this way?
>
> I also found that the determination of whether a file is NOT splittable
> rests entirely on whether the decompression to be used is provided by an
> InflaterInputStreamFactory.
> Which is funny, because BZIP2 as a compression format IS splittable, but
> according to this implementation it isn't.
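>
> To make that concrete, this is roughly the shape of the logic as I read it
> (a sketch, not the real code; hasInflaterFactoryFor() stands in for the
> actual extension lookup):
>
> boolean unsplittable = false;
> for (FileStatus file : allInputFiles) {
>     // any file whose extension (".gz", ".deflate", ...) maps to an
>     // InflaterInputStreamFactory flips the ONE global flag
>     if (hasInflaterFactoryFor(file.getPath())) {
>         unsplittable = true;
>     }
> }
> // if the flag is set: one FileInputSplit per file (100GB text file included)
> // otherwise:         every file is split by block size / minNumSplits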
>
> I also noticed that the Hadoop split calculation is controlled by a split
> size, while the Flink implementation is controlled by the number of splits.
> This seems logical to me, as in Hadoop (MapReduce) the number of tasks
> running is dynamic, in contrast with Flink, where the number of parallel
> tasks is a given setting.
>
> Looking at how Hadoop does this I see that the FileInputFormat has a
> method isSplitable
> (yes with typo, and with a terribly dangerous default value:
> MAPREDUCE-2094)
> which gives the answer on a per file basis.
> Some file formats (like TextInputFormat) look at the codec that was found
> to see if it implements the SplittableCompressionCodec interface to
> determine if the file is splittable.
> Other file formats (like Avro and Parquet) use something else to determine
> this.
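>
> From memory (so treat it as a sketch, details may differ), Hadoop's
> TextInputFormat decides it roughly like this:
>
> // splittability is decided per file, from the codec resolved for that file
> protected boolean isSplitable(JobContext context, Path file) {
>     CompressionCodec codec =
>             new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
>     if (codec == null) {
>         return true;                                     // plain text: splittable
>     }
>     return codec instanceof SplittableCompressionCodec;  // BZip2Codec yes, GzipCodec no
> }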
>
> Overall I currently think the way this has been implemented in Flink is not
> very good.
>
> However, bridging the gap to make it similar to what Hadoop has seems like
> a huge step.
>
> I expect that many classes and interfaces will need to change dramatically
> to make this happen.
>
> My main question to you guys: Do you think it is worth it?
>
> Niels Basjes
>
>
> On Mon, Feb 19, 2018 at 10:50 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Niels,
> >
> > Jörn is right: although offering different methods, Flink's InputFormat
> > is very similar to Hadoop's InputFormat interface.
> > The InputFormat.createInputSplits() method generates splits that can be
> > read in parallel.
> > The FileInputFormat splits files by fixed boundaries (usually HDFS
> > blocksize) and expects the InputFormat to find the right place to start
> > and end.
> > For line-wise read files (TextInputFormat) or files with a record
> > delimiter (DelimitedInputFormat), the formats read the first record after
> > they find the first delimiter in their split and stop at the first
> > delimiter after the split boundary.
> > The BinaryInputFormat extends FileInputFormat but overrides the
> > createInputSplits method.
> >
> > So, how exactly a file is read in parallel depends on the
> > createInputSplits() method of the InputFormat.
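> >
> > In other words, roughly (a simplified sketch, not the actual Flink code;
> > seekTo() and skipPastNextDelimiter() are stand-in helpers): every reader
> > owns exactly the records that start inside its split.
> >
> > void openSplit(long splitStart, long splitLength) throws IOException {
> >     seekTo(splitStart);
> >     if (splitStart != 0) {
> >         // not the first split: skip the partial record we landed in,
> >         // the previous reader will emit it
> >         skipPastNextDelimiter();
> >     }
> >     // then read records until the first delimiter AFTER
> >     // splitStart + splitLength, i.e. we may read a bit past the boundary
> > }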
> >
> > Hope this helps,
> > Fabian
> >
> >
> > 2018-02-18 13:36 GMT+01:00 Jörn Franke <jornfra...@gmail.com>:
> >
> > > AFAIK Flink has a similar notion of splittable as Hadoop. Furthermore,
> > > for custom FileInputFormats you can set the attribute unsplittable = true
> > > if your file format cannot be split.
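> > >
> > > For example (a sketch only; the record type and the read logic are
> > > placeholders):
> > >
> > > import org.apache.flink.api.common.io.FileInputFormat;
> > >
> > > // a custom format that opts out of splitting by setting
> > > // FileInputFormat's protected "unsplittable" field
> > > public abstract class WholeFileInputFormat<T> extends FileInputFormat<T> {
> > >     public WholeFileInputFormat() {
> > >         this.unsplittable = true;  // every file becomes exactly one split
> > >     }
> > >     // open()/nextRecord()/reachedEnd() implemented by the concrete format
> > > }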
> > >
> > > > On 18. Feb 2018, at 13:28, Niels Basjes <ni...@basjes.nl> wrote:
> > > >
> > > > Hi,
> > > >
> > > > In Hadoop MapReduce there is the notion of "splittable" in the
> > > > FileInputFormat. This has the effect that a single input file can be
> > > > fed into multiple separate instances of the mapper that read the data.
> > > > A lot has been documented (e.g. text is splittable per line, gzipped
> > > > text is not splittable) and designed into the various file formats
> > > > (like Avro and Parquet) to allow splittability.
> > > >
> > > > The goal is that reading and parsing files can be done by multiple
> > > > cpus/systems in parallel.
> > > >
> > > > How is this handled in Flink?
> > > > Can Flink read a single file in parallel?
> > > > How does Flink administrate/handle the possibilities regarding the
> > > > various file formats?
> > > >
> > > >
> > > > The reason I ask is because I want to see if I can port this (now
> > > > Hadoop specific) hobby project of mine to work with Flink:
> > > > https://github.com/nielsbasjes/splittablegzip
> > > >
> > > > Thanks.
> > > >
> > > > --
> > > > Best regards / Met vriendelijke groeten,
> > > >
> > > > Niels Basjes
> > >
> >
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


-- 
Best, Jingsong Lee
