Enumeration of nested files is a feature of FileInputFormat. If you implement your own InputFormat based on FileInputFormat, as I suggested before, you can use that feature as well.
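Below is a minimal plain-Java model of that suggestion. It is not Flink code, and WholeFileSplit and WholeFileSplitPlanner are hypothetical names. The planner walks the input directory, descending into subdirectories only when nested enumeration is enabled. It emits exactly one split per file, starting at offset 0 and covering the whole file.

In a real FileInputFormat subclass, FileInputFormat itself does the directory walk once ENUMERATE_NESTED_FILES_FLAG is set. The one-split-per-file part would go into an overridden createInputSplits.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for an input split that covers one whole file.
final class WholeFileSplit {
    final int splitNumber;
    final Path path;
    final long start;  // always 0: the split begins at the first byte of the file
    final long length; // the full file size, so the file is never cut in the middle

    WholeFileSplit(int splitNumber, Path path, long start, long length) {
        this.splitNumber = splitNumber;
        this.path = path;
        this.start = start;
        this.length = length;
    }
}

// Hypothetical planner: one split per regular file under root.
final class WholeFileSplitPlanner {
    static List<WholeFileSplit> plan(Path root, boolean enumerateNested) {
        // Depth 1 lists only the direct children of root; unlimited depth recurses.
        int maxDepth = enumerateNested ? Integer.MAX_VALUE : 1;
        List<Path> files;
        try (Stream<Path> paths = Files.walk(root, maxDepth)) {
            files = paths.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<WholeFileSplit> splits = new ArrayList<>();
        for (Path file : files) {
            try {
                // One split per file, from offset 0 to the end of the file.
                splits.add(new WholeFileSplit(splits.size(), file, 0L, Files.size(file)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return splits;
    }
}
```

Each split covers a whole file, so a reader never has to locate a record boundary inside a file. A file holding a single Kryo-serialized object can then simply be deserialized from its first byte.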
2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> I have a directory containing a list of files, each one containing a
> Kryo-serialized object.
> With JSON-serialized objects I don't have that problem (but there I use
> env.readTextFile(path).withParameters(parameters),
> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>
> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> I don't know your use case.
>> The InputFormat interface is very flexible. Directories can be
>> read recursively. A file can contain one or more objects. You can also make
>> a smarter InputFormat and put multiple (small) files into one split...
>>
>> What you need to implement depends on your use case.
>>
>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Should I do this even when I'm just recursively reading an entire
>>> directory that contains one object per file?
>>>
>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> You could implement your own InputFormat based on FileInputFormat and
>>>> override the createInputSplits method to create just a single split per
>>>> file.
>>>>
>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> So what should I do?
>>>>>
>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Ah, I checked the code.
>>>>>>
>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>> BinaryOutputFormat.
>>>>>> So you cannot use the BinaryInputFormat to read a file which does not
>>>>>> provide that metadata.
>>>>>>
>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> The file containing the serialized object is 7 bytes.
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>> BinaryInputFormat.
>>>>>>>> How large is the file with the single object?
>>>>>>>>
>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>
>>>>>>>>> I also tried with
>>>>>>>>>
>>>>>>>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>
>>>>>>>>> but I get the same error :(
>>>>>>>>>
>>>>>>>>> Moreover, in this example I put exactly one object per file, so it
>>>>>>>>> should be able to deserialize it, right?
>>>>>>>>>
>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> If you create your file by just sequentially writing all objects
>>>>>>>>>> to the file using Kryo, you can only read it with a parallelism of 1.
>>>>>>>>>> Writing binary files in a way that they can be read in parallel
>>>>>>>>>> is a bit tricky (and not specific to Flink).
>>>>>>>>>>
>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>> I'm trying to read a file serialized with Kryo, but I get this
>>>>>>>>>>> exception (because createInputSplits creates 8 input splits,
>>>>>>>>>>> of which only one is non-empty).
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>
>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>
>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>
>>>>>>>>>>>     ...
>>>>>>>>>>>     // try-with-resources used to autoclose resources
>>>>>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>         // serialise object
>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     // deserialise object
>>>>>>>>>>>     myObj = null;
>>>>>>>>>>>
>>>>>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);
>>>>>>>>>>>
>>>>>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>     inputFormat.configure(configuration);
>>>>>>>>>>>
>>>>>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>     ds.print();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Flavio
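Fabian's two points can be illustrated with a simplified, hypothetical block format in plain Java. The first is that a file written sequentially with Kryo can only be read with parallelism 1. The second is that BinaryInputFormat relies on metadata written by BinaryOutputFormat.

BlockedRecordFile is an illustrative name, and its layout is an assumption, not Flink's actual on-disk format:

- records are length-prefixed and packed into fixed-size blocks;
- a record that does not fit in the current block starts a new one;
- every block ends with a 4-byte trailer holding its record count.

That trailer is what lets a reader handed only block k decode it without scanning from the start of the file. The 7-byte file produced by a single writeClassAndObject call carries no such metadata.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified block format (NOT Flink's BinaryOutputFormat layout).
// Each block: [len][bytes][len][bytes]...[zero padding][record count]
final class BlockedRecordFile {
    static final int TRAILER_BYTES = Integer.BYTES; // per-block metadata: record count
    static final int PREFIX_BYTES = Integer.BYTES;  // per-record length prefix

    // Packs records into blocks; a record that does not fit starts a new block.
    static byte[] write(List<byte[]> records, int blockSize) {
        int payload = blockSize - TRAILER_BYTES;
        List<ByteBuffer> blocks = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(blockSize);
        int count = 0;
        for (byte[] record : records) {
            int needed = PREFIX_BYTES + record.length;
            if (needed > payload) {
                throw new IllegalArgumentException("record does not fit in one block");
            }
            if (current.position() + needed > payload) {
                seal(current, count, blockSize);
                blocks.add(current);
                current = ByteBuffer.allocate(blockSize);
                count = 0;
            }
            current.putInt(record.length);
            current.put(record);
            count++;
        }
        if (count > 0) {
            seal(current, count, blockSize);
            blocks.add(current);
        }
        ByteBuffer file = ByteBuffer.allocate(blocks.size() * blockSize);
        for (ByteBuffer block : blocks) {
            file.put(block.array());
        }
        return file.array();
    }

    // Writes the record count into the block's trailer (absolute put; padding stays zero).
    private static void seal(ByteBuffer block, int count, int blockSize) {
        block.putInt(blockSize - TRAILER_BYTES, count);
    }

    static int blockCount(byte[] file, int blockSize) {
        return file.length / blockSize;
    }

    // Decodes one block on its own, as an independent parallel reader would.
    static List<byte[]> readBlock(byte[] file, int blockSize, int blockIndex) {
        ByteBuffer block = ByteBuffer.wrap(file, blockIndex * blockSize, blockSize).slice();
        int count = block.getInt(blockSize - TRAILER_BYTES); // read the trailer first
        List<byte[]> records = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] record = new byte[block.getInt()];
            block.get(record);
            records.add(record);
        }
        return records;
    }
}
```

With the records "a", "bb" and "ccc" and a 16-byte block, the first two records fit in block 0 and "ccc" starts block 1. Two readers can therefore each decode one block without coordinating. This model pads blocks rather than letting a record span two blocks, which keeps the reader trivial at the cost of some wasted space.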