The file containing the serialized object is 7 bytes.

On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> This might be an issue with the blockSize parameter of the
> BinaryInputFormat.
> How large is the file with the single object?
>
> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> I also tried with
>>
>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>
>> but I get the same error :(
>>
>> Moreover, in this example I put exactly one object per file so it should
>> be able to deserialize it, right?
>>
>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> If you create your file by just sequentially writing all objects to the
>>> file using Kryo, you can only read it with a parallelism of 1.
>>> Writing binary files in a way that they can be read in parallel is a bit
>>> tricky (and not specific to Flink).
>>>
>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> Hi to all,
>>>> I'm trying to read a file serialized with kryo but I get this exception
>>>> (due to the fact that the createInputSplits creates 8 inputsplits, where
>>>> just one is not empty..).
>>>>
>>>> Caused by: java.io.IOException: Invalid argument
>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> -----------------------------------------------
>>>> My program is basically the following:
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>>
>>>>     ...
>>>>     //try-with-resources used to autoclose resources
>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>         //serialise object
>>>>         Kryo kryo=new Kryo();
>>>>         kryo.writeClassAndObject(output, myObj);
>>>>     } catch (FileNotFoundException ex) {
>>>>         LOG.error(ex.getMessage(), ex);
>>>>     }
>>>>
>>>>     //deserialise object
>>>>
>>>>     myObj=null;
>>>>
>>>>     try (Input input = new Input( new FileInputStream("/tmp/KryoTest.ser"))){
>>>>         Kryo kryo=new Kryo();
>>>>         myObj =(MyClass)kryo.readClassAndObject(input);
>>>>     } catch (FileNotFoundException ex) {
>>>>         LOG.error(ex.getMessage(), ex);
>>>>     }
>>>>
>>>>
>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>     Configuration configuration = new Configuration();
>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>
>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>     inputFormat.configure(configuration);
>>>>
>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>     ds.print();
>>>>
>>>> }
>>>>
>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>
>>>>     @Override
>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>         kryo.writeClassAndObject(output, object);
>>>>     }
>>>>
>>>>     @Override
>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>     }
>>>> }
>>>>
>>>> Am I doing something wrong?
>>>>
>>>> Best,
>>>> Flavio
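
To make the remark about parallel-readable binary files a bit more concrete, here is a minimal sketch in plain Java (no Flink, no Kryo). It assumes a made-up layout: fixed-size blocks, length-prefixed string records that never span two blocks, and a 4-byte record count stored in the last bytes of each block. The class name BlockFileSketch, the trailer layout and the record encoding are all hypothetical and chosen only for illustration; this is not Flink's actual on-disk format, just the general idea behind a block size setting.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BlockFileSketch {

    // Hypothetical trailer: one int holding the number of records in the block.
    static final int TRAILER_SIZE = Integer.BYTES;

    /** Packs length-prefixed records into fixed-size blocks; no record spans two blocks. */
    public static byte[] writeBlocks(List<String> records, int blockSize) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        ByteBuffer block = ByteBuffer.allocate(blockSize);
        int count = 0;
        for (String record : records) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            int needed = Integer.BYTES + payload.length;
            if (needed > blockSize - TRAILER_SIZE) {
                throw new IllegalArgumentException("Record larger than a block: " + record);
            }
            // Not enough room left before the trailer: close this block, start a new one.
            if (block.position() + needed > blockSize - TRAILER_SIZE) {
                finishBlock(file, block, count);
                block = ByteBuffer.allocate(blockSize);
                count = 0;
            }
            block.putInt(payload.length).put(payload);
            count++;
        }
        if (count > 0) {
            finishBlock(file, block, count);
        }
        return file.toByteArray();
    }

    /** Stores the record count in the last bytes of the block; unused space stays zero. */
    private static void finishBlock(ByteArrayOutputStream file, ByteBuffer block, int count) {
        block.putInt(block.capacity() - TRAILER_SIZE, count);
        file.write(block.array(), 0, block.capacity());
    }

    /** Reads a single block without looking at any other part of the file. */
    public static List<String> readBlock(byte[] file, int blockIndex, int blockSize) {
        ByteBuffer block = ByteBuffer.wrap(file, blockIndex * blockSize, blockSize).slice();
        int count = block.getInt(blockSize - TRAILER_SIZE);
        List<String> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] payload = new byte[block.getInt()];
            block.get(payload);
            records.add(new String(payload, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] file = writeBlocks(Arrays.asList("a", "bb", "ccc", "dddd"), 16);
        // Each block can be handed to a different reader.
        for (int i = 0; i < file.length / 16; i++) {
            System.out.println("block " + i + ": " + readBlock(file, i, 16));
        }
    }
}
```

Because every block carries its own record count, a reader that is handed block 1 needs nothing from block 0. A file produced by one sequential Kryo stream has no such boundaries, which is consistent with the advice that it can only be read with a parallelism of 1.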