Should this be the case even when just recursively reading an entire directory containing one object per file?

On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> You could implement your own InputFormat based on FileInputFormat and
> override the createInputSplits method to just create a single split per
> file.
>
> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> So what should I do?
>>
>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Ah, I checked the code.
>>>
>>> The BinaryInputFormat expects metadata which is written by the
>>> BinaryOutputFormat.
>>> So you cannot use the BinaryInputFormat to read a file which does not
>>> provide the metadata.
>>>
>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> The file containing the serialized object is 7 bytes
>>>>
>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> This might be an issue with the blockSize parameter of the
>>>>> BinaryInputFormat.
>>>>> How large is the file with the single object?
>>>>>
>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>
>>>>>> I also tried with
>>>>>>
>>>>>> DataSet<RowBundle> ds =
>>>>>>     env.createInput(inputFormat).setParallelism(1);
>>>>>>
>>>>>> but I get the same error :(
>>>>>>
>>>>>> Moreover, in this example I put exactly one object per file so it
>>>>>> should be able to deserialize it, right?
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> If you create your file by just sequentially writing all objects to
>>>>>>> the file using Kryo, you can only read it with a parallelism of 1.
>>>>>>> Writing binary files in a way that they can be read in parallel is a
>>>>>>> bit tricky (and not specific to Flink).
>>>>>>>
>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>> I'm trying to read a file serialized with Kryo but I get this
>>>>>>>> exception (due to the fact that createInputSplits creates 8
>>>>>>>> input splits, of which just one is not empty).
>>>>>>>>
>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> -----------------------------------------------
>>>>>>>> My program is basically the following:
>>>>>>>>
>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>
>>>>>>>>     ...
>>>>>>>>     //try-with-resources used to autoclose resources
>>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>         //serialise object
>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     //deserialise object
>>>>>>>>
>>>>>>>>     myObj = null;
>>>>>>>>
>>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>>>>
>>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>     inputFormat.configure(configuration);
>>>>>>>>
>>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>     ds.print();
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Am I doing something wrong?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
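
For reference, the "single split per file" idea suggested in the thread can be sketched without any Flink dependency. The snippet below is only a plain-Java model of the logic, not Flink code: WholeFileSplit and OneSplitPerFile are hypothetical names made up for illustration. In an actual job this logic would presumably live in an override of createInputSplits in a FileInputFormat subclass, as suggested above. It walks a directory recursively and emits one split per file, covering the file from offset 0 to its full length.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-in for an input split: a byte range of one file. */
final class WholeFileSplit {
    final Path file;
    final long start;
    final long length;

    WholeFileSplit(Path file, long start, long length) {
        this.file = file;
        this.start = start;
        this.length = length;
    }
}

/** Hypothetical helper modelling "one split per file" over a directory tree. */
final class OneSplitPerFile {

    /** Walks root recursively and returns exactly one split per regular file. */
    static List<WholeFileSplit> createSplits(Path root) throws IOException {
        List<Path> files;
        try (Stream<Path> paths = Files.walk(root)) {
            // Keep regular files only; sort so the split order is deterministic.
            files = paths.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<WholeFileSplit> splits = new ArrayList<>();
        for (Path file : files) {
            // Each split spans the whole file, so a reader never has to seek
            // into the middle of a serialized object.
            splits.add(new WholeFileSplit(file, 0L, Files.size(file)));
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java OneSplitPerFile <directory>
        for (WholeFileSplit s : createSplits(Paths.get(args[0]))) {
            System.out.println(s.file + " [" + s.start + ", " + s.length + ")");
        }
    }
}
```

With splits shaped like this, several readers can work in parallel on different files while each individual file is still read sequentially from its first byte, which is what a file written by plain sequential Kryo serialization requires.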