Thanks, that worked. However, what I don't understand is: wouldn't the open()
call that I am inheriting already have this logic built in? I am inheriting
from FileInputFormat.

On Tue, Aug 1, 2017 at 1:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> An InputFormat processes multiple InputSplits. open() is called for each
> InputSplit.
> If you don't reset reached to false in open() you will only read a single
> (i.e., the first) InputSplit and skip all others.
>
> I'd override open as follows:
>
> @Override
> public void open(FileInputSplit fileSplit) throws IOException {
>   super.open(fileSplit);
>   reached = false;
> }
>
> Cheers, Fabian
>
>
> 2017-08-01 8:08 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>
>> I didn't override open(). I am using the open() that is inherited from
>> FileInputFormat. Am I supposed to override open() explicitly?
>>
>> On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Do you set reached to false in open()?
>>>
>>>
>>> On 01.08.2017 at 2:44 AM, "Mohit Anchlia" <mohitanch...@gmail.com>
>>> wrote:
>>>
>>> And here is the inputformat code:
>>>
>>> public class PDFFileInputFormat extends FileInputFormat<String> {
>>>  /**
>>>   *
>>>   */
>>>  private static final long serialVersionUID = -4137283038479003711L;
>>>  private static final Logger logger = LoggerFactory
>>>    .getLogger(PDFFileInputFormat.class.getName());
>>>  private boolean reached = false;
>>>  @Override
>>>  public boolean reachedEnd() throws IOException {
>>>   logger.info("called reached " + reached);
>>>   // TODO Auto-generated method stub
>>>   return reached;
>>>  }
>>>  @Override
>>>  public String nextRecord(String reuse) throws IOException {
>>>   logger.info("This is where you parse PDF");
>>>   String content = new String(
>>>     Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
>>>   logger.info("Content " + content);
>>>   reached = true;
>>>   return content;
>>>  }
>>> }
>>>
>>> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanch...@gmail.com>
>>> wrote:
>>>
>>>> I have a very simple program that just reads all the files in a path.
>>>> However, Flink is not working as expected.
>>>>
>>>> Every time I execute this job I only see Flink reading 2 files, even
>>>> though there are more in that directory. On closer look, it appears that
>>>> this might be related to:
>>>>
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
>>>> task slot(s).
>>>>
>>>> My question is: isn't Flink supposed to iterate over the directory
>>>> after those 2 slots become free again? I am assuming this problem is caused
>>>> by there being only 2 slots.
>>>>
>>>>
>>>> Code ---
>>>>
>>>>   PDFFileInputFormat format = new PDFFileInputFormat();
>>>>   format.setFilePath(args[0]);
>>>>   format.setNestedFileEnumeration(true);
>>>>   logger.info("Number of splits " + format.getNumSplits());
>>>>
>>>>   // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>>>>
>>>>   env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>>
>>>
>>>
>>>
>>
>
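
On the question at the top of the thread: the inherited open() cannot reset a
flag that the subclass itself declares, so the reset has to live in the
subclass. The sketch below illustrates the per-split lifecycle Fabian
describes. It is a simplified stand-in and does not use Flink's actual
classes: SplitLifecycleSketch, OneRecordPerSplitReader, readAll and the
resetInOpen switch are hypothetical names introduced only for illustration.
It assumes one reader instance is handed several splits in sequence, which
would be consistent with only two files being read when two parallel
instances each stop after their first split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the per-split lifecycle; NOT Flink's actual classes.
class SplitLifecycleSketch {

    /** Hypothetical reader that emits one record per split, mirroring the 'reached' flag. */
    static class OneRecordPerSplitReader {
        private final boolean resetInOpen;
        private boolean reached = false;
        private String currentSplit;

        OneRecordPerSplitReader(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        // Called once for every split assigned to this reader instance.
        void open(String split) {
            currentSplit = split;
            if (resetInOpen) {
                reached = false; // the fix: every split starts with a fresh flag
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        String nextRecord() {
            reached = true; // exactly one record per split
            return "content of " + currentSplit;
        }
    }

    /** Drives a single reader instance over all splits, as one parallel task would. */
    static List<String> readAll(List<String> splits, boolean resetInOpen) {
        OneRecordPerSplitReader reader = new OneRecordPerSplitReader(resetInOpen);
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                records.add(reader.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        // Without the reset, the flag stays true after the first split.
        System.out.println("without reset: " + readAll(splits, false));
        // With the reset, every split yields its record.
        System.out.println("with reset:    " + readAll(splits, true));
    }
}
```

Without the reset, the loop body never runs for the second and third split,
because reachedEnd() still returns true from the previous split.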
