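An InputFormat processes multiple InputSplits, and open() is called once for each InputSplit. If you don't reset reached to false in open(), reachedEnd() keeps returning true after the first record. Each parallel instance of the InputFormat will then read only the first InputSplit assigned to it and skip all others.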
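The per-split lifecycle described above can be modeled without Flink. The sketch below is a simplified, hypothetical stand-in: OneRecordPerSplitFormat, BuggyFormat, FixedFormat and readAll are illustrative names, not Flink API. It assumes the runtime calls open(split), then alternates reachedEnd()/nextRecord() until reachedEnd() returns true, then close(), for every split a reader is assigned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free model of the per-split InputFormat lifecycle.
public class SplitLifecycleDemo {

    // Stand-in for a FileInputFormat<String> that emits one record per split (not Flink's API).
    static abstract class OneRecordPerSplitFormat {
        protected String currentSplit;
        protected boolean reached = false;

        void open(String split) { currentSplit = split; }
        boolean reachedEnd() { return reached; }
        String nextRecord() {
            reached = true;                   // the whole file is a single record
            return "content of " + currentSplit;
        }
        void close() { }
    }

    // Mirrors the original code: open() is inherited, so 'reached' is never reset.
    static class BuggyFormat extends OneRecordPerSplitFormat { }

    // Mirrors the suggested fix: reset the flag whenever a new split is opened.
    static class FixedFormat extends OneRecordPerSplitFormat {
        @Override
        void open(String split) {
            super.open(split);
            reached = false;
        }
    }

    // Drives one reader over every split assigned to it, split by split.
    static List<String> readAll(OneRecordPerSplitFormat format, List<String> splits) {
        List<String> out = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                out.add(format.nextRecord());
            }
            format.close();
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        System.out.println(readAll(new BuggyFormat(), splits).size()); // 1
        System.out.println(readAll(new FixedFormat(), splits).size()); // 3
    }
}
```

With the inherited open(), the second and later splits are opened but reachedEnd() is already true, so no records are produced for them.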
I'd override open() as follows:

  @Override
  public void open(FileInputSplit fileSplit) throws IOException {
    super.open(fileSplit); // let FileInputFormat open the stream for this split
    reached = false;       // start every new split with a fresh flag
  }

Cheers, Fabian

2017-08-01 8:08 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:

> I didn't override open. I am using the open that got inherited from
> FileInputFormat. Am I supposed to specifically override open?
>
> On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Do you set reached to false in open()?
>>
>> On 01.08.2017 at 2:44 AM, "Mohit Anchlia" <mohitanch...@gmail.com> wrote:
>>
>> And here is the InputFormat code:
>>
>> public class PDFFileInputFormat extends FileInputFormat<String> {
>>
>>   private static final long serialVersionUID = -4137283038479003711L;
>>   private static final Logger logger = LoggerFactory
>>       .getLogger(PDFInputFormat.class.getName());
>>   private boolean reached = false;
>>
>>   @Override
>>   public boolean reachedEnd() throws IOException {
>>     logger.info("called reached " + reached);
>>     return reached;
>>   }
>>
>>   @Override
>>   public String nextRecord(String reuse) throws IOException {
>>     logger.info("This is where you parse PDF");
>>     String content = new String(
>>         Files.readAllBytes(Paths.get(this.currentSplit.getPath()
>>             .getPath())));
>>     logger.info("Content " + content);
>>     reached = true;
>>     return content;
>>   }
>> }
>>
>> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanch...@gmail.com>
>> wrote:
>>
>>> I have a very simple program that just reads all the files in the path.
>>> However, Flink is not working as expected.
>>>
>>> Every time I execute this job I only see Flink reading 2 files, even
>>> though there are more in that directory. On closer look it appears that it
>>> might be related to:
>>>
>>> [flink-akka.actor.default-dispatcher-3] INFO
>>> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
>>> task slot(s).
>>>
>>> My question is, isn't Flink supposed to iterate over the directory after
>>> those 2 slots become free again? I am assuming this problem is caused
>>> because there are only 2 slots.
>>>
>>> Code ---
>>>
>>> PDFFileInputFormat format = new PDFFileInputFormat();
>>> format.setFilePath(args[0]);
>>> format.setNestedFileEnumeration(true);
>>> logger.info("Number of splits " + format.getNumSplits());
>>>
>>> // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>>>
>>> env.createInput(format, TypeInformation.of(StringValue.class)).print();
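The original question asks whether Flink keeps iterating over the directory once the two slots are free again. The sketch below assumes that the slot count bounds the parallelism rather than the number of files, and that each parallel source instance keeps requesting new splits until none remain. It is a simplified, hypothetical model: SlotSplitDemo, Reader and run are illustrative names, and round-robin turn-taking stands in for the real, timing-dependent split assignment. Under these assumptions, a reached flag that is never reset yields exactly one file per slot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model: one reader per task slot, all pulling splits from a shared queue.
public class SlotSplitDemo {

    // One parallel reader instance; 'resetOnOpen' toggles the suggested open() override.
    static class Reader {
        private final boolean resetOnOpen;
        private boolean reached = false;
        final List<String> emitted = new ArrayList<>();

        Reader(boolean resetOnOpen) { this.resetOnOpen = resetOnOpen; }

        // Processes one split the way the runtime drives an InputFormat.
        void process(String split) {
            if (resetOnOpen) {
                reached = false;          // what the overridden open() does
            }
            while (!reached) {
                emitted.add(split);       // nextRecord(): the whole file as one record
                reached = true;
            }
        }
    }

    // Returns the total number of files emitted across all readers.
    static int run(int slots, List<String> files, boolean resetOnOpen) {
        Deque<String> pending = new ArrayDeque<>(files);
        List<Reader> readers = new ArrayList<>();
        for (int i = 0; i < slots; i++) {
            readers.add(new Reader(resetOnOpen));
        }
        // Readers take turns requesting the next split until none are left.
        while (!pending.isEmpty()) {
            for (Reader r : readers) {
                String split = pending.poll();
                if (split == null) {
                    break;
                }
                r.process(split);
            }
        }
        int total = 0;
        for (Reader r : readers) {
            total += r.emitted.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("1.pdf", "2.pdf", "3.pdf", "4.pdf", "5.pdf");
        System.out.println(run(2, files, false)); // 2: one file per slot
        System.out.println(run(2, files, true));  // 5: every file is read
    }
}
```

Every split is still handed to some reader in both runs. Only the never-reset flag suppresses the records, which matches the "2 files with 2 slots" symptom.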