Done through https://issues.apache.org/jira/browse/FLINK-2503
Thanks again, Flavio On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske <fhue...@gmail.com> wrote: > Congrats that you got your InputFormat working! > It is true, there can be a few inconsistencies in the Formats derived from > FileInputFormat. > > It would be great if you could open JIRAs for these issues. Otherwise, they > might get lost on the mailing list. > > Thanks, Fabian > > 2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: > >> Hi Fabian, >> thanks to your help I finally managed to successfully generate a DataSet >> from my folder but I think that there are some inconsistencies in the >> hierarchy of InputFormats. >> The *BinaryOutputFormat*/*TypeSerializerInputFormat* should somehow >> inherit the behaviour of the FileInputFormat (so respect *unsplittable* >> and *enumerateNestedFiles*) while they don't take into account those >> flags. >> Moreover in the *TypeSerializerInputFormat* there's a *"// TODO: fix >> this shit"* that maybe should be removed or fixed :) >> >> Also maintaining aligned testForUnsplittable and decorateInputStream is >> somehow dangerous.. >> And maybe visibility for getBlockIndexForPosition should be changed to >> protected? >> >> So basically, my need was to implement >> a TypeSerializerInputFormat<RowBundle> but to achieve that I had to make a >> lot of overrides..am I doing something wrong or are those inputFormat >> somehow to improve..? This is my IF code (*remark*: from the comment *"Copied >> from FileInputFormat (override TypeSerializerInputFormat)"* on the code >> is copied-and-pasted from FileInputFormat..thus MY code ends there): >> >> public class RowBundleInputFormat extends >> TypeSerializerInputFormat<RowBundle> { >> >> private static final long serialVersionUID = 1L; >> private static final Logger LOG = >> LoggerFactory.getLogger(RowBundleInputFormat.class); >> >> /** The fraction that the last split may be larger than the others. 
*/ >> private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f; >> private boolean objectRead; >> >> public RowBundleInputFormat() { >> super(new GenericTypeInfo<>(RowBundle.class)); >> unsplittable = true; >> } >> >> @Override >> protected FSDataInputStream decorateInputStream(FSDataInputStream >> inputStream, FileInputSplit fileSplit) throws Throwable { >> return inputStream; >> } >> >> @Override >> protected boolean testForUnsplittable(FileStatus pathFile) { >> return true; >> } >> >> @Override >> public void open(FileInputSplit split) throws IOException { >> super.open(split); >> objectRead = false; >> } >> >> @Override >> public boolean reachedEnd() throws IOException { >> return this.objectRead; >> } >> >> @Override >> public RowBundle nextRecord(RowBundle reuse) throws IOException { >> RowBundle yourObject = super.nextRecord(reuse); >> this.objectRead = true; // read only one object >> return yourObject; >> } >> >> // ------------------------------------------------------------------- >> // Copied from FileInputFormat (override TypeSerializerInputFormat) >> // ------------------------------------------------------------------- >> @Override >> public FileInputSplit[] createInputSplits(int minNumSplits) >> throws IOException { >> if (minNumSplits < 1) { >> throw new IllegalArgumentException( >> "Number of input splits has to be at least 1."); >> } >> >> // take the desired number of splits into account >> minNumSplits = Math.max(minNumSplits, this.numSplits); >> >> final Path path = this.filePath; >> final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>( >> minNumSplits); >> >> // get all the files that are involved in the splits >> List<FileStatus> files = new ArrayList<FileStatus>(); >> long totalLength = 0; >> >> final FileSystem fs = path.getFileSystem(); >> final FileStatus pathFile = fs.getFileStatus(path); >> >> if (pathFile.isDir()) { >> // input is directory. 
list all contained files >> final FileStatus[] dir = fs.listStatus(path); >> for (int i = 0; i < dir.length; i++) { >> if (dir[i].isDir()) { >> if (enumerateNestedFiles) { >> if (acceptFile(dir[i])) { >> totalLength += addNestedFiles(dir[i].getPath(), >> files, 0, true); >> } else { >> if (LOG.isDebugEnabled()) { >> LOG.debug("Directory " >> + dir[i].getPath().toString() >> + " did not pass the file-filter and is excluded."); >> } >> } >> } >> } else { >> if (acceptFile(dir[i])) { >> files.add(dir[i]); >> totalLength += dir[i].getLen(); >> // as soon as there is one deflate file in a directory, >> // we can not split it >> testForUnsplittable(dir[i]); >> } else { >> if (LOG.isDebugEnabled()) { >> LOG.debug("File " >> + dir[i].getPath().toString() >> + " did not pass the file-filter and is excluded."); >> } >> } >> } >> } >> } else { >> testForUnsplittable(pathFile); >> >> files.add(pathFile); >> totalLength += pathFile.getLen(); >> } >> // returns if unsplittable >> if (unsplittable) { >> int splitNum = 0; >> for (final FileStatus file : files) { >> final BlockLocation[] blocks = fs.getFileBlockLocations(file, >> 0, file.getLen()); >> Set<String> hosts = new HashSet<String>(); >> for (BlockLocation block : blocks) { >> hosts.addAll(Arrays.asList(block.getHosts())); >> } >> long len = file.getLen(); >> if (testForUnsplittable(file)) { >> len = READ_WHOLE_SPLIT_FLAG; >> } >> FileInputSplit fis = new FileInputSplit(splitNum++, >> file.getPath(), 0, len, hosts.toArray(new String[hosts >> .size()])); >> inputSplits.add(fis); >> } >> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); >> } >> >> final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE >> : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 
0 >> : 1)); >> >> // now that we have the files, generate the splits >> int splitNum = 0; >> for (final FileStatus file : files) { >> >> final long len = file.getLen(); >> final long blockSize = file.getBlockSize(); >> >> final long minSplitSize; >> if (this.minSplitSize <= blockSize) { >> minSplitSize = this.minSplitSize; >> } else { >> if (LOG.isWarnEnabled()) { >> LOG.warn("Minimal split size of " + this.minSplitSize >> + " is larger than the block size of " + blockSize >> + ". Decreasing minimal split size to block size."); >> } >> minSplitSize = blockSize; >> } >> >> final long splitSize = Math.max(minSplitSize, >> Math.min(maxSplitSize, blockSize)); >> final long halfSplit = splitSize >>> 1; >> >> final long maxBytesForLastSplit = (long) (splitSize * >> MAX_SPLIT_SIZE_DISCREPANCY); >> >> if (len > 0) { >> >> // get the block locations and make sure they are in order with >> // respect to their offset >> final BlockLocation[] blocks = fs.getFileBlockLocations(file, >> 0, len); >> Arrays.sort(blocks); >> >> long bytesUnassigned = len; >> long position = 0; >> >> int blockIndex = 0; >> >> while (bytesUnassigned > maxBytesForLastSplit) { >> // get the block containing the majority of the data >> blockIndex = getBlockIndexForPosition(blocks, position, >> halfSplit, blockIndex); >> // create a new split >> FileInputSplit fis = new FileInputSplit(splitNum++, >> file.getPath(), position, splitSize, >> blocks[blockIndex].getHosts()); >> inputSplits.add(fis); >> >> // adjust the positions >> position += splitSize; >> bytesUnassigned -= splitSize; >> } >> >> // assign the last split >> if (bytesUnassigned > 0) { >> blockIndex = getBlockIndexForPosition(blocks, position, >> halfSplit, blockIndex); >> final FileInputSplit fis = new FileInputSplit(splitNum++, >> file.getPath(), position, bytesUnassigned, >> blocks[blockIndex].getHosts()); >> inputSplits.add(fis); >> } >> } else { >> // special case with a file of zero bytes size >> final BlockLocation[] blocks = 
fs.getFileBlockLocations(file, >> 0, 0); >> String[] hosts; >> if (blocks.length > 0) { >> hosts = blocks[0].getHosts(); >> } else { >> hosts = new String[0]; >> } >> final FileInputSplit fis = new FileInputSplit(splitNum++, >> file.getPath(), 0, 0, hosts); >> inputSplits.add(fis); >> } >> } >> >> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); >> } >> >> /** >> * Recursively traverse the input directory structure and enumerate all >> * accepted nested files. >> * >> * @return the total length of accepted files. >> */ >> private long addNestedFiles(Path path, List<FileStatus> files, long >> length, >> boolean logExcludedFiles) throws IOException { >> final FileSystem fs = path.getFileSystem(); >> >> for (FileStatus dir : fs.listStatus(path)) { >> if (dir.isDir()) { >> if (acceptFile(dir)) { >> addNestedFiles(dir.getPath(), files, length, >> logExcludedFiles); >> } else { >> if (logExcludedFiles && LOG.isDebugEnabled()) { >> LOG.debug("Directory " >> + dir.getPath().toString() >> + " did not pass the file-filter and is excluded."); >> } >> } >> } else { >> if (acceptFile(dir)) { >> files.add(dir); >> length += dir.getLen(); >> testForUnsplittable(dir); >> } else { >> if (logExcludedFiles && LOG.isDebugEnabled()) { >> LOG.debug("Directory " >> + dir.getPath().toString() >> + " did not pass the file-filter and is excluded."); >> } >> } >> } >> } >> return length; >> } >> >> /** >> * Retrieves the index of the <tt>BlockLocation</tt> that contains the part >> * of the file described by the given offset. >> * >> * @param blocks >> * The different blocks of the file. Must be ordered by their >> * offset. >> * @param offset >> * The offset of the position in the file. >> * @param startIndex >> * The earliest index to look at. >> * @return The index of the block containing the given position. 
>> */ >> private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, >> long halfSplitSize, int startIndex) { >> // go over all indexes after the startIndex >> for (int i = startIndex; i < blocks.length; i++) { >> long blockStart = blocks[i].getOffset(); >> long blockEnd = blockStart + blocks[i].getLength(); >> >> if (offset >= blockStart && offset < blockEnd) { >> // got the block where the split starts >> // check if the next block contains more than this one does >> if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) { >> return i + 1; >> } else { >> return i; >> } >> } >> } >> throw new IllegalArgumentException("The given offset is not contained in >> the any block."); >> } >> >> } >> >> >> >> >> On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fhue...@gmail.com> wrote: >> >>> You need to do something like this: >>> >>> public class YourInputFormat extends FileInputFormat<Object> { >>> >>> private boolean objectRead; >>> >>> @Override >>> public FileInputSplit[] createInputSplits(int minNumSplits) { >>> // Create one FileInputSplit for each file you want to read. >>> // Check FileInputFormat for how to recursively enumerate files. >>> // Input splits must start at 0 and have a length equal to length >>> of the file to read. 
>>> return null; >>> } >>> >>> @Override >>> public void open(FileInputSplit split) throws IOException { >>> super.open(split); >>> objectRead = false; >>> } >>> >>> @Override >>> public boolean reachedEnd() throws IOException { >>> return this.objectRead; >>> } >>> >>> @Override >>> public Object nextRecord(Object reuse) throws IOException { >>> Object yourObject = this.stream.read(); // use Kryo here to read >>> from this.stream() >>> this.objectRead = true; // read only one object >>> return yourObject; >>> } >>> } >>> >>> 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: >>> >>>> Sorry Fabian but I don't understand what I should do :( >>>> Could you provide me a simple snippet of code to achieve this? >>>> >>>> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhue...@gmail.com> >>>> wrote: >>>> >>>>> Enumeration of nested files is a feature of the FileInputFormat. >>>>> If you implement your own IF based on FileInputFormat as I suggested >>>>> before, you can use that feature. >>>>> >>>>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: >>>>> >>>>>> I have a directory containing a list of files, each one containing a >>>>>> kryo-serialized object. >>>>>> With json serialized objects I don't have that problem (but there I >>>>>> use env.readTextFile(path.withParameters(parameters) >>>>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true). >>>>>> >>>>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> I don't know your use case. >>>>>>> The InputFormat interface is very flexible. Directories can be >>>>>>> recursively read. A file can contain one or more objects. You can also >>>>>>> make >>>>>>> a smarter IF and put multiple (small) files into one split... >>>>>>> >>>>>>> It is up to your use case what you need to implement. 
>>>>>>> >>>>>>> >>>>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it> >>>>>>> : >>>>>>> >>>>>>>> Should this be the case just reading recursively an entire >>>>>>>> directory containing one object per file? >>>>>>>> >>>>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> You could implement your own InputFormat based on FileInputFormat >>>>>>>>> and override the createInputSplits method to just create a single >>>>>>>>> split >>>>>>>>> per file. >>>>>>>>> >>>>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier < >>>>>>>>> pomperma...@okkam.it>: >>>>>>>>> >>>>>>>>>> So what should I do? >>>>>>>>>> >>>>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com >>>>>>>>>> > wrote: >>>>>>>>>> >>>>>>>>>>> Ah, I checked the code. >>>>>>>>>>> >>>>>>>>>>> The BinaryInputFormat expects metadata which is written by the >>>>>>>>>>> BinaryOutputFormat. >>>>>>>>>>> So you cannot use the BinaryInputFormat to read a file which >>>>>>>>>>> does not provide the metadata. >>>>>>>>>>> >>>>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier < >>>>>>>>>>> pomperma...@okkam.it>: >>>>>>>>>>> >>>>>>>>>>>> The file containing the serialized object is 7 bytes >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske < >>>>>>>>>>>> fhue...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> This might be an issue with the blockSize parameter of the >>>>>>>>>>>>> BinaryInputFormat. >>>>>>>>>>>>> How large is the file with the single object? 
>>>>>>>>>>>>> >>>>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier < >>>>>>>>>>>>> pomperma...@okkam.it>: >>>>>>>>>>>>> >>>>>>>>>>>>>> I also tried with >>>>>>>>>>>>>> >>>>>>>>>>>>>> DataSet<RowBundle> ds = >>>>>>>>>>>>>> env.createInput(inputFormat).setParallelism(1); >>>>>>>>>>>>>> >>>>>>>>>>>>>> but I get the same error :( >>>>>>>>>>>>>> >>>>>>>>>>>>>> Moreover, in this example I put exactly one object per file >>>>>>>>>>>>>> so it should be able to deserialize it, right? >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske < >>>>>>>>>>>>>> fhue...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> If you create your file by just sequentially writing all >>>>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a >>>>>>>>>>>>>>> parallelism of >>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>> Writing binary files in a way that they can be read in >>>>>>>>>>>>>>> parallel is a bit tricky (and not specific to Flink). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier < >>>>>>>>>>>>>>> pomperma...@okkam.it>: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi to all, >>>>>>>>>>>>>>>> I'm trying to read a file serialized with kryo but I get >>>>>>>>>>>>>>>> this exception (due to the fact that the createInputSplits >>>>>>>>>>>>>>>> creates 8 >>>>>>>>>>>>>>>> inputsplits, where just one is not empty..). 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument >>>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ----------------------------------------------- >>>>>>>>>>>>>>>> My program is basically the following: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> public static void main(String[] args) throws Exception { >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ... 
>>>>>>>>>>>>>>>> //try-with-resources used to autoclose resources >>>>>>>>>>>>>>>> try (Output output = new Output(new >>>>>>>>>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) { >>>>>>>>>>>>>>>> //serialise object >>>>>>>>>>>>>>>> Kryo kryo=new Kryo(); >>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, myObj); >>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) { >>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> //deserialise object >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> myObj=null; >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> try (Input input = new Input( new >>>>>>>>>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){ >>>>>>>>>>>>>>>> Kryo kryo=new Kryo(); >>>>>>>>>>>>>>>> myObj =(MyClass)kryo.readClassAndObject(input); >>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) { >>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> final ExecutionEnvironment env = >>>>>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class, >>>>>>>>>>>>>>>> MyClassSerializer.class); >>>>>>>>>>>>>>>> Configuration configuration = new Configuration(); >>>>>>>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, >>>>>>>>>>>>>>>> 64*1024*1024); >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> TypeInformation<MyClass> typeInfo = new >>>>>>>>>>>>>>>> GenericTypeInfo<>(MyClass.class); >>>>>>>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new >>>>>>>>>>>>>>>> TypeSerializerInputFormat<>(typeInfo); >>>>>>>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser"); >>>>>>>>>>>>>>>> inputFormat.configure(configuration); >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat); >>>>>>>>>>>>>>>> ds.print(); >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> private static final class MyClassSerializer extends >>>>>>>>>>>>>>>> Serializer<MyClass> { >>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>> public void write(Kryo kryo, Output output, MyClass object) >>>>>>>>>>>>>>>> { >>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, object); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass> >>>>>>>>>>>>>>>> type) { >>>>>>>>>>>>>>>> return (MyClass) kryo.readClassAndObject(input); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Am I doing something wrong? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>> Flavio >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >> >>