Thanks. A few more questions:

- Is there an example for FileInputFormat?
- How do I make it read all the files in a directory?
- How do I make an input format a streaming input instead of batch? E.g., read new files as they arrive in a directory.
Thanks again.

On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Flink calls the reachedEnd() method before it calls nextRecord() and
> closes the IF when reachedEnd() returns true.
> So, it should not return true until nextRecord() was called and the first
> and last record was emitted.
>
> You might also want to build your PDFFileInputFormat on FileInputFormat
> and set unsplittable to true.
> FileInputFormat comes with lots of built-in functionality such as
> InputSplit generation.
>
> Cheers, Fabian
>
> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>
>> Hi,
>>
>> I created a custom input format. The idea behind this is to read all binary
>> files from a directory and use each file as its own split. Each split is
>> read as one whole record. When I run it in Flink I don't get any error, but
>> I am not seeing any output from .print. Am I missing something?
>>
>> ----
>>
>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>
>>     private static final Logger logger =
>>         LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>
>>     PDFFileInputSplit current = null;
>>
>>     public static void main(String... args) throws Exception {
>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>         pdfReader.open(splits[0]);
>>         pdfReader.nextRecord(null);
>>
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>         env.fromElements(1, 2, 3)
>>             // returns the squared i
>>             .print();
>>
>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>         InputFormatSourceFunction<StringValue> reader =
>>             new InputFormatSourceFunction<>(format, TypeInformation.of(StringValue.class));
>>
>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>     }
>>
>>     String path = null;
>>
>>     public PDFFileInputFormat(String path) {
>>         this.path = path;
>>     }
>>
>>     public void configure(Configuration parameters) {
>>         // TODO Auto-generated method stub
>>     }
>>
>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>         // TODO Auto-generated method stub
>>         return cachedStatistics;
>>     }
>>
>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>         Files.list(Paths.get(path)).forEach(f -> {
>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>             splits.add(split);
>>         });
>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>         return splits.toArray(inputSplitArray);
>>     }
>>
>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>         logger.info("Assigner");
>>         // TODO Auto-generated method stub
>>         return new DefaultInputSplitAssigner(inputSplits);
>>     }
>>
>>     public void open(InputSplit split) throws IOException {
>>         this.current = (PDFFileInputSplit) split;
>>     }
>>
>>     public boolean reachedEnd() throws IOException {
>>         // TODO Auto-generated method stub
>>         return true;
>>     }
>>
>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>         logger.info("Content " + content);
>>         return new StringValue(content);
>>     }
>>
>>     public void close() throws IOException {
>>         // TODO Auto-generated method stub
>>     }
>> }
>>
>> ---
>>
>> Thanks,
>>
>> Mohit
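
For reference, Fabian's point about reachedEnd() can be sketched in plain Java, without any Flink dependency. This is only an illustration under stated assumptions: WholeFileReaderSketch and readAll are hypothetical names, not Flink classes, and the loop in readAll merely imitates the call order described above (reachedEnd() is checked before each nextRecord()). The idea is a per-split flag that is reset in open() and set only after the single whole-file record has been returned.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java stand-in for the reachedEnd()/nextRecord() contract.
 * Hypothetical class: it does not extend or use any Flink type.
 */
class WholeFileReaderSketch {

    private Path current;
    private boolean consumed; // true once the split's single record was emitted

    /** Mirrors open(split): point at one file and reset the end-of-split flag. */
    void open(Path file) {
        this.current = file;
        this.consumed = false;
    }

    /** Stays false until nextRecord() has returned the file's content. */
    boolean reachedEnd() {
        return consumed;
    }

    /** Returns the whole file as one record and marks the split as finished. */
    String nextRecord() throws IOException {
        String content = new String(Files.readAllBytes(current), StandardCharsets.UTF_8);
        consumed = true;
        return content;
    }

    /** Imitates the caller: one split per regular file, reachedEnd() checked first. */
    static List<String> readAll(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    files.add(p);
                }
            }
        }
        Collections.sort(files); // deterministic order for the illustration

        List<String> records = new ArrayList<>();
        WholeFileReaderSketch reader = new WholeFileReaderSketch();
        for (Path file : files) {
            reader.open(file);
            while (!reader.reachedEnd()) {
                records.add(reader.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        // Builds a throwaway directory with two small files and reads them back.
        Path dir = Files.createTempDirectory("whole-file-sketch");
        Files.write(dir.resolve("a.txt"), "first".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("b.txt"), "second".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(dir));
    }
}
```

With reachedEnd() hard-coded to return true, as in the quoted format, the while loop in readAll would never call nextRecord(), which matches the symptom of print() producing no output.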