Hi Mohit,

as Ted said, there are plenty of InputFormats which are based on FileInputFormat. FileInputFormat also supports reading all files in a directory: simply specify the path of the directory as the input path.
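As a rough illustration of what "reading all files in a directory" involves for a format that treats each file as one unsplittable unit, the JDK-only sketch below lists the regular files directly inside a directory and yields one entry (split) per file. `WholeFileSplits` is a hypothetical helper, not a Flink class. The name filter is an assumption modeled on FileInputFormat's default of skipping files whose names start with `.` or `_`, and, like FileInputFormat's default, the sketch does not descend into subdirectories.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not a Flink API): one split per file directly inside a directory. */
final class WholeFileSplits {

    private WholeFileSplits() {}

    /** Assumed filter, modeled on FileInputFormat's default: skip names starting with '.' or '_'. */
    static boolean accept(Path file) {
        String name = file.getFileName().toString();
        return !name.startsWith(".") && !name.startsWith("_");
    }

    /** Lists accepted regular files (non-recursive), sorted so split numbers are deterministic. */
    static List<Path> enumerate(Path dir) throws IOException {
        // try-with-resources: Files.list keeps a directory handle open until the stream is closed
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                    .filter(Files::isRegularFile) // subdirectories are skipped, not descended into
                    .filter(WholeFileSplits::accept)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }
}
```

Unlike the `Files.list(...).forEach(...)` call in the quoted PDFFileInputFormat code, this sketch closes the directory stream, and sorting keeps the split numbering stable between runs.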
Check StreamExecutionEnvironment.readFile(), which takes several parameters, such as a FileInputFormat, a FileProcessingMode, and the interval at which the directory is periodically checked for new files. It is the public method that calls createFileInput() internally.

Best, Fabian

2017-07-30 21:31 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
> For #1, you can find quite a few classes which extend FileInputFormat,
> e.g.
>
> flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
> flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
> flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
> flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java: extends FileInputFormat<String>
>
> FYI
>
> On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <mohitanch...@gmail.com> wrote:
>
>> Thanks. A few more questions:
>>
>> - Is there an example for FileInputFormat?
>> - How can I make it read all the files in a directory?
>> - How can I make an InputFormat a streaming input instead of batch? E.g., read
>>   new files as they arrive in a directory.
>>
>> Thanks again.
>>
>> On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink calls the reachedEnd() method before it calls nextRecord() and
>>> closes the InputFormat when reachedEnd() returns true.
>>> So, reachedEnd() should not return true until nextRecord() has been called
>>> and the first (and only) record has been emitted.
>>>
>>> You might also want to build your PDFFileInputFormat on FileInputFormat
>>> and set unsplittable to true.
>>> FileInputFormat comes with lots of built-in functionality, such as
>>> InputSplit generation.
>>>
>>> Cheers, Fabian
>>>
>>> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I created a custom input format. The idea behind it is to read all binary
>>>> files from a directory and use each file as its own split. Each split is
>>>> read as one whole record. When I run it in Flink I don't get any error, but
>>>> I am not seeing any output from .print(). Am I missing something?
>>>>
>>>> ----
>>>>
>>>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>>>
>>>>     private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>>>
>>>>     PDFFileInputSplit current = null;
>>>>
>>>>     public static void main(String... args) throws Exception {
>>>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>>>         pdfReader.open(splits[0]);
>>>>         pdfReader.nextRecord(null);
>>>>
>>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>         env.fromElements(1, 2, 3)
>>>>             // returns the squared i
>>>>             .print();
>>>>
>>>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>>>         InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,
>>>>             TypeInformation.of(StringValue.class));
>>>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>>     }
>>>>
>>>>     String path = null;
>>>>
>>>>     public PDFFileInputFormat(String path) {
>>>>         this.path = path;
>>>>     }
>>>>
>>>>     public void configure(Configuration parameters) {
>>>>         // TODO Auto-generated method stub
>>>>     }
>>>>
>>>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>>>         // TODO Auto-generated method stub
>>>>         return cachedStatistics;
>>>>     }
>>>>
>>>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>>>         Files.list(Paths.get(path)).forEach(f -> {
>>>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>>>             splits.add(split);
>>>>         });
>>>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>>>         return splits.toArray(inputSplitArray);
>>>>     }
>>>>
>>>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>>>         logger.info("Assigner");
>>>>         // TODO Auto-generated method stub
>>>>         return new DefaultInputSplitAssigner(inputSplits);
>>>>     }
>>>>
>>>>     public void open(InputSplit split) throws IOException {
>>>>         this.current = (PDFFileInputSplit) split;
>>>>     }
>>>>
>>>>     public boolean reachedEnd() throws IOException {
>>>>         // TODO Auto-generated method stub
>>>>         return true;
>>>>     }
>>>>
>>>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>>>         logger.info("Content " + content);
>>>>         return new StringValue(content);
>>>>     }
>>>>
>>>>     public void close() throws IOException {
>>>>         // TODO Auto-generated method stub
>>>>     }
>>>> }
>>>>
>>>> ---
>>>>
>>>> Thanks,
>>>>
>>>> Mohit
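Fabian's first reply explains the missing output: the runtime checks reachedEnd() before every nextRecord() call, and the quoted reachedEnd() always returns true, so nextRecord() is never reached. The JDK-only sketch below models that contract. `SimpleInputFormat`, `WholeFileFormat` and `ReadLoop` are hypothetical names, not Flink classes, and the loop only approximates how Flink drives an input format. The format keeps a per-split flag that flips after the single whole-file record is emitted, and it returns raw bytes because PDFs are binary.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the read methods of an input format (not Flink's InputFormat). */
interface SimpleInputFormat<T> {
    void open(Path split) throws IOException;
    boolean reachedEnd() throws IOException;
    T nextRecord() throws IOException;
    void close() throws IOException;
}

/** Emits each file as exactly one record; reachedEnd() becomes true only after that record. */
final class WholeFileFormat implements SimpleInputFormat<byte[]> {
    private Path current;
    private boolean end;

    @Override
    public void open(Path split) {
        current = split;
        end = false; // a freshly opened split still holds its single record
    }

    @Override
    public boolean reachedEnd() {
        return end;
    }

    @Override
    public byte[] nextRecord() throws IOException {
        if (end) {
            return null; // nothing left in this split
        }
        end = true; // the whole file is the one and only record
        return Files.readAllBytes(current);
    }

    @Override
    public void close() {
        current = null;
    }
}

/** Driver loop: check reachedEnd() before every nextRecord(), skip null records. */
final class ReadLoop {
    private ReadLoop() {}

    static <T> List<T> drain(SimpleInputFormat<T> format, List<Path> splits) throws IOException {
        List<T> records = new ArrayList<>();
        for (Path split : splits) {
            format.open(split);
            try {
                while (!format.reachedEnd()) {
                    T record = format.nextRecord();
                    if (record != null) {
                        records.add(record);
                    }
                }
            } finally {
                format.close();
            }
        }
        return records;
    }
}
```

The flag is reset in open() because a single format instance is opened once per split, one after another. In a real Flink job the same logic would sit in reachedEnd()/nextRecord() of a FileInputFormat subclass with unsplittable set to true, as suggested in the thread.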
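Mohit's third question (picking up new files as they arrive) is what the interval argument mentioned at the top of the thread addresses. The JDK-only sketch below illustrates the polling idea: each poll() lists the directory and returns only the files whose modification time is newer than the largest one seen so far. `DirectoryPoller` is a hypothetical class; its modification-time bookkeeping is an assumption loosely modeled on Flink's continuous file monitoring, not a copy of it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical poller (not a Flink class): returns files that are new or modified since the last poll. */
final class DirectoryPoller {
    private final Path dir;
    private long maxSeenModTime = Long.MIN_VALUE; // largest modification time emitted so far

    DirectoryPoller(Path dir) {
        this.dir = dir;
    }

    List<Path> poll() throws IOException {
        List<Path> candidates;
        try (Stream<Path> entries = Files.list(dir)) {
            candidates = entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<Path> fresh = new ArrayList<>();
        long newMax = maxSeenModTime;
        for (Path p : candidates) {
            String name = p.getFileName().toString();
            if (name.startsWith(".") || name.startsWith("_")) {
                continue; // skip hidden and marker files
            }
            long modTime = Files.getLastModifiedTime(p).toMillis();
            if (modTime > maxSeenModTime) { // newer than anything emitted in earlier polls
                fresh.add(p);
                newMax = Math.max(newMax, modTime);
            }
        }
        maxSeenModTime = newMax;
        return fresh;
    }
}
```

In Flink itself you would not write this loop: passing FileProcessingMode.PROCESS_CONTINUOUSLY and an interval in milliseconds to readFile() makes the framework re-scan the path. In that mode a modified file is re-read in full, and the sketch behaves similarly because a touched file gets a newer modification time. A standalone program could drive poll() with ScheduledExecutorService.scheduleAtFixedRate. One limitation of tracking only a maximum timestamp: a file that appears later but carries an older or equal modification time is never emitted.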