This might be an issue with the blockSize parameter of the BinaryInputFormat. How large is the file with the single object?
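
To make the block size question concrete: a block-oriented binary reader generally expects the file to be laid out in fixed-size blocks, each carrying enough metadata to be decoded on its own, and a file produced by plain sequential Kryo writes has no such structure. Below is a minimal sketch of that idea in plain Java. The layout (a 64-byte block ending in a 4-byte record count) and the names `BlockFileSketch`, `write` and `readBlock` are hypothetical and chosen only for illustration; this is not Flink's actual on-disk format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical layout, NOT Flink's actual format: every block is exactly
// BLOCK_SIZE bytes and ends with a 4-byte footer holding the number of
// records stored in that block.
class BlockFileSketch {
    static final int BLOCK_SIZE = 64;  // tiny on purpose; the thread uses 64 MB
    static final int FOOTER_SIZE = 4;

    // Packs length-prefixed records into fixed-size blocks. A record never
    // crosses a block boundary, so each block can be decoded independently.
    static byte[] write(List<String> records) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        ByteBuffer block = ByteBuffer.allocate(BLOCK_SIZE);
        int count = 0;
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            int needed = 4 + bytes.length;
            if (needed > BLOCK_SIZE - FOOTER_SIZE) {
                throw new IllegalArgumentException("record larger than a block");
            }
            if (block.position() + needed > BLOCK_SIZE - FOOTER_SIZE) {
                flush(file, block, count);  // current block is full
                count = 0;
            }
            block.putInt(bytes.length).put(bytes);
            count++;
        }
        if (count > 0) {
            flush(file, block, count);
        }
        return file.toByteArray();
    }

    // Writes the footer at the end of the block, appends the whole block to
    // the file, and resets the buffer for the next block.
    private static void flush(ByteArrayOutputStream file, ByteBuffer block, int count) {
        block.putInt(BLOCK_SIZE - FOOTER_SIZE, count);
        file.write(block.array(), 0, BLOCK_SIZE);
        Arrays.fill(block.array(), (byte) 0);
        block.clear();
    }

    // Decodes one block without touching any other part of the file, which
    // is what allows several readers to work on the same file in parallel.
    static List<String> readBlock(byte[] file, int blockIndex) {
        ByteBuffer block = ByteBuffer.wrap(file, blockIndex * BLOCK_SIZE, BLOCK_SIZE).slice();
        int count = block.getInt(BLOCK_SIZE - FOOTER_SIZE);
        List<String> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[block.getInt()];
            block.get(bytes);
            records.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] file = write(Arrays.asList("first object", "second object",
                "third object", "fourth object", "fifth object"));
        for (int b = 0; b < file.length / BLOCK_SIZE; b++) {
            System.out.println("block " + b + ": " + readBlock(file, b));
        }
    }
}
```

In this sketch a reader can start at any multiple of the block size, whereas a file that was not written in this layout (for example one whose length is not a multiple of the block size) cannot be read this way at all.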

2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> I also tried with
>
> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>
> but I get the same error :(
>
> Moreover, in this example I put exactly one object per file so it should
> be able to deserialize it, right?
>
> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> If you create your file by just sequentially writing all objects to the
>> file using Kryo, you can only read it with a parallelism of 1.
>> Writing binary files in a way that they can be read in parallel is a bit
>> tricky (and not specific to Flink).
>>
>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi to all,
>>> I'm trying to read a file serialized with kryo but I get this exception
>>> (due to the fact that the createInputSplits creates 8 inputsplits, where
>>> just one is not empty..).
>>>
>>> Caused by: java.io.IOException: Invalid argument
>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> -----------------------------------------------
>>> My program is basically the following:
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>>     ...
>>>     //try-with-resources used to autoclose resources
>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>         //serialise object
>>>         Kryo kryo=new Kryo();
>>>         kryo.writeClassAndObject(output, myObj);
>>>     } catch (FileNotFoundException ex) {
>>>         LOG.error(ex.getMessage(), ex);
>>>     }
>>>
>>>     //deserialise object
>>>
>>>     myObj=null;
>>>
>>>     try (Input input = new Input( new FileInputStream("/tmp/KryoTest.ser"))){
>>>         Kryo kryo=new Kryo();
>>>         myObj =(MyClass)kryo.readClassAndObject(input);
>>>     } catch (FileNotFoundException ex) {
>>>         LOG.error(ex.getMessage(), ex);
>>>     }
>>>
>>>
>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>     Configuration configuration = new Configuration();
>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>
>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>     inputFormat.configure(configuration);
>>>
>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>     ds.print();
>>>
>>> }
>>>
>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>
>>>     @Override
>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>         kryo.writeClassAndObject(output, object);
>>>     }
>>>
>>>     @Override
>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>         return (MyClass) kryo.readClassAndObject(input);
>>>     }
>>> }
>>>
>>> Am I doing something wrong?
>>>
>>> Best,
>>> Flavio