Thanks. A few more questions:

- Is there an example for FileInputFormat?
- How do I make it read all the files in a directory?
- How do I make an InputFormat a streaming input instead of a batch input? E.g., read
new files as they arrive in a directory.

Thanks again.

On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Flink calls the reachedEnd() method before it calls nextRecord() and
> closes the InputFormat when reachedEnd() returns true.
> So reachedEnd() should not return true until nextRecord() has been called
> and the first (and only) record has been emitted.
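
The contract described above can be modeled in plain Java. This is a simplified stand-in, not Flink code: the `RecordReader` interface, the two reader classes, and the `drive` loop are hypothetical, and `drive` only approximates how the runtime asks `reachedEnd()` before each `nextRecord()` call. A reader whose `reachedEnd()` always returns true emits nothing, which matches the empty `.print()` output; a reader that sets a flag in its single `nextRecord()` call emits exactly one record per split.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class WholeRecordContractDemo {

    /** Hypothetical stand-in for the open/reachedEnd/nextRecord part of an input format. */
    interface RecordReader {
        void open(byte[] splitContents);
        boolean reachedEnd();
        String nextRecord();
    }

    /** Mirrors the bug in the original format: reachedEnd() is always true. */
    static class AlwaysEndReader implements RecordReader {
        private byte[] contents;
        public void open(byte[] splitContents) { this.contents = splitContents; }
        public boolean reachedEnd() { return true; }
        public String nextRecord() { return new String(contents, StandardCharsets.UTF_8); }
    }

    /** Emits the whole split as exactly one record, then reports the end. */
    static class WholeSplitReader implements RecordReader {
        private byte[] contents;
        private boolean emitted;

        public void open(byte[] splitContents) {
            this.contents = splitContents;
            this.emitted = false; // reset per split: one reader instance may process many splits
        }

        public boolean reachedEnd() { return emitted; }

        public String nextRecord() {
            emitted = true; // after this record, reachedEnd() turns true
            return new String(contents, StandardCharsets.UTF_8);
        }
    }

    /** Simplified model of the runtime loop: check reachedEnd() first, only then call nextRecord(). */
    static List<String> drive(RecordReader reader, List<byte[]> splits) {
        List<String> out = new ArrayList<>();
        for (byte[] split : splits) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                out.add(reader.nextRecord());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> splits = List.of(
                "file-a".getBytes(StandardCharsets.UTF_8),
                "file-b".getBytes(StandardCharsets.UTF_8));
        System.out.println(drive(new AlwaysEndReader(), splits));  // []
        System.out.println(drive(new WholeSplitReader(), splits)); // [file-a, file-b]
    }
}
```

Applied to PDFFileInputFormat, the same idea would presumably mean resetting a boolean field in open(InputSplit) and setting it in nextRecord().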
>
> You might also want to build your PDFFileInputFormat on FileInputFormat
> and set its unsplittable flag to true.
> FileInputFormat comes with lots of built-in functionality, such as
> InputSplit generation.
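
What the unsplittable flag changes can be sketched with a toy split planner. This is a simplified model under stated assumptions, not FileInputFormat's actual algorithm (which also considers, among other things, block locations, the requested minimum number of splits, and compression); the `Split` class and `planSplits` method are hypothetical. With the flag off, each file is cut into byte ranges; with it on, each file becomes exactly one split, which is what a whole-file PDF reader needs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitPlanningDemo {

    /** Hypothetical, simplified analogue of a file input split: a byte range of one file. */
    static final class Split {
        final String file;
        final long start;
        final long length;

        Split(String file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    /**
     * unsplittable == true  -> one split per file, covering the whole file.
     * unsplittable == false -> each file is cut into ranges of at most maxSplitSize bytes.
     */
    static List<Split> planSplits(Map<String, Long> fileSizes, long maxSplitSize, boolean unsplittable) {
        if (maxSplitSize <= 0) {
            throw new IllegalArgumentException("maxSplitSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        for (Map.Entry<String, Long> e : fileSizes.entrySet()) {
            String file = e.getKey();
            long size = e.getValue();
            if (unsplittable) {
                splits.add(new Split(file, 0, size)); // whole file, read as one unit
                continue;
            }
            for (long start = 0; start < size; start += maxSplitSize) {
                splits.add(new Split(file, start, Math.min(maxSplitSize, size - start)));
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        Map<String, Long> files = new LinkedHashMap<>();
        files.put("a.pdf", 250L);
        files.put("b.pdf", 90L);
        System.out.println(planSplits(files, 100, false).size()); // 4: a.pdf in 3 ranges + b.pdf in 1
        System.out.println(planSplits(files, 100, true).size());  // 2: one whole-file split per PDF
    }
}
```

Cutting a PDF at an arbitrary byte offset would leave each reader with a fragment it cannot parse, which is why one split per file is the natural choice here.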
>
> Cheers, Fabian
>
> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>
>> Hi,
>>
>> I created a custom input format. The idea behind it is to read all binary
>> files from a directory and use each file as its own split. Each split is
>> read as one whole record. When I run it in Flink I don't get any error, but
>> I am not seeing any output from .print(). Am I missing something?
>>
>> ----
>>
>> import java.io.IOException;
>> import java.nio.file.Files;
>> import java.nio.file.Path;
>> import java.nio.file.Paths;
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.stream.Stream;
>>
>> import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
>> import org.apache.flink.api.common.io.RichInputFormat;
>> import org.apache.flink.api.common.io.statistics.BaseStatistics;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.core.io.InputSplit;
>> import org.apache.flink.core.io.InputSplitAssigner;
>> import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
>> import org.apache.flink.types.StringValue;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> // PDFFileInputSplit is my own InputSplit class (not shown).
>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>
>>     private static final Logger logger =
>>             LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>
>>     PDFFileInputSplit current = null;
>>
>>     public static void main(String... args) throws Exception {
>>         // Local sanity check outside of Flink
>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>         pdfReader.open(splits[0]);
>>         pdfReader.nextRecord(null);
>>
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>         env.fromElements(1, 2, 3)
>>             .print();
>>
>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>         InputFormatSourceFunction<StringValue> reader =
>>                 new InputFormatSourceFunction<>(format, TypeInformation.of(StringValue.class));
>>
>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>     }
>>
>>     String path = null;
>>
>>     public PDFFileInputFormat(String path) {
>>         this.path = path;
>>     }
>>
>>     public void configure(Configuration parameters) {
>>         // TODO Auto-generated method stub
>>     }
>>
>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>         // TODO Auto-generated method stub
>>         return cachedStatistics;
>>     }
>>
>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>         // One split per file in the directory; close the directory stream when done
>>         try (Stream<Path> files = Files.list(Paths.get(path))) {
>>             files.forEach(f -> {
>>                 PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>                 splits.add(split);
>>             });
>>         }
>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>         return splits.toArray(inputSplitArray);
>>     }
>>
>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>         logger.info("Assigner");
>>         // TODO Auto-generated method stub
>>         return new DefaultInputSplitAssigner(inputSplits);
>>     }
>>
>>     public void open(InputSplit split) throws IOException {
>>         this.current = (PDFFileInputSplit) split;
>>     }
>>
>>     public boolean reachedEnd() throws IOException {
>>         // TODO Auto-generated method stub
>>         return true;
>>     }
>>
>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>         logger.info("Content " + content);
>>         return new StringValue(content);
>>     }
>>
>>     public void close() throws IOException {
>>         // TODO Auto-generated method stub
>>     }
>> }
>>
>> ---
>>
>>
>> Thanks,
>>
>> Mohit
>>
>>
>>
>
