Hi,

Yes, you are correct. Flink does use a minSplitSize to determine the
splits. I missed that part.
Also this is the part I do not intend to change.

In this I would focus on essentially:
- "is splittable" is decided per file --> Similar to Hadoop: Both a codec
and fileformat get a "isSplittable" method/marker that let's this to be
decided per file at runtime. This should be the 'same' construct for all
file formats (so also for Text, Avro, Parquet, etc.). Also if I create a
new file format that for something that does not support splitting (like
some specific datastructures using for example XML) then that should be
cleanly possible.
- The codecs must be overridable. For this
https://github.com/nielsbasjes/splittablegzip to work the default gzip
decompressor must be disabled. The current setup seems to make this
possible but I'm not sure:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L118


I intend to leave the way the splits are calculated to be as-is.

Niels



On Thu, Apr 23, 2020 at 11:33 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Niels,
>
> Thanks for start this discussion. Share some thought about your
> questions/proposals.
>
> > Judge whether splittable for each individual file
> Looks good to me.
>
> > BZIP2 support splittable
> Looks good to me.
>
> > the Flink implementation is controlled by the number of splits
> Can you check again? I think Flink is also affected by block size [1].
>
> > Looking at how Hadoop does this I see that the FileInputFormat has a
> method isSplitable
> Now Flink do this in FileInputFormat, but also can do this like Hadoop
>
> I can see the split strategy in Hadoop orc and parquet are quite complex, I
> don't have a lot of in-depth research, but I think these implementations
> should be meaningful.
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L644
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 23, 2020 at 4:13 PM Niels Basjes <ni...@basjes.nl> wrote:
>
> > Hi,
> >
> > I wanted to spend some time on this idea I asked about a year ago.
> >
> > So I had a look at the actual code and found this where the file splits
> are
> > calculated
> >
> >
> >
> >
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L601
> >
> > What I realized is that apparently the current system for determining the
> > FileInputSplit s
> > 1) Lists  the collection of input files.
> > 2) If a file is NOT splittable set a global flag to false.
> > 3) ONLY if all of them are splittable then will they be split.
> >
> > So (if I understand correctly) if I have an input directory with a 100GB
> > uncompressed text file and a 1 KB gzipped file then BOTH will NOT be
> split.
> > Why is that done this way?
> >
> > I also found that the determination if a file is NOT splittable
> completely
> > rests on the fact that the decompression that is to be used is to be
> > provided by an InflaterInputStreamFactory.
> > Which is funny because the BZIP2 as a compression IS splittable but
> > according to this implementation it isn't.
> >
> > I also noticed that the Hadoop split calculation is controlled by a split
> > size, the Flink implementation is controlled by the number of splits.
> > This seems logical to me as in Hadoop (MapReduce) the number of tasks
> > running is dynamic. In contrast with Flink where the number of parallel
> > tasks is a given setting.
> >
> > Looking at how Hadoop does this I see that the FileInputFormat has a
> > method isSplitable
> > (yes with typo, and with a terribly dangerous default value:
> > MAPREDUCE-2094)
> > which gives the answer on a per file basis.
> > Some file formats (like TextInputFormat) look at the codec that was found
> > to see if it implements the SplittableCompressionCodec interface to
> > determine if the file is splittable.
> > Other file formats (like Avro and Parquet) use something else to
> determine
> > this.
> >
> > Overall I currently think the way this has been implemented in Flink is
> not
> > very good.
> >
> > However looking at the gap to bridge to make it similar to what Hadoop
> has
> > seems like a huge step.
> >
> > I expect that many classes and interfaces will need to change
> dramatically
> > to make this happen.
> >
> > My main question to you guys: Do you think it is worth it?
> >
> > Niels Basjes
> >
> >
> > On Mon, Feb 19, 2018 at 10:50 AM Fabian Hueske <fhue...@gmail.com>
> wrote:
> >
> > > Hi Niels,
> > >
> > > Jörn is right, although offering different methods, Flink's InputFormat
> > is
> > > very similar to Hadoop's InputFormat interface.
> > > The InputFormat.createInputSplits() method generates splits that can be
> > > read in parallel.
> > > The FileInputFormat splits files by fixed boundaries (usually HDFS
> > > blocksize) and expects the InputFormat to find the right place to start
> > and
> > > end.
> > > For line-wise read files (TextInputFormat) or files with a record
> > delimiter
> > > (DelimiterInputFormat), the formats read the first record after they
> > found
> > > the first delimiter in their split and stop at the first delimiter
> after
> > > the split boundary.
> > > The BinaryInputFormat extends FileInputFormat but overrides the
> > > createInputSplits method.
> > >
> > > So, how exactly a file is read in parallel depends on the
> > > createInputSplits() method of the InputFormat.
> > >
> > > Hope this helps,
> > > Fabian
> > >
> > >
> > > 2018-02-18 13:36 GMT+01:00 Jörn Franke <jornfra...@gmail.com>:
> > >
> > > > AFAIK Flink has a similar notion of splittable as Hadoop. Furthermore
> > you
> > > > can set for custom Fileibputformats the attribute unsplittable = true
> > if
> > > > your file format cannot be split
> > > >
> > > > > On 18. Feb 2018, at 13:28, Niels Basjes <ni...@basjes.nl> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > In Hadoop MapReduce there is the notion of "splittable" in the
> > > > > FileInputFormat. This has the effect that a single input file can
> be
> > > fed
> > > > > into multiple separate instances of the mapper that read the data.
> > > > > A lot has been documented (i.e. text is splittable per line,
> gzipped
> > > text
> > > > > is not splittable) and designed into the various file formats (like
> > > Avro
> > > > > and Parquet) to allow splittability.
> > > > >
> > > > > The goal is that reading and parsing files can be done by multiple
> > > > > cpus/systems in parallel.
> > > > >
> > > > > How is this handled in Flink?
> > > > > Can Flink read a single file in parallel?
> > > > > How does Flink administrate/handle the possibilities regarding the
> > > > various
> > > > > file formats?
> > > > >
> > > > >
> > > > > The reason I ask is because I want to see if I can port this (now
> > > Hadoop
> > > > > specific) hobby project of mine to work with Flink:
> > > > > https://github.com/nielsbasjes/splittablegzip
> > > > >
> > > > > Thanks.
> > > > >
> > > > > --
> > > > > Best regards / Met vriendelijke groeten,
> > > > >
> > > > > Niels Basjes
> > > >
> > >
> >
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Reply via email to