I have a directory containing a list of files, each one containing a Kryo-serialized object. With JSON-serialized objects I don't have this problem, but there I use env.readTextFile(path).withParameters(parameters), where parameters has ENUMERATE_NESTED_FILES_FLAG set to true.
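A minimal sketch of the "single split per file" idea suggested in the quoted thread below, written in plain Java so that it runs without Flink on the classpath. WholeFileSplit and OneSplitPerFile are hypothetical names, not Flink classes; in a real job the same enumeration would presumably live in an overridden createInputSplits of a FileInputFormat subclass and produce Flink's own split type instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical stand-in for a file input split: one whole file, never subdivided. */
final class WholeFileSplit {
    final int splitNumber;
    final Path file;
    final long start;
    final long length;

    WholeFileSplit(int splitNumber, Path file, long start, long length) {
        this.splitNumber = splitNumber;
        this.file = file;
        this.start = start;
        this.length = length;
    }
}

/** Hypothetical helper: enumerates a directory recursively, one split per file. */
final class OneSplitPerFile {
    static List<WholeFileSplit> createSplits(Path root) throws IOException {
        // Collect every regular file below root, including nested directories.
        List<Path> files = new ArrayList<>();
        try (Stream<Path> walk = Files.walk(root)) {
            walk.filter(Files::isRegularFile).forEach(files::add);
        }
        // Sort so that split numbers are stable across runs.
        Collections.sort(files);

        // Each split covers its file from offset 0 to the end, so a tiny
        // file is never cut into several (mostly empty) splits.
        List<WholeFileSplit> splits = new ArrayList<>();
        for (Path f : files) {
            splits.add(new WholeFileSplit(splits.size(), f, 0L, Files.size(f)));
        }
        return splits;
    }
}
```

With one object per file, each split then maps to exactly one deserialization call, regardless of how small the file is.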
On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> I don't know your use case.
> The InputFormat interface is very flexible. Directories can be recursively
> read. A file can contain one or more objects. You can also make a smarter
> IF and put multiple (small) files into one split...
>
> It is up to your use case what you need to implement.
>
>
> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Should this be the case just reading recursively an entire directory
>> containing one object per file?
>>
>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> You could implement your own InputFormat based on FileInputFormat and
>>> override the createInputSplits method to just create a single split per
>>> file.
>>>
>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> So what should I do?
>>>>
>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Ah, I checked the code.
>>>>>
>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>> BinaryOutputFormat.
>>>>> So you cannot use the BinaryInputFormat to read a file which does not
>>>>> provide the metadata.
>>>>>
>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>
>>>>>> The file containing the serialized object is 7 bytes
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>> BinaryInputFormat.
>>>>>>> How large is the file with the single object?
>>>>>>>
>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>
>>>>>>>> I also tried with
>>>>>>>>
>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>     env.createInput(inputFormat).setParallelism(1);
>>>>>>>>
>>>>>>>> but I get the same error :(
>>>>>>>>
>>>>>>>> Moreover, in this example I put exactly one object per file so it
>>>>>>>> should be able to deserialize it, right?
>>>>>>>>
>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> If you create your file by just sequentially writing all objects
>>>>>>>>> to the file using Kryo, you can only read it with a parallelism of 1.
>>>>>>>>> Writing binary files in a way that they can be read in parallel is
>>>>>>>>> a bit tricky (and not specific to Flink).
>>>>>>>>>
>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>
>>>>>>>>>> Hi to all,
>>>>>>>>>> I'm trying to read a file serialized with Kryo but I get this
>>>>>>>>>> exception (due to the fact that createInputSplits creates 8
>>>>>>>>>> input splits, where just one is not empty).
>>>>>>>>>>
>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>
>>>>>>>>>> -----------------------------------------------
>>>>>>>>>> My program is basically the following:
>>>>>>>>>>
>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>
>>>>>>>>>>     ...
>>>>>>>>>>     //try-with-resources used to autoclose resources
>>>>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>         //serialise object
>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     //deserialise object
>>>>>>>>>>
>>>>>>>>>>     myObj = null;
>>>>>>>>>>
>>>>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     final ExecutionEnvironment env =
>>>>>>>>>>         ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>>>>>>         MyClassSerializer.class);
>>>>>>>>>>
>>>>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>>>>>>         64*1024*1024);
>>>>>>>>>>
>>>>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>     final BinaryInputFormat<MyClass> inputFormat =
>>>>>>>>>>         new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>     inputFormat.configure(configuration);
>>>>>>>>>>
>>>>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>     ds.print();
>>>>>>>>>>
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
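
On the point made in the thread that a file written by sequentially appending objects can only be read sequentially: one common way to at least make record boundaries explicit is to prefix each record with its length. The sketch below is plain Java, assumes the serialized objects are already available as byte arrays (for example as produced by Kryo), and uses a hypothetical class name, LengthPrefixedRecords. It does not make the file splittable by itself, since a reader still has to start at offset 0.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: frames each record as a 4-byte length followed by the payload. */
final class LengthPrefixedRecords {

    /** Writes all records back to back, each preceded by its length. */
    static byte[] writeAll(List<byte[]> records) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (byte[] record : records) {
                out.writeInt(record.length); // 4-byte big-endian length prefix
                out.write(record);           // payload, e.g. Kryo output
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the records back in order; must start at the first byte. */
    static List<byte[]> readAll(byte[] data) throws IOException {
        List<byte[]> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                int length = in.readInt();
                byte[] record = new byte[length];
                in.readFully(record); // fails with EOFException on a truncated record
                records.add(record);
            }
        }
        return records;
    }
}
```

A reader dropped at an arbitrary offset in such a file cannot tell a length prefix from payload bytes, which is why parallel reads need extra structure such as fixed-size blocks or per-block metadata.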