Hi Niels,

Thanks for starting this discussion. Here are some thoughts on your questions/proposals.
> Judge whether splittable for each individual file

Looks good to me.

> BZIP2 support splittable

Looks good to me.

> the Flink implementation is controlled by the number of splits

Can you check again? I think Flink is also affected by the block size [1].

> Looking at how Hadoop does this I see that the FileInputFormat has a method isSplitable

Flink currently does this in FileInputFormat, but it could also do it the way Hadoop does.

I can see that the split strategies in Hadoop ORC and Parquet are quite complex. I haven't researched them in depth, but I think those implementations should be instructive.
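My reading of the split-size calculation around [1] is roughly the following. This is a simplified, standalone sketch, not the actual Flink code; the class and method names here are made up for illustration:

// Simplified sketch of how FileInputFormat.createInputSplits() appears to
// derive the split size (see [1] for the real code). Both the requested
// number of splits AND the file's block size play a role.
public class SplitSizeSketch {

    static long splitSize(long totalLength, int minNumSplits,
                          long blockSize, long minSplitSize) {
        // Aim for roughly minNumSplits splits over the total input length ...
        long maxSplitSize = totalLength / minNumSplits
                + (totalLength % minNumSplits == 0 ? 0 : 1);
        // ... but cap each split at the block size and keep it >= minSplitSize.
        return Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
    }

    public static void main(String[] args) {
        // 1 GB file, 4 requested splits, 128 MB blocks:
        // the block size wins and the file ends up in ~8 splits, not 4.
        long size = splitSize(1024L << 20, 4, 128L << 20, 1L);
        System.out.println("split size = " + (size >> 20) + " MB");
    }
}

So even when only a few splits are requested, a smaller block size can still produce more splits.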
[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L644

Best,
Jingsong Lee

On Thu, Apr 23, 2020 at 4:13 PM Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> I wanted to spend some time on this idea I asked about a year ago.
>
> So I had a look at the actual code and found where the file splits are calculated:
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L601
>
> What I realized is that apparently the current system for determining the FileInputSplits:
> 1) Lists the collection of input files.
> 2) If a file is NOT splittable, sets a global flag to false.
> 3) ONLY if all of them are splittable will they be split.
>
> So (if I understand correctly) if I have an input directory with a 100 GB uncompressed text file and a 1 KB gzipped file, then BOTH will NOT be split.
> Why is it done this way?
>
> I also found that the determination of whether a file is NOT splittable rests entirely on the fact that the decompression to be used is provided by an InflaterInputStreamFactory.
> Which is funny, because BZIP2 as a compression IS splittable, but according to this implementation it isn't.
>
> I also noticed that the Hadoop split calculation is controlled by a split size, while the Flink implementation is controlled by the number of splits.
> This seems logical to me, as in Hadoop (MapReduce) the number of tasks running is dynamic, in contrast with Flink, where the number of parallel tasks is a given setting.
>
> Looking at how Hadoop does this, I see that the FileInputFormat has a method isSplitable (yes, with a typo, and with a terribly dangerous default value: MAPREDUCE-2094) which gives the answer on a per-file basis.
> Some file formats (like TextInputFormat) look at the codec that was found to see if it implements the SplittableCompressionCodec interface to determine whether the file is splittable.
> Other file formats (like Avro and Parquet) use something else to determine this.
>
> Overall, I currently think the way this has been implemented in Flink is not very good.
> However, bridging the gap to make it similar to what Hadoop has seems like a huge step.
> I expect that many classes and interfaces will need to change dramatically to make this happen.
>
> My main question to you guys: do you think it is worth it?
>
> Niels Basjes
>
>
> On Mon, Feb 19, 2018 at 10:50 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Niels,
> >
> > Jörn is right; although it offers different methods, Flink's InputFormat is very similar to Hadoop's InputFormat interface.
> > The InputFormat.createInputSplits() method generates splits that can be read in parallel.
> > The FileInputFormat splits files at fixed boundaries (usually the HDFS block size) and expects the InputFormat to find the right place to start and end.
> > For files read line-wise (TextInputFormat) or files with a record delimiter (DelimitedInputFormat), the formats read the first record after they find the first delimiter in their split and stop at the first delimiter after the split boundary.
> > The BinaryInputFormat extends FileInputFormat but overrides the createInputSplits method.
> >
> > So, how exactly a file is read in parallel depends on the createInputSplits() method of the InputFormat.
> >
> > Hope this helps,
> > Fabian
> >
> >
> > 2018-02-18 13:36 GMT+01:00 Jörn Franke <jornfra...@gmail.com>:
> >
> > > AFAIK Flink has a similar notion of splittable as Hadoop. Furthermore, for custom FileInputFormats you can set the attribute unsplittable = true if your file format cannot be split.
> > >
> > > > On 18. Feb 2018, at 13:28, Niels Basjes <ni...@basjes.nl> wrote:
> > > >
> > > > Hi,
> > > >
> > > > In Hadoop MapReduce there is the notion of "splittable" in the FileInputFormat. This has the effect that a single input file can be fed into multiple separate instances of the mapper that read the data.
> > > > A lot has been documented (e.g. text is splittable per line, gzipped text is not splittable) and designed into the various file formats (like Avro and Parquet) to allow splittability.
> > > >
> > > > The goal is that reading and parsing files can be done by multiple CPUs/systems in parallel.
> > > >
> > > > How is this handled in Flink?
> > > > Can Flink read a single file in parallel?
> > > > How does Flink administrate/handle the possibilities regarding the various file formats?
> > > >
> > > > The reason I ask is that I want to see if I can port this (now Hadoop-specific) hobby project of mine to work with Flink:
> > > > https://github.com/nielsbasjes/splittablegzip
> > > >
> > > > Thanks.
> > > >
> > > > --
> > > > Best regards / Met vriendelijke groeten,
> > > >
> > > > Niels Basjes
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes

--
Best, Jingsong Lee
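For completeness, the per-file splittability check Niels describes for Hadoop's TextInputFormat looks roughly like the sketch below (Hadoop mapreduce API; the class name is made up, and the body is a simplification of what TextInputFormat already does rather than a verbatim copy):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Sketch of a per-file splittability decision in the style of Hadoop's
// TextInputFormat: a file is splittable if it is uncompressed, or if its
// codec implements SplittableCompressionCodec (e.g. BZip2Codec).
public class PerFileSplittableTextInputFormat extends TextInputFormat {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (codec == null) {
            return true; // no compression -> splittable
        }
        return codec instanceof SplittableCompressionCodec;
    }
}

The key difference from Flink's current behavior is that the decision is made per file, so one gzipped file in a directory does not prevent the other files from being split.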