For #1, you can find quite a few classes which extend FileInputFormat. e.g.
flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java: extends FileInputFormat<String>

FYI

On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <mohitanch...@gmail.com> wrote:

> Thanks. A few more questions:
>
> - Is there an example for FileInputFormat?
> - How do I make it read all the files in a directory?
> - How do I make an input format a streaming input instead of batch? E.g. read
>   files as they arrive in a directory.
>
> Thanks again.
>
> On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink calls the reachedEnd() method before it calls nextRecord() and
>> closes the IF when reachedEnd() returns true.
>> So, it should not return true until nextRecord() was called and the first
>> and last record was emitted.
>>
>> You might also want to build your PDFFileInputFormat on FileInputFormat
>> and set unsplittable to true.
>> FileInputFormat comes with lots of built-in functionality such as
>> InputSplit generation.
>>
>> Cheers, Fabian
>>
>> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>
>>> Hi,
>>>
>>> I created a custom input format. The idea behind this is to read all binary
>>> files from a directory and use each file as its own split. Each split is
>>> read as one whole record. When I run it in Flink I don't get any error but
>>> I am not seeing any output from .print. Am I missing something?
>>>
>>> ----
>>>
>>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>>
>>>     private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>>
>>>     PDFFileInputSplit current = null;
>>>
>>>     public static void main(String... args) throws Exception {
>>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>>         pdfReader.open(splits[0]);
>>>         pdfReader.nextRecord(null);
>>>
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         env.fromElements(1, 2, 3)
>>>             // returns the squared i
>>>             .print();
>>>
>>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>>         InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,
>>>             TypeInformation.of(StringValue.class));
>>>
>>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>     }
>>>
>>>     String path = null;
>>>
>>>     public PDFFileInputFormat(String path) {
>>>         this.path = path;
>>>     }
>>>
>>>     public void configure(Configuration parameters) {
>>>         // TODO Auto-generated method stub
>>>     }
>>>
>>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>>         // TODO Auto-generated method stub
>>>         return cachedStatistics;
>>>     }
>>>
>>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>>         Files.list(Paths.get(path)).forEach(f -> {
>>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>>             splits.add(split);
>>>         });
>>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>>         return splits.toArray(inputSplitArray);
>>>     }
>>>
>>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>>         logger.info("Assigner");
>>>         // TODO Auto-generated method stub
>>>         return new DefaultInputSplitAssigner(inputSplits);
>>>     }
>>>
>>>     public void open(InputSplit split) throws IOException {
>>>         this.current = (PDFFileInputSplit) split;
>>>     }
>>>
>>>     public boolean reachedEnd() throws IOException {
>>>         // TODO Auto-generated method stub
>>>         return true;
>>>     }
>>>
>>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>>         logger.info("Content " + content);
>>>         return new StringValue(content);
>>>     }
>>>
>>>     public void close() throws IOException {
>>>         // TODO Auto-generated method stub
>>>     }
>>> }
>>>
>>> ---
>>>
>>> Thanks,
>>>
>>> Mohit
>>>
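
For reference, a minimal sketch of the reachedEnd()/nextRecord() contract Fabian describes in the quoted reply, written in plain Java so it runs without Flink on the classpath. RecordReader, WholeFileReader and drain() are hypothetical names that only stand in for Flink's InputFormat and for the loop the runtime drives; they are not Flink APIs. The point it tries to show is that a one-record-per-file reader should report the end only after its single record has been handed out, whereas a reachedEnd() that always returns true yields no records at all.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the slice of the input format contract discussed
// in this thread (open, reachedEnd, nextRecord, close). Not a Flink interface.
interface RecordReader<T> {
    void open(Path file) throws IOException;
    boolean reachedEnd() throws IOException;
    T nextRecord() throws IOException;
    void close() throws IOException;
}

// Reads each file as exactly one record.
class WholeFileReader implements RecordReader<String> {
    private Path current;
    private boolean consumed;

    @Override
    public void open(Path file) {
        this.current = file;
        this.consumed = false; // a freshly opened file still has its one record
    }

    @Override
    public boolean reachedEnd() {
        // Becomes true only after nextRecord() has emitted the single record.
        return consumed;
    }

    @Override
    public String nextRecord() throws IOException {
        String content = new String(Files.readAllBytes(current), StandardCharsets.UTF_8);
        consumed = true;
        return content;
    }

    @Override
    public void close() {
        current = null;
    }
}

class WholeFileReaderDemo {
    // Mimics the call order described above: reachedEnd() is checked
    // before every call to nextRecord().
    static List<String> drain(RecordReader<String> reader, Path file) throws IOException {
        List<String> out = new ArrayList<>();
        reader.open(file);
        try {
            while (!reader.reachedEnd()) {
                out.add(reader.nextRecord());
            }
        } finally {
            reader.close();
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("whole-file", ".txt");
        try {
            Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
            System.out.println(drain(new WholeFileReader(), tmp));
        } finally {
            Files.delete(tmp);
        }
    }
}
```

In the PDFFileInputFormat from the original mail, the same idea would presumably mean keeping a boolean field that open() resets and nextRecord() sets, and returning that field from reachedEnd() instead of the constant true.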