You could implement your own InputFormat based on FileInputFormat and override the createInputSplits method so that it creates just a single split per file.
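
The following is a plain-JDK model of the idea, without any Flink classes. In a real job this logic would go into a FileInputFormat<T> subclass that overrides createInputSplits(int minNumSplits) and implements reachedEnd() and nextRecord(T reuse). Here FileSplit, createInputSplits and readSplit are hypothetical stand-ins, and records written with DataOutputStream.writeUTF stand in for the Kryo-serialized objects. Because the only split starts at byte 0 and covers the whole file, the reader can decode the sequential stream exactly as a parallelism-1 read would.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-JDK model of "one split per file" (no Flink classes involved).
 * FileSplit, createInputSplits and readSplit are hypothetical stand-ins for
 * Flink's FileInputSplit, FileInputFormat.createInputSplits and the record reader.
 */
final class WholeFileSplits {

    /** A byte range of one file, analogous to an input split. */
    static final class FileSplit {
        final Path path;
        final long start;
        final long length;

        FileSplit(Path path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    /** Ignores minNumSplits: every file becomes exactly one split that starts at byte 0. */
    static List<FileSplit> createInputSplits(List<Path> files, int minNumSplits) throws IOException {
        List<FileSplit> splits = new ArrayList<>();
        for (Path file : files) {
            splits.add(new FileSplit(file, 0, Files.size(file)));
        }
        return splits;
    }

    /**
     * Decodes records written back-to-back (2-byte length + modified UTF-8, as produced by
     * DataOutputStream.writeUTF). This only works because the split starts at the first record.
     */
    static List<String> readSplit(FileSplit split) throws IOException {
        byte[] file = Files.readAllBytes(split.path);
        byte[] range = Arrays.copyOfRange(file, (int) split.start, (int) (split.start + split.length));
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(range));
        List<String> records = new ArrayList<>();
        while (in.available() > 0) {
            records.add(in.readUTF());
        }
        return records;
    }
}
```

Forcing one split per file gives up parallelism within a single file, but separate files can still be read in parallel.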

2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> So what should I do?
>
> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Ah, I checked the code.
>>
>> The BinaryInputFormat expects metadata which is written by the BinaryOutputFormat.
>> So you cannot use the BinaryInputFormat to read a file which does not provide the metadata.
>>
>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> The file containing the serialized object is 7 bytes.
>>>
>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> This might be an issue with the blockSize parameter of the BinaryInputFormat.
>>>> How large is the file with the single object?
>>>>
>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> I also tried with
>>>>>
>>>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>>>
>>>>> but I get the same error :(
>>>>>
>>>>> Moreover, in this example I put exactly one object per file, so it should be able to deserialize it, right?
>>>>>
>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> If you create your file by just sequentially writing all objects to the file using Kryo, you can only read it with a parallelism of 1.
>>>>>> Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink).
>>>>>>
>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> Hi to all,
>>>>>>> I'm trying to read a file serialized with Kryo but I get this exception (due to the fact that createInputSplits creates 8 input splits, of which only one is not empty).
>>>>>>>
>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> -----------------------------------------------
>>>>>>> My program is basically the following:
>>>>>>>
>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>
>>>>>>>     ...
>>>>>>>     // try-with-resources used to autoclose resources
>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>         // serialise object
>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>     }
>>>>>>>
>>>>>>>     // deserialise object
>>>>>>>
>>>>>>>     myObj = null;
>>>>>>>
>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>>>
>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>     inputFormat.configure(configuration);
>>>>>>>
>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>     ds.print();
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Am I doing something wrong?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
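
On the point in the thread that the BinaryInputFormat needs metadata written by the BinaryOutputFormat: here is a plain-JDK sketch of why per-block metadata is what makes parallel reading of a binary file possible. The layout is a simplified, hypothetical one and is NOT Flink's actual on-disk format; BlockedRecordFile, write and readBlock are made-up names. The file is cut into fixed-size blocks, records never cross a block boundary (the writer pads instead), and the last 4 bytes of each block hold its record count, so a reader assigned to any block can decode it without looking at the rest of the file. A file written directly by Kryo has no such metadata, which is consistent with the stack trace above failing on a seek inside BinaryInputFormat.open.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified block layout (NOT Flink's actual format):
 * fixed-size blocks, no record spans a block boundary (the writer pads instead),
 * and the last 4 bytes of every block store how many records it contains.
 */
final class BlockedRecordFile {
    static final int TRAILER = 4; // bytes reserved at the end of each block for the record count

    /** Writes length-prefixed UTF-8 records into blocks of blockSize bytes. */
    static byte[] write(List<String> records, int blockSize) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        ByteBuffer block = ByteBuffer.allocate(blockSize);
        int count = 0;
        for (String r : records) {
            byte[] payload = r.getBytes(StandardCharsets.UTF_8);
            int needed = 4 + payload.length; // int length prefix + payload
            if (needed > blockSize - TRAILER) {
                throw new IllegalArgumentException("record larger than a block");
            }
            if (block.position() + needed > blockSize - TRAILER) {
                flush(file, block, count); // close the current block and start a new one
                count = 0;
            }
            block.putInt(payload.length).put(payload);
            count++;
        }
        if (count > 0) {
            flush(file, block, count);
        }
        return file.toByteArray();
    }

    private static void flush(ByteArrayOutputStream file, ByteBuffer block, int count) {
        block.putInt(block.capacity() - TRAILER, count); // per-block metadata in the trailer
        file.write(block.array(), 0, block.capacity());   // unused bytes stay zero (padding)
        Arrays.fill(block.array(), (byte) 0);
        block.clear();
    }

    /** Decodes only block `index`, as an independent parallel reader would. */
    static List<String> readBlock(byte[] file, int blockSize, int index) {
        ByteBuffer block = ByteBuffer.wrap(file, index * blockSize, blockSize).slice();
        int count = block.getInt(blockSize - TRAILER); // read the trailer first
        List<String> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] payload = new byte[block.getInt()];
            block.get(payload);
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

Each readBlock call touches only its own block, so different blocks can be handed to different readers; the price is the padding and trailer overhead in every block.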