You need to do something like this:

public class YourInputFormat extends FileInputFormat<Object> {

   private boolean objectRead;

   @Override
   public FileInputSplit[] createInputSplits(int minNumSplits) {
      // Create one FileInputSplit for each file you want to read.
      // Check FileInputFormat for how to recursively enumerate files.
      // Input splits must start at 0 and have a length equal to the length
      // of the file to read.
      return null;
   }

   @Override
   public void open(FileInputSplit split) throws IOException {
      super.open(split);
      objectRead = false;
   }

   @Override
   public boolean reachedEnd() throws IOException {
      return this.objectRead;
   }

   @Override
   public Object nextRecord(Object reuse) throws IOException {
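      // Use Kryo to read the object from this.stream (needs imports for
      // com.esotericsoftware.kryo.Kryo and com.esotericsoftware.kryo.io.Input).
      Object yourObject = new Kryo().readClassAndObject(new Input(this.stream));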
      this.objectRead = true; // read only one object
      return yourObject;
   }
}
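
The "one split per file" idea behind createInputSplits can be sketched in
plain Java, without any Flink classes. This is only an illustration under
stated assumptions: Split is a hypothetical stand-in for FileInputSplit, and
OneSplitPerFile and createSplits are made-up names, not Flink API. It walks a
directory recursively and emits one split per file, starting at offset 0 and
covering the whole file:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OneSplitPerFile {

    /** Hypothetical stand-in for FileInputSplit: a file, a start offset, a length. */
    public static final class Split {
        public final Path file;
        public final long start;
        public final long length;

        Split(Path file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    /** Walks root recursively and returns one split per regular file. */
    public static List<Split> createSplits(Path root) throws IOException {
        List<Path> files;
        try (Stream<Path> paths = Files.walk(root)) {
            // Keep regular files only; directories are just traversed.
            files = paths.filter(Files::isRegularFile)
                         .sorted()
                         .collect(Collectors.toList());
        }
        List<Split> splits = new ArrayList<>();
        for (Path file : files) {
            // Each split starts at 0 and spans the whole file, so a file is
            // never cut in the middle of a serialized object.
            splits.add(new Split(file, 0L, Files.size(file)));
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        for (Split s : createSplits(root)) {
            System.out.println(s.file + " [" + s.start + ", " + s.length + ")");
        }
    }
}
```

In a real InputFormat the same loop would build FileInputSplit instances
instead of Split objects; the constructor arguments for those are not shown
here and should be checked against the Flink version in use.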

2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Sorry Fabian but I don't understand what I should do :(
> Could you provide me a simple snippet of code to achieve this?
>
> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Enumeration of nested files is a feature of the FileInputFormat.
>> If you implement your own IF based on FileInputFormat as I suggested
>> before, you can use that feature.
>>
>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> I have a directory containing a list of files, each one containing a
>>> kryo-serialized object.
>>> With json serialized objects I don't have that problem (but there I use
>>>  env.readTextFile(path).withParameters(parameters)
>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>>
>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> I don't know your use case.
>>>> The InputFormat interface is very flexible. Directories can be
>>>> recursively read. A file can contain one or more objects. You can also make
>>>> a smarter IF and put multiple (small) files into one split...
>>>>
>>>> It is up to your use case what you need to implement.
>>>>
>>>>
>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Is this necessary even for just recursively reading an entire
>>>>> directory containing one object per file?
>>>>>
>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> You could implement your own InputFormat based on FileInputFormat and
>>>>>> override the createInputSplits method to just create a single split per
>>>>>> file.
>>>>>>
>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> So what should I do?
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ah, I checked the code.
>>>>>>>>
>>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>>> BinaryOutputFormat.
>>>>>>>> So you cannot use the BinaryInputFormat to read a file which does
>>>>>>>> not provide the metadata.
>>>>>>>>
>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>
>>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>>
>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>>> BinaryInputFormat.
>>>>>>>>>> How large is the file with the single object?
>>>>>>>>>>
>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> I also tried with
>>>>>>>>>>>
>>>>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>>
>>>>>>>>>>> but I get the same error :(
>>>>>>>>>>>
>>>>>>>>>>> Moreover, in this example I put exactly one object per file so
>>>>>>>>>>> it should be able to deserialize it, right?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> If you create your file by just sequentially writing all
>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a
>>>>>>>>>>>> parallelism of 1.
>>>>>>>>>>>> Writing binary files in a way that they can be read in parallel
>>>>>>>>>>>> is a bit tricky (and not specific to Flink).
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>> I'm trying to read a file serialized with Kryo, but I get this
>>>>>>>>>>>>> exception (because createInputSplits creates 8 input splits,
>>>>>>>>>>>>> of which only one is non-empty):
>>>>>>>>>>>>>
>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>
>>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>>
>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>
>>>>>>>>>>>>> ...
>>>>>>>>>>>>> //try-with-resources used to autoclose resources
>>>>>>>>>>>>> try (Output output = new Output(new
>>>>>>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>> //serialise object
>>>>>>>>>>>>> Kryo kryo=new Kryo();
>>>>>>>>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> //deserialise object
>>>>>>>>>>>>>
>>>>>>>>>>>>> myObj=null;
>>>>>>>>>>>>>
>>>>>>>>>>>>> try (Input input = new Input( new
>>>>>>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> final ExecutionEnvironment env =
>>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>>>>>>>>> MyClassSerializer.class);
>>>>>>>>>>>>> Configuration configuration = new Configuration();
>>>>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>>>>>>>>> 64*1024*1024);
>>>>>>>>>>>>>
>>>>>>>>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>>>>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>>>>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>> inputFormat.configure(configuration);
>>>>>>>>>>>>>
>>>>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>> ds.print();
>>>>>>>>>>>>>
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> private static final class MyClassSerializer extends
>>>>>>>>>>>>> Serializer<MyClass> {
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Override
>>>>>>>>>>>>> public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>>>> kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Override
>>>>>>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass>
>>>>>>>>>>>>> type) {
>>>>>>>>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>> }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Flavio
