If you have more files than task slots, then some tasks will get multiple
files. That means that open() and close() are called multiple times on the
input format.

Make sure that your input format tolerates that and does not get confused
with lingering state (maybe create a new SimpleInputProjection as well)

On Mon, Oct 5, 2015 at 12:41 PM, Pieter Hameete <phame...@gmail.com> wrote:

> Hi Stephen,
>
> it concerns the DataSet API.
>
> The program im running can be found at
> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
> The Custom Input Format at
> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java
>
> Cheers!
>
> 2015-10-05 12:38 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> I assume this concerns the streaming API?
>>
>> Can you share your program and/or the custom input format code?
>>
>> On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete <phame...@gmail.com>
>> wrote:
>>
>>> Hello Flinkers!
>>>
>>> I run into some strange behavior when reading from a folder of input
>>> files.
>>>
>>> When the number of input files in the folder exceeds the number of task
>>> slots I noticed that the size of my datasets varies with each run. It seems
>>> as if the transformations don't wait for all input files to be read.
>>>
>>> When I have equal or more task slots than there are files, there are no
>>> problems.
>>>
>>> I'm using a custom input format. Could there be a problem with my custom
>>> input format, and if so what could I be forgetting?
>>>
>>> Kind regards and thank you for your time!
>>>
>>> Pieter
>>>
>>
>>
>

Reply via email to