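Hi,

Flink calls reachedEnd() before each call to nextRecord() and closes the InputFormat as soon as reachedEnd() returns true. Your implementation always returns true, so nextRecord() is never called and nothing is emitted. reachedEnd() should return false until nextRecord() has been called and the split's single record (which is both its first and its last) has been emitted.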
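To make the reachedEnd() / nextRecord() contract concrete, here is a minimal sketch in plain Java without any Flink dependencies. WholeFileReader and drain are hypothetical names used only for illustration, and drain is a simplified stand-in for the loop the runtime runs over each split, not Flink's actual code. The idea is a per-split flag that open() resets and nextRecord() sets.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the custom format. It mirrors only the
// open / reachedEnd / nextRecord / close methods and uses no Flink types.
class WholeFileReader {
    private Path current;      // file backing the current split
    private boolean consumed;  // true once the split's single record was returned

    void open(Path file) {
        this.current = file;
        this.consumed = false; // a freshly opened split still has its record
    }

    boolean reachedEnd() {
        // Must be false on the first call, otherwise nextRecord() is never invoked.
        return consumed;
    }

    String nextRecord() throws IOException {
        consumed = true;       // exactly one record per split
        return new String(Files.readAllBytes(current), StandardCharsets.UTF_8);
    }

    void close() {
        current = null;
    }

    // Simplified consuming loop (an assumption for illustration):
    // check reachedEnd() first, then fetch a record, until the split is exhausted.
    static List<String> drain(WholeFileReader reader, List<Path> splits) throws IOException {
        List<String> out = new ArrayList<>();
        for (Path split : splits) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                out.add(reader.nextRecord());
            }
            reader.close();
        }
        return out;
    }
}
```

With reachedEnd() hard-coded to true, the while loop in drain never runs and the result is empty, which matches the missing output from print(). In the real format the same flag would be reset in open(InputSplit) and set in nextRecord(StringValue).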
You might also want to build your PDFFileInputFormat on FileInputFormat and set unsplittable to true. FileInputFormat comes with lots of built-in functionality, such as InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:

> Hi,
>
> I created a custom input format. The idea behind this is to read all binary
> files from a directory and use each file as its own split. Each split is
> read as one whole record. When I run it in Flink I don't get any error, but
> I am not seeing any output from .print. Am I missing something?
>
> ----
>
> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>
>     private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>
>     PDFFileInputSplit current = null;
>
>     public static void main(String... args) throws Exception {
>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>         InputSplit[] splits = pdfReader.createInputSplits(1);
>         pdfReader.open(splits[0]);
>         pdfReader.nextRecord(null);
>
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.fromElements(1, 2, 3)
>             // returns the squared i
>             .print();
>
>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>         InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,
>             TypeInformation.of(StringValue.class));
>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>     }
>
>     String path = null;
>
>     public PDFFileInputFormat(String path) {
>         this.path = path;
>     }
>
>     public void configure(Configuration parameters) {
>         // TODO Auto-generated method stub
>     }
>
>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>         // TODO Auto-generated method stub
>         return cachedStatistics;
>     }
>
>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>         Files.list(Paths.get(path)).forEach(f -> {
>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>             splits.add(split);
>         });
>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>         return splits.toArray(inputSplitArray);
>     }
>
>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>         logger.info("Assigner");
>         // TODO Auto-generated method stub
>         return new DefaultInputSplitAssigner(inputSplits);
>     }
>
>     public void open(InputSplit split) throws IOException {
>         this.current = (PDFFileInputSplit) split;
>     }
>
>     public boolean reachedEnd() throws IOException {
>         // TODO Auto-generated method stub
>         return true;
>     }
>
>     public StringValue nextRecord(StringValue reuse) throws IOException {
>         String content = new String(Files.readAllBytes(this.current.getFile()));
>         logger.info("Content " + content);
>         return new StringValue(content);
>     }
>
>     public void close() throws IOException {
>         // TODO Auto-generated method stub
>     }
> }
>
> ---
>
> Thanks,
> Mohit