Sorry Fabian, but I don't understand what I should do :( Could you provide a simple code snippet that achieves this?

On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Enumeration of nested files is a feature of the FileInputFormat.
> If you implement your own InputFormat based on FileInputFormat as I
> suggested before, you can use that feature.
>
> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> I have a directory containing a list of files, each one containing a
>> kryo-serialized object.
>> With json serialized objects I don't have that problem (but there I use
>> env.readTextFile(path).withParameters(parameters)
>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>
>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> I don't know your use case.
>>> The InputFormat interface is very flexible. Directories can be
>>> recursively read. A file can contain one or more objects. You can also
>>> make a smarter InputFormat and put multiple (small) files into one split...
>>>
>>> What you need to implement depends on your use case.
>>>
>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> Should this also be the case when just recursively reading an entire
>>>> directory containing one object per file?
>>>>
>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> You could implement your own InputFormat based on FileInputFormat and
>>>>> override the createInputSplits method to just create a single split
>>>>> per file.
>>>>>
>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>
>>>>>> So what should I do?
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Ah, I checked the code.
>>>>>>>
>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>> BinaryOutputFormat.
>>>>>>> So you cannot use the BinaryInputFormat to read a file which does
>>>>>>> not provide the metadata.
>>>>>>>
>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>
>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>
>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>> BinaryInputFormat.
>>>>>>>>> How large is the file with the single object?
>>>>>>>>>
>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>
>>>>>>>>>> I also tried with
>>>>>>>>>>
>>>>>>>>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>
>>>>>>>>>> but I get the same error :(
>>>>>>>>>>
>>>>>>>>>> Moreover, in this example I put exactly one object per file so it
>>>>>>>>>> should be able to deserialize it, right?
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> If you create your file by just sequentially writing all objects
>>>>>>>>>>> to the file using Kryo, you can only read it with a parallelism
>>>>>>>>>>> of 1.
>>>>>>>>>>> Writing binary files in a way that they can be read in parallel
>>>>>>>>>>> is a bit tricky (and not specific to Flink).
>>>>>>>>>>>
>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>> I'm trying to read a file serialized with kryo but I get this
>>>>>>>>>>>> exception (because createInputSplits creates 8 input splits,
>>>>>>>>>>>> of which only one is non-empty).
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>
>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>
>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>
>>>>>>>>>>>>     ...
>>>>>>>>>>>>     //try-with-resources used to autoclose resources
>>>>>>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>         //serialise object
>>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     //deserialise object
>>>>>>>>>>>>
>>>>>>>>>>>>     myObj = null;
>>>>>>>>>>>>
>>>>>>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>>>>>>>>
>>>>>>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>     inputFormat.configure(configuration);
>>>>>>>>>>>>
>>>>>>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>     ds.print();
>>>>>>>>>>>>
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Flavio
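
A minimal sketch of the "one split per file" idea from Fabian's replies, using only the JDK so that it runs on its own. It walks a directory recursively and emits exactly one split per regular file, covering the file from offset 0 to its full length. The class `OneSplitPerFile`, its nested `Split` type, and the method `createSplits` are hypothetical names chosen for illustration and are not Flink API; a real implementation would presumably extend Flink's FileInputFormat and return its own split objects from createInputSplits instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class OneSplitPerFile {

    /** Hypothetical stand-in for an input split: a byte range of one file. */
    static final class Split {
        final Path file;
        final long start;
        final long length;

        Split(Path file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    /**
     * Recursively enumerates all regular files below root and creates one
     * whole-file split for each, so no file is ever cut into several ranges.
     */
    static List<Split> createSplits(Path root) throws IOException {
        List<Path> files;
        // Files.walk descends into nested directories; close the stream when done.
        try (Stream<Path> walk = Files.walk(root)) {
            files = walk.filter(Files::isRegularFile)
                        .sorted()
                        .collect(Collectors.toList());
        }
        List<Split> splits = new ArrayList<>();
        for (Path file : files) {
            // One split per file: start at 0 and span the entire file.
            splits.add(new Split(file, 0L, Files.size(file)));
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        // Build a small directory tree with one 7-byte file per "object".
        Path root = Files.createTempDirectory("kryo-objects");
        Path nested = Files.createDirectories(root.resolve("nested"));
        Files.write(root.resolve("a.ser"), new byte[7]);
        Files.write(nested.resolve("b.ser"), new byte[7]);

        for (Split split : createSplits(root)) {
            System.out.println(split.file + " start=" + split.start + " length=" + split.length);
        }
    }
}
```

Because every split starts at offset 0 and spans a whole file, a reader can deserialize each object from the beginning of its file, regardless of the configured parallelism.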