Hi Mohit,

As Ted said, there are plenty of InputFormats that are based on
FileInputFormat.
FileInputFormat also supports reading all files in a directory. Simply
specify the path of the directory.

Check StreamExecutionEnvironment.createFileInput(), which takes several
parameters such as a FileInputFormat and the time interval at which the
directory is periodically checked.
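
As a rough illustration of what "periodically checking a directory" amounts
to, here is a plain-Java sketch that uses only the JDK. It is not Flink's
implementation; the class name DirectoryPollSketch and the method pollOnce()
are hypothetical. In Flink itself, the public entry point I am aware of for
this is StreamExecutionEnvironment.readFile(), which accepts a
FileInputFormat, a path, a FileProcessingMode and an interval.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical stand-in for a directory monitor; not part of Flink.
class DirectoryPollSketch {
    private final Path dir;
    private final Set<Path> seen = new HashSet<>();

    DirectoryPollSketch(Path dir) {
        this.dir = dir;
    }

    /** One scan: returns the regular files that no earlier scan has reported. */
    List<Path> pollOnce() throws IOException {
        List<Path> fresh = new ArrayList<>();
        // Files.list() holds a directory handle, so close it when done.
        try (Stream<Path> files = Files.list(dir)) {
            files.filter(Files::isRegularFile).sorted().forEach(p -> {
                if (seen.add(p)) {
                    fresh.add(p);
                }
            });
        }
        return fresh;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("poll-sketch");
        DirectoryPollSketch poller = new DirectoryPollSketch(dir);

        Files.write(dir.resolve("a.bin"), new byte[] {1});
        System.out.println(poller.pollOnce().size()); // 1: a.bin is new

        Files.write(dir.resolve("b.bin"), new byte[] {2});
        System.out.println(poller.pollOnce().size()); // 1: only b.bin is new
    }
}
```

A real monitor would call pollOnce() from a timer at the configured interval
and hand each new file to the input format as its own split.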

Best, Fabian

2017-07-30 21:31 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> For #1, you can find quite a few classes which extend FileInputFormat.
> e.g.
>
> flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:
>   public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
> flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:
>   public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
> flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:
>   public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
> flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:
>   extends FileInputFormat<String>
>
> FYI
>
> On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <mohitanch...@gmail.com>
> wrote:
>
>> Thanks. A few more questions:
>>
>> - Is there an example for FileInputFormat?
>> - How do I make it read all the files in a directory?
>> - How do I make an InputFormat a streaming input instead of batch? E.g., read
>> files as they arrive in a directory.
>>
>> Thanks again.
>>
>> On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Flink calls the reachedEnd() method before it calls nextRecord() and
>>> closes the InputFormat when reachedEnd() returns true.
>>> So, it should not return true until nextRecord() has been called and the
>>> last record (in your case, the single record of the split) has been emitted.
>>>
>>> You might also want to build your PDFFileInputFormat on FileInputFormat
>>> and set unsplittable to true.
>>> FileInputFormat comes with lots of built-in functionality such as
>>> InputSplit generation.
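
The reachedEnd()/nextRecord() contract described above can be sketched in
plain Java without any Flink dependency. Everything here is an assumption
made for illustration: RecordReader is a hypothetical, cut-down stand-in for
the three InputFormat methods involved (it is not Flink's interface), and a
split is represented by a String holding the file contents. The drain()
helper mimics the call order described above: it asks reachedEnd() before
every nextRecord().

```java
import java.util.ArrayList;
import java.util.List;

class WholeFileSketch {

    /** Hypothetical stand-in for the relevant InputFormat methods; not Flink's API. */
    interface RecordReader<T> {
        void open(String split);
        boolean reachedEnd();
        T nextRecord();
    }

    /** Emits the whole split as exactly one record, then reports the end. */
    static class WholeSplitReader implements RecordReader<String> {
        private String content;
        private boolean emitted;

        public void open(String split) {
            content = split;
            emitted = false; // reset for every new split
        }

        public boolean reachedEnd() {
            return emitted; // false until the single record was handed out
        }

        public String nextRecord() {
            emitted = true;
            return content;
        }
    }

    /** Variant that always reports the end, like the format posted in this thread. */
    static class AlwaysAtEndReader extends WholeSplitReader {
        @Override
        public boolean reachedEnd() {
            return true;
        }
    }

    /** Reads one split, checking reachedEnd() before each nextRecord(). */
    static List<String> drain(RecordReader<String> reader, String split) {
        reader.open(split);
        List<String> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            out.add(reader.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new WholeSplitReader(), "file-1 contents"));  // [file-1 contents]
        System.out.println(drain(new AlwaysAtEndReader(), "file-1 contents")); // []
    }
}
```

The second call returns an empty list because nextRecord() is never reached,
which matches the symptom of a job that runs without errors but prints
nothing.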
>>>
>>> Cheers, Fabian
>>>
>>> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I created a custom input format. The idea behind it is to read all binary
>>>> files from a directory and use each file as its own split. Each split is
>>>> read as one whole record. When I run it in Flink I don't get any error, but
>>>> I am not seeing any output from .print(). Am I missing something?
>>>>
>>>> ----
>>>>
>>>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>>>
>>>>     private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>>>
>>>>     PDFFileInputSplit current = null;
>>>>
>>>>     public static void main(String... args) throws Exception {
>>>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>>>         pdfReader.open(splits[0]);
>>>>         pdfReader.nextRecord(null);
>>>>
>>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         env.fromElements(1, 2, 3)
>>>>             // returns the squared i
>>>>             .print();
>>>>
>>>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>>>         InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,
>>>>             TypeInformation.of(StringValue.class));
>>>>
>>>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>>     }
>>>>
>>>>     String path = null;
>>>>
>>>>     public PDFFileInputFormat(String path) {
>>>>         this.path = path;
>>>>     }
>>>>
>>>>     public void configure(Configuration parameters) {
>>>>         // TODO Auto-generated method stub
>>>>     }
>>>>
>>>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>>>         // TODO Auto-generated method stub
>>>>         return cachedStatistics;
>>>>     }
>>>>
>>>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>>>         Files.list(Paths.get(path)).forEach(f -> {
>>>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>>>             splits.add(split);
>>>>         });
>>>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>>>         return splits.toArray(inputSplitArray);
>>>>     }
>>>>
>>>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>>>         logger.info("Assigner");
>>>>         // TODO Auto-generated method stub
>>>>         return new DefaultInputSplitAssigner(inputSplits);
>>>>     }
>>>>
>>>>     public void open(InputSplit split) throws IOException {
>>>>         this.current = (PDFFileInputSplit) split;
>>>>     }
>>>>
>>>>     public boolean reachedEnd() throws IOException {
>>>>         // TODO Auto-generated method stub
>>>>         return true;
>>>>     }
>>>>
>>>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>>>         logger.info("Content " + content);
>>>>         return new StringValue(content);
>>>>     }
>>>>
>>>>     public void close() throws IOException {
>>>>         // TODO Auto-generated method stub
>>>>     }
>>>>
>>>> }
>>>>
>>>> ---
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Mohit
>>>>
>>>>
>>>>
>>>
>>
>
