The file containing the serialized object is 7 bytes long.

On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> This might be an issue with the blockSize parameter of the
> BinaryInputFormat.
> How large is the file with the single object?
>
> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> I also tried with
>>
>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>
>> but I get the same error :(
>>
>> Moreover, in this example I put exactly one object per file so it should
>> be able to deserialize it, right?
>>
>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> If you create your file by just sequentially writing all objects to the
>>> file using Kryo, you can only read it with a parallelism of 1.
>>> Writing binary files in a way that they can be read in parallel is a bit
>>> tricky (and not specific to Flink).
>>>
>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> Hi to all,
>>>> I'm trying to read a file serialized with Kryo, but I get this exception
>>>> (because createInputSplits creates 8 input splits, of which only one is
>>>> non-empty).
>>>>
>>>> Caused by: java.io.IOException: Invalid argument
>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> -----------------------------------------------
>>>> My program is basically the following:
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>>
>>>>     ...
>>>>     // try-with-resources used to auto-close resources
>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>         // serialise object
>>>>         Kryo kryo = new Kryo();
>>>>         kryo.writeClassAndObject(output, myObj);
>>>>     } catch (FileNotFoundException ex) {
>>>>         LOG.error(ex.getMessage(), ex);
>>>>     }
>>>>
>>>>     // deserialise object
>>>>     myObj = null;
>>>>
>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>         Kryo kryo = new Kryo();
>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>     } catch (FileNotFoundException ex) {
>>>>         LOG.error(ex.getMessage(), ex);
>>>>     }
>>>>
>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>     Configuration configuration = new Configuration();
>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);
>>>>
>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>     inputFormat.configure(configuration);
>>>>
>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>     ds.print();
>>>>
>>>> }
>>>>
>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>
>>>>     @Override
>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>         kryo.writeClassAndObject(output, object);
>>>>     }
>>>>
>>>>     @Override
>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>     }
>>>> }
>>>>
>>>> Am I doing something wrong?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>>
>>>>
>>>
>>
>
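
To illustrate the point made earlier in the thread, that a file written by sequentially appending objects can only be read with a parallelism of 1 while a file meant for parallel reading needs extra structure, here is a minimal sketch using only the JDK. It is not Flink's on-disk format and does not use Kryo; the class name BlockFileSketch, the methods write and readBlock, the 4-byte record-count trailer, and the 16-byte block size are all assumptions made up for this example. The idea is that records never span a block boundary and every block carries its own record count, so a reader can start at any block without having read the preceding ones.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: NOT the layout used by Flink's BinaryInputFormat.
class BlockFileSketch {

    // Each block ends with a 4-byte record count (an assumption of this sketch).
    static final int TRAILER = Integer.BYTES;

    /** Writes records into fixed-size blocks; a record never spans two blocks. */
    static void write(Path file, List<String> records, int blockSize) throws IOException {
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file))) {
            ByteArrayOutputStream block = new ByteArrayOutputStream();
            int count = 0;
            for (String record : records) {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                new DataOutputStream(buf).writeUTF(record); // 2-byte length prefix + payload
                if (buf.size() > blockSize - TRAILER) {
                    throw new IOException("record larger than a block");
                }
                // Start a new block if this record would not fit in the current one.
                if (block.size() + buf.size() > blockSize - TRAILER) {
                    flush(out, block, count, blockSize);
                    block.reset();
                    count = 0;
                }
                buf.writeTo(block);
                count++;
            }
            if (count > 0) {
                flush(out, block, count, blockSize);
            }
        }
    }

    /** Pads the block with zeros up to the trailer, then appends the record count. */
    private static void flush(DataOutputStream out, ByteArrayOutputStream block,
                              int count, int blockSize) throws IOException {
        block.writeTo(out);
        out.write(new byte[blockSize - TRAILER - block.size()]);
        out.writeInt(count);
    }

    /** Reads a single block without touching any other block in the file. */
    static List<String> readBlock(Path file, int blockIndex, int blockSize) throws IOException {
        try (RandomAccessFile in = new RandomAccessFile(file.toFile(), "r")) {
            long start = (long) blockIndex * blockSize;
            in.seek(start + blockSize - TRAILER); // jump to the trailer of this block
            int count = in.readInt();
            in.seek(start);                       // then back to the first record
            List<String> records = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                records.add(in.readUTF());
            }
            return records;
        }
    }

    public static void main(String[] args) throws IOException {
        int blockSize = 16;
        Path file = Files.createTempFile("blocks", ".bin");
        write(file, Arrays.asList("a", "bb", "ccc", "dddd"), blockSize);
        long blocks = Files.size(file) / blockSize;
        // Each block could be handed to a different reader; here they are read in a loop.
        for (int i = 0; i < blocks; i++) {
            System.out.println("block " + i + ": " + readBlock(file, i, blockSize));
        }
    }
}
```

A file produced by a plain sequence of kryo.writeClassAndObject calls has no such block boundaries or per-block metadata, so a reader that is pointed at an arbitrary offset has no way of knowing where the next record starts. That is consistent with the advice above to read such a file with a single reader from the beginning.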
