An InputFormat processes multiple InputSplits. open() is called for each
InputSplit.
If you don't reset reached to false in open() you will only read a single
(i.e., the first) InputSplit and skip all others.

I'd override open as follows:

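@Override
public void open(FileInputSplit fileSplit) throws IOException {
  // Let FileInputFormat open the stream for this split first.
  super.open(fileSplit);
  // Reset the end-of-split flag so the next split is actually read.
  reached = false;
}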
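
To see why the reset matters, here is a rough, Flink-free sketch of the per-split read loop. It is only a simulation under my own assumptions: OneRecordPerSplitReader, countRecords and the file names are hypothetical stand-ins, not Flink classes, and the loop is a simplified picture of how a single format instance is reused across splits.

```java
import java.util.Arrays;
import java.util.List;

public class SplitLoopDemo {

    /** Hypothetical stand-in for an input format; this is not Flink's API. */
    static class OneRecordPerSplitReader {
        private final boolean resetInOpen;
        private boolean reached = false;
        private String currentSplit;

        OneRecordPerSplitReader(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        /** Called once per split, on the same reader instance. */
        void open(String split) {
            currentSplit = split;
            if (resetInOpen) {
                reached = false; // the fix: clear the flag for every new split
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        /** Emits the whole split as one record, then marks the split as done. */
        String nextRecord() {
            reached = true;
            return "content of " + currentSplit;
        }
    }

    /** Simplified read loop: one reader instance is reused for all splits. */
    static int countRecords(boolean resetInOpen, List<String> splits) {
        OneRecordPerSplitReader reader = new OneRecordPerSplitReader(resetInOpen);
        int records = 0;
        for (String split : splits) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                reader.nextRecord();
                records++;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        System.out.println("without reset: " + countRecords(false, splits)); // prints 1
        System.out.println("with reset:    " + countRecords(true, splits));  // prints 3
    }
}
```

Without the reset, the flag stays true after the first split, so every later split looks finished before it is read. If each parallel instance behaves like this, that might also explain seeing exactly one file per task slot, though I have not verified that against your setup.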

Cheers, Fabian

2017-08-01 8:08 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:

> I didn't override open. I am using open that got inherited from
> FileInputFormat . Am I supposed to specifically override open?
>
> On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Do you set reached to false in open()?
>>
>>
>> On 01.08.2017 at 2:44 AM, "Mohit Anchlia" <mohitanch...@gmail.com> wrote:
>>
>> And here is the inputformat code:
>>
>> public class PDFFileInputFormat extends FileInputFormat<String> {
>>  /**
>>   *
>>   */
>>  private static final long serialVersionUID = -4137283038479003711L;
>>  private static final Logger logger = LoggerFactory
>>    .getLogger(PDFFileInputFormat.class.getName());
>>  private boolean reached = false;
>>  @Override
>>  public boolean reachedEnd() throws IOException {
>>   logger.info("called reached " + reached);
>>   // TODO Auto-generated method stub
>>   return reached;
>>  }
>>  @Override
>>  public String nextRecord(String reuse) throws IOException {
>>   logger.info("This is where you parse PDF");
>>   String content = new String(
>>     Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
>>   logger.info("Content " + content);
>>   reached = true;
>>   return content;
>>  }
>> }
>>
>> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanch...@gmail.com>
>> wrote:
>>
>>> I have a very simple program that just reads all the files in the path.
>>> However, Flink is not working as expected.
>>>
>>> Every time I execute this job I only see Flink reading 2 files, even
>>> though there are more in that directory. On closer look it appears that it
>>> might be related to:
>>>
>>> [flink-akka.actor.default-dispatcher-3] INFO
>>> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
>>> task slot(s).
>>>
>>> My question is, isn't Flink supposed to iterate over the directory after
>>> those 2 slots become free again? I am assuming this problem is caused
>>> because there are only 2 slots.
>>>
>>>
>>> Code ---
>>>
>>>   PDFFileInputFormat format = new PDFFileInputFormat();
>>>   format.setFilePath(args[0]);
>>>   format.setNestedFileEnumeration(true);
>>>   logger.info("Number of splits " + format.getNumSplits());
>>>
>>>   // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>>>
>>>   env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>
>>
>>
>>
>
