I don't know your use case, so I can only answer in general terms.
The InputFormat interface is very flexible: directories can be read
recursively, and a file can contain one or more objects. You can also write
a smarter InputFormat that puts multiple (small) files into one split.

What you need to implement depends on your use case.
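
To make the "one split per file" idea a bit more concrete, here is a rough
plain-Java sketch of the logic such a createInputSplits method would follow.
It deliberately does not use any Flink classes: the FileSplit class and the
createInputSplits helper below are hypothetical stand-ins I made up for
illustration, so treat it as an outline of the approach rather than as
working Flink code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OneSplitPerFile {

    // Hypothetical stand-in for a file input split: one whole file, never cut.
    static final class FileSplit {
        final int splitNumber;
        final Path path;
        final long start;
        final long length;

        FileSplit(int splitNumber, Path path, long start, long length) {
            this.splitNumber = splitNumber;
            this.path = path;
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return "[" + splitNumber + "] " + path + " (" + start + "+" + length + ")";
        }
    }

    // Walks the directory recursively and emits exactly one split per regular
    // file, covering the file from offset 0 to its end. A tiny file therefore
    // never ends up spread over several (mostly empty) splits.
    static List<FileSplit> createInputSplits(Path root) throws IOException {
        List<Path> files;
        try (Stream<Path> paths = Files.walk(root)) {
            files = paths.filter(Files::isRegularFile)
                         .sorted()
                         .collect(Collectors.toList());
        }
        List<FileSplit> splits = new ArrayList<>();
        for (Path file : files) {
            splits.add(new FileSplit(splits.size(), file, 0L, Files.size(file)));
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        // Build a small nested directory with one (dummy) object per file.
        Path root = Files.createTempDirectory("kryo-objects");
        Files.write(root.resolve("a.ser"), new byte[7]);
        Path nested = Files.createDirectories(root.resolve("nested"));
        Files.write(nested.resolve("b.ser"), new byte[3]);

        for (FileSplit split : createInputSplits(root)) {
            System.out.println(split);
        }
    }
}
```

In a real InputFormat, each parallel reader would then open the file of the
split it is assigned and deserialize the object(s) it contains from the start
of that file.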


2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Would this also be the way to go when just reading an entire directory
> recursively, with one object per file?
>
> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> You could implement your own InputFormat based on FileInputFormat and
>> override the createInputSplits method to create just a single split per
>> file.
>>
>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> So what should I do?
>>>
>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Ah, I checked the code.
>>>>
>>>> The BinaryInputFormat expects metadata which is written by the
>>>> BinaryOutputFormat.
>>>> So you cannot use the BinaryInputFormat to read a file which does not
>>>> provide that metadata.
>>>>
>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> The file containing the serialized object is 7 bytes
>>>>>
>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> This might be an issue with the blockSize parameter of the
>>>>>> BinaryInputFormat.
>>>>>> How large is the file with the single object?
>>>>>>
>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> I also tried with
>>>>>>>
>>>>>>> DataSet<RowBundle> ds =
>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>
>>>>>>> but I get the same error :(
>>>>>>>
>>>>>>> Moreover, in this example I put exactly one object per file so it
>>>>>>> should be able to deserialize it, right?
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If you create your file by just sequentially writing all objects to
>>>>>>>> the file using Kryo, you can only read it with a parallelism of 1.
>>>>>>>> Writing binary files in a way that they can be read in parallel is
>>>>>>>> a bit tricky (and not specific to Flink).
>>>>>>>>
>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>> I'm trying to read a file serialized with Kryo, but I get this
>>>>>>>>> exception (because createInputSplits creates 8 input splits, of
>>>>>>>>> which only one is non-empty).
>>>>>>>>>
>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>> -----------------------------------------------
>>>>>>>>> My program is basically the following:
>>>>>>>>>
>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>
>>>>>>>>>     ...
>>>>>>>>>     // try-with-resources used to autoclose resources
>>>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>         // serialise object
>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     // deserialise object
>>>>>>>>>     myObj = null;
>>>>>>>>>
>>>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>>>>>
>>>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>     inputFormat.configure(configuration);
>>>>>>>>>
>>>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>     ds.print();
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Am I doing something wrong?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
