Hi,

Yes, you are correct: Flink does use a minSplitSize to determine the splits. I had missed that part. This is also the part I do not intend to change.

What I would focus on is essentially the following:

- "Is splittable" is decided per file. Similar to Hadoop, both a codec and a file format get an "isSplittable" method/marker that lets this be decided per file at runtime. This should be the same construct for all file formats (so also for Text, Avro, Parquet, etc.). Also, if I create a new file format for something that does not support splitting (like some specific data structures using, for example, XML), then that should be cleanly possible. A rough sketch of what I mean follows below.

- The codecs must be overridable. For https://github.com/nielsbasjes/splittablegzip to work, the default gzip decompressor must be disabled. The current setup seems to make this possible, but I'm not sure:
  https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L118
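To make this a bit more concrete, here is a rough, self-contained sketch of the construct I have in mind. To be clear: none of the class or method names below exist in Flink today, they are purely illustrative. It only shows the per-file decision plus an overridable codec registry, not a real implementation:

// Illustration only: these names are made up, they are NOT existing Flink API.
import java.util.HashMap;
import java.util.Map;

public class SplittabilitySketch {

    /** Marker every decompression codec would carry: can a stream in this format be split? */
    interface InflaterFactory {
        boolean isSplittable();
    }

    /** The default gzip codec: NOT splittable. */
    static class GzipInflaterFactory implements InflaterFactory {
        public boolean isSplittable() { return false; }
    }

    /** A hypothetical replacement along the lines of https://github.com/nielsbasjes/splittablegzip */
    static class SplittableGzipInflaterFactory implements InflaterFactory {
        public boolean isSplittable() { return true; }
    }

    /** Per-extension codec registry; the essential point is that entries can be overridden. */
    static final Map<String, InflaterFactory> CODECS = new HashMap<>();
    static {
        CODECS.put("gz", new GzipInflaterFactory());
    }

    /** What a file format would do: answer splittability for ONE file at runtime. */
    static boolean isSplittable(String fileName) {
        int dot = fileName.lastIndexOf('.');
        String extension = (dot < 0) ? "" : fileName.substring(dot + 1);
        InflaterFactory codec = CODECS.get(extension);
        if (codec == null) {
            // No compression involved: the file format itself decides.
            // Text/Avro/Parquet would say yes; a format that cannot be split
            // (e.g. some XML-like structure) would say no here.
            return true;
        }
        return codec.isSplittable();
    }

    public static void main(String[] args) {
        System.out.println(isSplittable("big.txt"));   // true
        System.out.println(isSplittable("small.gz"));  // false with the default gzip codec

        // Override the default gzip codec, which is exactly what splittablegzip needs.
        CODECS.put("gz", new SplittableGzipInflaterFactory());
        System.out.println(isSplittable("small.gz"));  // now true
    }
}

The point is that the answer is given per individual file (by asking the codec and, failing that, the file format), instead of via one global flag for the whole input, and that the default gzip entry can be swapped out.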
I intend to leave the way the splits are calculated as-is.

Niels

On Thu, Apr 23, 2020 at 11:33 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Niels,
>
> Thanks for starting this discussion. Sharing some thoughts about your questions/proposals.
>
> > Judge whether splittable for each individual file
> Looks good to me.
>
> > BZIP2 support splittable
> Looks good to me.
>
> > the Flink implementation is controlled by the number of splits
> Can you check again? I think Flink is also affected by the block size [1].
>
> > Looking at how Hadoop does this I see that the FileInputFormat has a
> > method isSplitable
> Now Flink does this in FileInputFormat, but it could also do this like Hadoop.
>
> I can see the split strategies in the Hadoop ORC and Parquet formats are quite complex. I have not done a lot of in-depth research, but I think these implementations should be meaningful.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L644
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 23, 2020 at 4:13 PM Niels Basjes <ni...@basjes.nl> wrote:
>
> > Hi,
> >
> > I wanted to spend some time on this idea I asked about a year ago.
> >
> > So I had a look at the actual code and found this, which is where the file splits are calculated:
> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L601
> >
> > What I realized is that apparently the current system for determining the FileInputSplits:
> > 1) Lists the collection of input files.
> > 2) If a file is NOT splittable, sets a global flag to false.
> > 3) ONLY if all of them are splittable will they be split.
> >
> > So (if I understand correctly) if I have an input directory with a 100GB uncompressed text file and a 1 KB gzipped file, then BOTH will NOT be split.
> > Why is that done this way?
> >
> > I also found that the determination whether a file is NOT splittable rests completely on the fact that the decompression to be used is provided by an InflaterInputStreamFactory.
> > Which is funny, because BZIP2 as a compression IS splittable, but according to this implementation it isn't.
> >
> > I also noticed that the Hadoop split calculation is controlled by a split size, whereas the Flink implementation is controlled by the number of splits.
> > This seems logical to me, as in Hadoop (MapReduce) the number of tasks running is dynamic, in contrast with Flink where the number of parallel tasks is a given setting.
> >
> > Looking at how Hadoop does this I see that the FileInputFormat has a method isSplitable
> > (yes, with typo, and with a terribly dangerous default value: MAPREDUCE-2094)
> > which gives the answer on a per-file basis.
> > Some file formats (like TextInputFormat) look at the codec that was found to see if it implements the SplittableCompressionCodec interface to determine if the file is splittable.
> > Other file formats (like Avro and Parquet) use something else to determine this.
> >
> > Overall I currently think the way this has been implemented in Flink is not very good.
> >
> > However, looking at the gap to bridge to make it similar to what Hadoop has, it seems like a huge step.
> >
> > I expect that many classes and interfaces will need to change dramatically to make this happen.
> >
> > My main question to you guys: do you think it is worth it?
> >
> > Niels Basjes
> >
> >
> > On Mon, Feb 19, 2018 at 10:50 AM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi Niels,
> > >
> > > Jörn is right: although it offers different methods, Flink's InputFormat is very similar to Hadoop's InputFormat interface.
> > > The InputFormat.createInputSplits() method generates splits that can be read in parallel.
> > > The FileInputFormat splits files at fixed boundaries (usually the HDFS block size) and expects the InputFormat to find the right place to start and end.
> > > For line-wise read files (TextInputFormat) or files with a record delimiter (DelimitedInputFormat), the formats read the first record after they find the first delimiter in their split and stop at the first delimiter after the split boundary.
> > > The BinaryInputFormat extends FileInputFormat but overrides the createInputSplits method.
> > >
> > > So, how exactly a file is read in parallel depends on the createInputSplits() method of the InputFormat.
> > >
> > > Hope this helps,
> > > Fabian
> > >
> > >
> > > 2018-02-18 13:36 GMT+01:00 Jörn Franke <jornfra...@gmail.com>:
> > >
> > > > AFAIK Flink has a similar notion of splittable as Hadoop. Furthermore, for custom FileInputFormats you can set the attribute unsplittable = true if your file format cannot be split.
> > > >
> > > > > On 18. Feb 2018, at 13:28, Niels Basjes <ni...@basjes.nl> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > In Hadoop MapReduce there is the notion of "splittable" in the FileInputFormat. This has the effect that a single input file can be fed into multiple separate instances of the mapper that read the data.
> > > > > A lot has been documented (e.g. text is splittable per line, gzipped text is not splittable) and designed into the various file formats (like Avro and Parquet) to allow splittability.
> > > > >
> > > > > The goal is that reading and parsing files can be done by multiple cpus/systems in parallel.
> > > > >
> > > > > How is this handled in Flink?
> > > > > Can Flink read a single file in parallel?
> > > > > How does Flink administrate/handle the possibilities regarding the various file formats?
> > > > >
> > > > > The reason I ask is because I want to see if I can port this (now Hadoop specific) hobby project of mine to work with Flink:
> > > > > https://github.com/nielsbasjes/splittablegzip
> > > > >
> > > > > Thanks.
> > > > > > > > > > -- > > > > > Best regards / Met vriendelijke groeten, > > > > > > > > > > Niels Basjes > > > > > > > > > > > > > -- > > Best regards / Met vriendelijke groeten, > > > > Niels Basjes > > > > > -- > Best, Jingsong Lee > -- Best regards / Met vriendelijke groeten, Niels Basjes