If you create the file by simply writing all objects to it sequentially with
Kryo, you can only read it with a parallelism of 1: a reader that starts in
the middle of the file has no way to tell where the next object begins.
Writing binary files so that they can be read in parallel is a bit tricky
(and not specific to Flink).
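For illustration, one generic way to make a binary file splittable is to write the records into fixed-size blocks whose start offsets are known in advance, so that each parallel reader can decode its own blocks without looking at the others. Below is a minimal, self-contained Java sketch of that idea. The class BlockFile, its methods and its block layout are hypothetical choices made only for this example; this is not the layout that Flink's BinaryOutputFormat/BinaryInputFormat actually use.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical block-aligned record format (NOT Flink's on-disk layout).
 * Every block is exactly blockSize bytes:
 *   [int recordCount][int len][len bytes][int len][len bytes]...[zero padding]
 * Records never cross a block boundary, so block i can be decoded on its own,
 * starting at offset i * blockSize, e.g. by one parallel reader per block.
 */
final class BlockFile {
    private static final int INT_BYTES = Integer.BYTES;

    /** Packs length-prefixed records into fixed-size, independently readable blocks. */
    static byte[] write(List<byte[]> records, int blockSize) {
        List<ByteBuffer> blocks = new ArrayList<>();
        ByteBuffer current = newBlock(blockSize);
        int count = 0;
        for (byte[] record : records) {
            int needed = INT_BYTES + record.length; // length prefix + payload
            if (INT_BYTES + needed > blockSize) {
                throw new IllegalArgumentException("record does not fit into one block");
            }
            if (current.remaining() < needed) {
                // The record would straddle the boundary: seal this block, start a new one.
                current.putInt(0, count);
                blocks.add(current);
                current = newBlock(blockSize);
                count = 0;
            }
            current.putInt(record.length).put(record);
            count++;
        }
        current.putInt(0, count);
        blocks.add(current);

        ByteBuffer file = ByteBuffer.allocate(blocks.size() * blockSize);
        for (ByteBuffer block : blocks) {
            file.put(block.array()); // the unused tail of each block stays zero
        }
        return file.array();
    }

    /** Decodes only block number blockIndex, without reading any other block. */
    static List<byte[]> readBlock(byte[] file, int blockIndex, int blockSize) {
        ByteBuffer in = ByteBuffer.wrap(file, blockIndex * blockSize, blockSize);
        int count = in.getInt();
        List<byte[]> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] record = new byte[in.getInt()];
            in.get(record);
            out.add(record);
        }
        return out;
    }

    /** Number of blocks, i.e. the maximum useful read parallelism. */
    static int blockCount(byte[] file, int blockSize) {
        return file.length / blockSize;
    }

    private static ByteBuffer newBlock(int blockSize) {
        ByteBuffer block = ByteBuffer.allocate(blockSize);
        block.position(INT_BYTES); // reserve room for the record count
        return block;
    }
}
```

With 32-byte blocks and ten 8-byte records, each block holds two records, so the file consists of five blocks and block 3 can be decoded on its own to get records 6 and 7. The zero padding at the end of each block is wasted space; formats that let records span block boundaries avoid that waste, but then have to store where the first complete record in each block starts.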

2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Hi to all,
> I'm trying to read a file serialized with Kryo, but I get this exception
> (because createInputSplits() creates 8 input splits, of which only one is
> non-empty):
>
> Caused by: java.io.IOException: Invalid argument
> at sun.nio.ch.FileChannelImpl.position0(Native Method)
> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
> -----------------------------------------------
> My program is basically the following:
>
> public static void main(String[] args) throws Exception {
>
>     ...
>     // try-with-resources used to autoclose resources
>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>         // serialise object
>         Kryo kryo = new Kryo();
>         kryo.writeClassAndObject(output, myObj);
>     } catch (FileNotFoundException ex) {
>         LOG.error(ex.getMessage(), ex);
>     }
>
>     // deserialise object
>     myObj = null;
>
>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>         Kryo kryo = new Kryo();
>         myObj = (MyClass) kryo.readClassAndObject(input);
>     } catch (FileNotFoundException ex) {
>         LOG.error(ex.getMessage(), ex);
>     }
>
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>     Configuration configuration = new Configuration();
>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);
>
>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>     inputFormat.configure(configuration);
>
>     DataSet<MyClass> ds = env.createInput(inputFormat);
>     ds.print();
> }
>
> private static final class MyClassSerializer extends Serializer<MyClass> {
>
>     @Override
>     public void write(Kryo kryo, Output output, MyClass object) {
>         kryo.writeClassAndObject(output, object);
>     }
>
>     @Override
>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>         return (MyClass) kryo.readClassAndObject(input);
>     }
> }
>
> Am I doing something wrong?
>
> Best,
> Flavio