You need to do something like this:

import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

public class YourInputFormat extends FileInputFormat<Object> {

  // Set once the single object of the current split has been returned.
  private boolean objectRead;

  @Override
  public FileInputSplit[] createInputSplits(int minNumSplits) {
    // Create one FileInputSplit for each file you want to read.
    // Check FileInputFormat for how to recursively enumerate files.
    // Input splits must start at 0 and have a length equal to the length of the file to read.
    return null; // placeholder: return the splits you created
  }

  @Override
  public void open(FileInputSplit split) throws IOException {
    super.open(split);
    objectRead = false;
  }

  @Override
  public boolean reachedEnd() throws IOException {
    return this.objectRead;
  }

  @Override
  public Object nextRecord(Object reuse) throws IOException {
    // Placeholder: read() only returns a single byte. Use Kryo here to read the object from this.stream.
    Object yourObject = this.stream.read();
    this.objectRead = true; // read only one object
    return yourObject;
  }
}

2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Sorry Fabian but I don't understand what I should do :(
> Could you provide me a simple snippet of code to achieve this?
>
> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Enumeration of nested files is a feature of the FileInputFormat.
>> If you implement your own IF based on FileInputFormat as I suggested
>> before, you can use that feature.
>>
>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> I have a directory containing a list of files, each one containing a
>>> kryo-serialized object.
>>> With json serialized objects I don't have that problem (but there I use
>>> env.readTextFile(path).withParameters(parameters)
>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>>
>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> I don't know your use case.
>>>> The InputFormat interface is very flexible. Directories can be
>>>> recursively read. A file can contain one or more objects. You can also make
>>>> a smarter IF and put multiple (small) files into one split...
>>>>
>>>> It is up to your use case what you need to implement.
>>>>
>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Should this be the case just reading recursively an entire directory
>>>>> containing one object per file?
>>>>>
>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> You could implement your own InputFormat based on FileInputFormat and
>>>>>> override the createInputSplits method to just create a single split per
>>>>>> file.
>>>>>>
>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> So what should I do?
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ah, I checked the code.
>>>>>>>>
>>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>>> BinaryOutputFormat.
>>>>>>>> So you cannot use the BinaryInputFormat to read a file which does
>>>>>>>> not provide the metadata.
>>>>>>>>
>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>
>>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>>
>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>>> BinaryInputFormat.
>>>>>>>>>> How large is the file with the single object?
>>>>>>>>>>
>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> I also tried with
>>>>>>>>>>>
>>>>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>>>>     env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>>
>>>>>>>>>>> but I get the same error :(
>>>>>>>>>>>
>>>>>>>>>>> Moreover, in this example I put exactly one object per file so
>>>>>>>>>>> it should be able to deserialize it, right?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> If you create your file by just sequentially writing all
>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a
>>>>>>>>>>>> parallelism of 1.
>>>>>>>>>>>> Writing binary files in a way that they can be read in parallel
>>>>>>>>>>>> is a bit tricky (and not specific to Flink).
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>> I'm trying to read a file serialized with kryo but I get this
>>>>>>>>>>>>> exception (due to the fact that the createInputSplits creates 8
>>>>>>>>>>>>> inputsplits, where just one is not empty..).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>
>>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     ...
>>>>>>>>>>>>>     //try-with-resources used to autoclose resources
>>>>>>>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>>         //serialise object
>>>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     //deserialise object
>>>>>>>>>>>>>
>>>>>>>>>>>>>     myObj = null;
>>>>>>>>>>>>>
>>>>>>>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     final ExecutionEnvironment env =
>>>>>>>>>>>>>         ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>>>>>>>>>
>>>>>>>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>>     final BinaryInputFormat<MyClass> inputFormat =
>>>>>>>>>>>>>         new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>>     inputFormat.configure(configuration);
>>>>>>>>>>>>>
>>>>>>>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>>     ds.print();
>>>>>>>>>>>>>
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Flavio
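
PS: To make the createInputSplits comments at the top of this reply a bit more concrete, here is a rough, Flink-free sketch of the enumeration step only: walk a directory recursively and produce one split per file, starting at 0 and covering the whole file. The names WholeFileSplits, Split, enumerate and demo are made up for illustration and are not part of Flink; in a real input format you would build FileInputSplit objects instead and use the Flink file system classes rather than java.nio.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not a Flink class: shows "one split per file" with plain java.nio.
final class WholeFileSplits {

    // Stand-in for Flink's FileInputSplit: a file, a start offset and a length.
    static final class Split {
        final Path file;
        final long start;
        final long length;

        Split(Path file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    // Walks rootDir recursively and returns exactly one split per regular file.
    // Every split starts at offset 0 and spans the full length of its file.
    static List<Split> enumerate(Path rootDir) {
        try (Stream<Path> paths = Files.walk(rootDir)) {
            List<Path> files = paths
                    .filter(Files::isRegularFile)
                    .sorted()
                    .collect(Collectors.toList());
            List<Split> splits = new ArrayList<>();
            for (Path file : files) {
                splits.add(new Split(file, 0L, Files.size(file)));
            }
            return splits;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a small temporary directory tree (a 7-byte and a 3-byte file,
    // one of them in a nested directory) and enumerates it.
    static List<Split> demo() {
        try {
            Path root = Files.createTempDirectory("kryo-objects");
            Path nested = Files.createDirectories(root.resolve("nested"));
            Files.write(root.resolve("a.ser"), new byte[7]);
            Files.write(nested.resolve("b.ser"), new byte[3]);
            return enumerate(root);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because each split covers a whole file, a reader that returns one object per split (as in nextRecord above) never has to seek into the middle of a serialized object, which is what went wrong with the 8 splits created for the 7-byte file.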