I don't know your use case. The InputFormat interface is very flexible: directories can be read recursively, and a file can contain one or more objects. You can also write a smarter InputFormat that puts multiple (small) files into one split...
It is up to your use case what you need to implement.

2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Should this be the case just reading recursively an entire directory
> containing one object per file?
>
> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> You could implement your own InputFormat based on FileInputFormat and
>> override the createInputSplits method to just create a single split per
>> file.
>>
>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> So what should I do?
>>>
>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Ah, I checked the code.
>>>>
>>>> The BinaryInputFormat expects metadata which is written by the
>>>> BinaryOutputFormat.
>>>> So you cannot use the BinaryInputFormat to read a file which does not
>>>> provide the metadata.
>>>>
>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> The file containing the serialized object is 7 bytes
>>>>>
>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> This might be an issue with the blockSize parameter of the
>>>>>> BinaryInputFormat.
>>>>>> How large is the file with the single object?
>>>>>>
>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> I also tried with
>>>>>>>
>>>>>>> DataSet<RowBundle> ds =
>>>>>>>     env.createInput(inputFormat).setParallelism(1);
>>>>>>>
>>>>>>> but I get the same error :(
>>>>>>>
>>>>>>> Moreover, in this example I put exactly one object per file so it
>>>>>>> should be able to deserialize it, right?
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If you create your file by just sequentially writing all objects to
>>>>>>>> the file using Kryo, you can only read it with a parallelism of 1.
>>>>>>>> Writing binary files in a way that they can be read in parallel is
>>>>>>>> a bit tricky (and not specific to Flink).
>>>>>>>>
>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>> I'm trying to read a file serialized with kryo but I get this
>>>>>>>>> exception (due to the fact that the createInputSplits creates 8
>>>>>>>>> inputsplits, where just one is not empty..).
>>>>>>>>>
>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>   at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>   at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>   at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>   at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>   at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>> -----------------------------------------------
>>>>>>>>> My program is basically the following:
>>>>>>>>>
>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>
>>>>>>>>>   ...
>>>>>>>>>   //try-with-resources used to autoclose resources
>>>>>>>>>   try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>     //serialise object
>>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>>     kryo.writeClassAndObject(output, myObj);
>>>>>>>>>   } catch (FileNotFoundException ex) {
>>>>>>>>>     LOG.error(ex.getMessage(), ex);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   //deserialise object
>>>>>>>>>
>>>>>>>>>   myObj=null;
>>>>>>>>>
>>>>>>>>>   try (Input input = new Input( new FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>>>>>   } catch (FileNotFoundException ex) {
>>>>>>>>>     LOG.error(ex.getMessage(), ex);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>   env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>>>   Configuration configuration = new Configuration();
>>>>>>>>>   configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>>>>>>>>
>>>>>>>>>   TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>>>   final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>   inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>   inputFormat.configure(configuration);
>>>>>>>>>
>>>>>>>>>   DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>   ds.print();
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>>>
>>>>>>>>>   @Override
>>>>>>>>>   public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>     kryo.writeClassAndObject(output, object);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @Override
>>>>>>>>>   public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>>>     return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Am I doing something wrong?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
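
To make the "single split per file" suggestion from the thread a bit more concrete, here is a rough plain-Java sketch of the splitting logic only. It deliberately does not use Flink's classes: OneSplitPerFile, WholeFileSplit and createSplits are hypothetical names made up for this illustration, and a real implementation would presumably put similar logic into the createInputSplits method of a FileInputFormat subclass and return Flink's own split type. The point is just that each file found while walking the directory recursively becomes exactly one split, so a 7-byte file is never cut into several mostly empty pieces.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Plain-Java model of "one split per file". Hypothetical names, NOT Flink's API. */
final class OneSplitPerFile {

    /** Hypothetical stand-in for an input split: a whole file, never a byte range inside it. */
    static final class WholeFileSplit {
        final int splitNumber;
        final Path file;
        final long length;

        WholeFileSplit(int splitNumber, Path file, long length) {
            this.splitNumber = splitNumber;
            this.file = file;
            this.length = length;
        }

        @Override
        public String toString() {
            return "split " + splitNumber + ": " + file + " (" + length + " bytes)";
        }
    }

    /** Walks root recursively and creates exactly one split per regular file. */
    static List<WholeFileSplit> createSplits(Path root) throws IOException {
        List<Path> files;
        // Files.walk descends into nested directories; sorting keeps split numbers stable.
        try (Stream<Path> walk = Files.walk(root)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<WholeFileSplit> splits = new ArrayList<>();
        for (Path file : files) {
            // The split covers the whole file, regardless of any block size setting.
            splits.add(new WholeFileSplit(splits.size(), file, Files.size(file)));
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        // Demo data: two small files, one of them in a nested directory.
        Path root = Files.createTempDirectory("splits-demo");
        Files.createDirectories(root.resolve("nested"));
        Files.write(root.resolve("a.ser"), new byte[7]);
        Files.write(root.resolve("nested").resolve("b.ser"), new byte[3]);

        for (WholeFileSplit split : createSplits(root)) {
            System.out.println(split);
        }
    }
}
```

With one object per file, each reader would then open its file and deserialize a single object from the start, which avoids seeking to offsets that only make sense for files written with block metadata.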