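If you create your file by just sequentially writing all objects to it with Kryo, you can only read it with a parallelism of 1. Flink's BinaryInputFormat (which TypeSerializerInputFormat extends) expects the block structure written by its counterpart BinaryOutputFormat (e.g. TypeSerializerOutputFormat). That format cuts the file into fixed-size blocks, each ending with a small BlockInfo trailer, so every input split can seek to its own block and find the records in it without reading the rest of the file. A plain Kryo stream has no such trailers, which is most likely why the seek in BinaryInputFormat.open() fails. The block size configured for reading should also match the one used for writing. Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink).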
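To illustrate why a block layout enables parallel reads, here is a minimal plain-Java sketch under simplifying assumptions. The layout is hypothetical and is NOT Flink's on-disk format: a tiny 64-byte block size, length-prefixed UTF-8 records that never span a block boundary (real formats, Flink's included, instead record where the first record of a block starts), and a 4-byte record-count trailer at the end of each block. The class and method names (BlockFileSketch, write, readBlock, readAllParallel) are made up for illustration. Because each block describes itself, any block can be decoded on its own; a parallel stream stands in for parallel readers.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical block layout for illustration only; this is NOT Flink's on-disk format.
public class BlockFileSketch {
    // Tiny block size so the demo produces several blocks (real formats use MBs).
    public static final int BLOCK_SIZE = 64;
    // Each block ends with a 4-byte trailer holding the number of records in it.
    public static final int TRAILER_SIZE = 4;
    private static final int CAPACITY = BLOCK_SIZE - TRAILER_SIZE;

    // Writes length-prefixed UTF-8 records; a record that does not fit in the
    // current block starts a new one, so no record spans a block boundary.
    public static byte[] write(List<String> records) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        ByteBuffer block = ByteBuffer.allocate(BLOCK_SIZE);
        int count = 0;
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            int needed = Integer.BYTES + bytes.length;
            if (needed > CAPACITY) {
                throw new IllegalArgumentException("record does not fit in one block");
            }
            if (block.position() + needed > CAPACITY) {
                flushBlock(file, block, count);
                count = 0;
            }
            block.putInt(bytes.length).put(bytes);
            count++;
        }
        if (count > 0) {
            flushBlock(file, block, count);
        }
        return file.toByteArray();
    }

    // Stores the record count in the trailer, emits the zero-padded block, resets the buffer.
    private static void flushBlock(ByteArrayOutputStream file, ByteBuffer block, int count) {
        block.putInt(CAPACITY, count); // absolute put: position is unchanged
        file.write(block.array(), 0, BLOCK_SIZE);
        Arrays.fill(block.array(), (byte) 0);
        block.clear();
    }

    // Decodes one block without touching any other block; a reader of a real
    // file would seek to blockIndex * BLOCK_SIZE instead of slicing an array.
    public static List<String> readBlock(byte[] file, int blockIndex) {
        ByteBuffer block = ByteBuffer.wrap(file, blockIndex * BLOCK_SIZE, BLOCK_SIZE).slice();
        int count = block.getInt(CAPACITY);
        List<String> records = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[block.getInt()];
            block.get(bytes);
            records.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return records;
    }

    // Reads all blocks independently in parallel; the ordered stream keeps record order.
    public static List<String> readAllParallel(byte[] file) {
        return IntStream.range(0, file.length / BLOCK_SIZE)
                .parallel()
                .mapToObj(i -> readBlock(file, i))
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            records.add("record-" + i);
        }
        byte[] file = write(records);
        System.out.println("blocks written: " + file.length / BLOCK_SIZE);
        System.out.println("round trip ok: " + readAllParallel(file).equals(records));
    }
}
```

The padding left at the end of each block is the price paid for making every block independently decodable. A file written as one continuous Kryo stream has no such boundaries, so a reader starting in the middle cannot tell where the next record begins.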
2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Hi to all,
> I'm trying to read a file serialized with Kryo, but I get this exception
> (due to the fact that createInputSplits creates 8 input splits, of which
> only one is not empty...).
>
> Caused by: java.io.IOException: Invalid argument
>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> -----------------------------------------------
> My program is basically the following:
>
> public static void main(String[] args) throws Exception {
>
>     ...
>
>     // try-with-resources used to autoclose resources
>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>         // serialise object
>         Kryo kryo = new Kryo();
>         kryo.writeClassAndObject(output, myObj);
>     } catch (FileNotFoundException ex) {
>         LOG.error(ex.getMessage(), ex);
>     }
>
>     // deserialise object
>     myObj = null;
>
>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>         Kryo kryo = new Kryo();
>         myObj = (MyClass) kryo.readClassAndObject(input);
>     } catch (FileNotFoundException ex) {
>         LOG.error(ex.getMessage(), ex);
>     }
>
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>     Configuration configuration = new Configuration();
>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);
>
>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>     inputFormat.configure(configuration);
>
>     DataSet<MyClass> ds = env.createInput(inputFormat);
>     ds.print();
> }
>
> private static final class MyClassSerializer extends Serializer<MyClass> {
>
>     @Override
>     public void write(Kryo kryo, Output output, MyClass object) {
>         kryo.writeClassAndObject(output, object);
>     }
>
>     @Override
>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>         return (MyClass) kryo.readClassAndObject(input);
>     }
> }
>
> Am I doing something wrong?
>
> Best,
> Flavio