Thanks, that worked. However, what I don't understand is this: wouldn't the open() call that I am inheriting already have this logic built in? I am extending FileInputFormat.
On Tue, Aug 1, 2017 at 1:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> An InputFormat processes multiple InputSplits. open() is called for each
> InputSplit.
> If you don't reset reached to false in open(), you will only read a single
> (i.e., the first) InputSplit and skip all others.
>
> I'd override open() as follows:
>
> @Override
> public void open(FileInputSplit fileSplit) throws IOException {
>   super.open(fileSplit);
>   reached = false;
> }
>
> Cheers, Fabian
>
>
> 2017-08-01 8:08 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>
>> I didn't override open(). I am using the open() that is inherited from
>> FileInputFormat. Am I supposed to override open() explicitly?
>>
>> On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Do you set reached to false in open()?
>>>
>>>
>>> On 01.08.2017 at 2:44 AM, "Mohit Anchlia" <mohitanch...@gmail.com> wrote:
>>>
>>> And here is the InputFormat code:
>>>
>>> public class PDFFileInputFormat extends FileInputFormat<String> {
>>>   /**
>>>    *
>>>    */
>>>   private static final long serialVersionUID = -4137283038479003711L;
>>>   private static final Logger logger = LoggerFactory
>>>       .getLogger(PDFInputFormat.class.getName());
>>>   private boolean reached = false;
>>>
>>>   @Override
>>>   public boolean reachedEnd() throws IOException {
>>>     logger.info("called reached " + reached);
>>>     // TODO Auto-generated method stub
>>>     return reached;
>>>   }
>>>
>>>   @Override
>>>   public String nextRecord(String reuse) throws IOException {
>>>     logger.info("This is where you parse PDF");
>>>     String content = new String(
>>>         Files.readAllBytes(Paths.get(this.currentSplit.getPath()
>>>             .getPath())));
>>>     logger.info("Content " + content);
>>>     reached = true;
>>>     return content;
>>>   }
>>> }
>>>
>>> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanch...@gmail.com>
>>> wrote:
>>>
>>>> I have a very simple program that just reads all the files in the path.
>>>> However, Flink is not working as expected.
>>>>
>>>> Every time I execute this job I only see Flink reading 2 files, even
>>>> though there are more in that directory. On closer look it appears that
>>>> it might be related to:
>>>>
>>>> [flink-akka.actor.default-dispatcher-3] INFO
>>>> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
>>>> task slot(s).
>>>>
>>>> My question is, isn't Flink supposed to iterate over the directory
>>>> after those 2 slots become free again? I am assuming this problem is
>>>> caused because there are only 2 slots.
>>>>
>>>>
>>>> Code ---
>>>>
>>>> PDFFileInputFormat format = new PDFFileInputFormat();
>>>> format.setFilePath(args[0]);
>>>> format.setNestedFileEnumeration(true);
>>>> logger.info("Number of splits " + format.getNumSplits());
>>>>
>>>> // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>>>>
>>>> env.createInput(format, TypeInformation.of(StringValue.class)).print();
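
For anyone reading this thread later, here is a minimal sketch of the per-split lifecycle discussed above, written without any Flink dependency so it runs on its own. The classes OneRecordPerSplitFormat and SplitLifecycleDemo and the resetInOpen switch are hypothetical stand-ins for illustration, not Flink APIs, and the loop in readAll only approximates how a runtime might drive open(), reachedEnd(), and nextRecord(). It shows why an inherited open() cannot help here: reached is declared in the subclass, so only code in the subclass can reset it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an input format; this is NOT Flink's FileInputFormat.
class OneRecordPerSplitFormat {
    private final boolean resetInOpen;
    private String currentSplit;
    private boolean reached = false;

    OneRecordPerSplitFormat(boolean resetInOpen) {
        this.resetInOpen = resetInOpen;
    }

    // Called once for every split, like open() in the thread.
    void open(String split) {
        currentSplit = split;   // bookkeeping a base class could do on its own
        if (resetInOpen) {
            reached = false;    // subclass state: nobody else knows about it
        }
    }

    boolean reachedEnd() {
        return reached;
    }

    // Emits exactly one record per split, then marks the split as finished.
    String nextRecord() {
        reached = true;
        return "content of " + currentSplit;
    }
}

class SplitLifecycleDemo {
    // Assumed driver loop: open each split, then read until reachedEnd() is true.
    static List<String> readAll(boolean resetInOpen, String... splits) {
        OneRecordPerSplitFormat format = new OneRecordPerSplitFormat(resetInOpen);
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // Without the reset, only the first split yields a record.
        System.out.println(readAll(false, "a.pdf", "b.pdf", "c.pdf"));
        // With the reset, every split yields a record.
        System.out.println(readAll(true, "a.pdf", "b.pdf", "c.pdf"));
    }
}
```

Under these assumptions, a single format instance without the reset reads one split and skips the rest. With two parallel instances, that would be consistent with seeing only 2 files read, one per task slot.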