You are right. That is why I already pointed this out:

> -> You could force the UDF to return each time, by disallowing
>>> consecutive calls to Collector.out(...).

The Storm design would also avoid the "NULL problem" Aljoscha mentioned.
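For what it's worth, the system-side loop I have in mind could look roughly like this. This is a minimal sketch only; the names (Collector, NextSource, SourceDriver) are made up for illustration and are not Storm's or Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the Storm-style source contract: the system calls next() in a
// loop, the UDF emits via a collector, and the system backs off when a
// next() call emits nothing. Names here are illustrative, not real APIs.
interface Collector<T> {
    void out(T record);
}

interface NextSource<T> {
    // Emit zero or more records via the collector, then return promptly.
    void next(Collector<T> out);
}

class SourceDriver<T> {
    private final NextSource<T> source;
    private long backoffMillis = 0;

    SourceDriver(NextSource<T> source) {
        this.source = source;
    }

    // One iteration of the system-side loop: call next(), detect whether
    // anything was emitted, and grow or reset the backoff accordingly.
    // A real runtime could take a checkpoint here, between next() calls.
    boolean step(List<T> sink) throws InterruptedException {
        final boolean[] emitted = {false};
        source.next(record -> {
            emitted[0] = true;
            sink.add(record);
        });
        if (emitted[0]) {
            backoffMillis = 0;  // progress was made: reset the backoff
        } else {
            // double the sleep on consecutive empty calls, capped at 100 ms
            backoffMillis = Math.min(backoffMillis == 0 ? 1 : backoffMillis * 2, 100);
            Thread.sleep(backoffMillis);  // avoid busy waiting
        }
        return emitted[0];
    }
}

public class StormStyleSourceSketch {
    public static void main(String[] args) throws InterruptedException {
        // A toy source that emits the numbers 0, 1, 2 and then runs dry.
        NextSource<Integer> source = new NextSource<Integer>() {
            private int i = 0;
            @Override
            public void next(Collector<Integer> out) {
                if (i < 3) {
                    out.out(i++);  // at most one record per next() call
                }
            }
        };
        List<Integer> sink = new ArrayList<>();
        SourceDriver<Integer> driver = new SourceDriver<>(source);
        for (int calls = 0; calls < 5; calls++) {
            driver.step(sink);  // last two calls emit nothing and back off
        }
        System.out.println(sink);
    }
}
```

The point is that the snapshot barrier can be injected between any two next() calls, because the UDF is forced to return control to the system after each (single-record) emission.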


-Matthias



On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> I think the problem with this void next() approach is exactly the way it
> works:
> 
> "Using this interface, "next()" can loop internally as long
> as tuples are available and return if there is (currently) no input."
> 
> We don't want the user to loop internally in next(), because then we have
> almost the same problem as we do now with run(). We want to take snapshots
> between two produced source elements, at roughly the same time at all the
> sources, so we cannot afford to wait for some arbitrary user behaviour to
> finish.
> 
> 
> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
> 
>> Did you consider the Storm way to handle this?
>>
>> Storm offers a method "void next()" that uses a collector object to emit
>> new tuples. Using this interface, "next()" can loop internally as long
>> as tuples are available and return if there is (currently) no input.
>> From what I have seen, people tend to emit a single tuple and leave next()
>> immediately, because Storm calls next() in an infinite loop anyway.
>> -> You could force the UDF to return each time, by disallowing
>> consecutive calls to Collector.out(...).
>>
>> If next() is called by the system and it returns, it is easy to check
>> whether the out(...) method of the collector object was called at least
>> once. If no record was emitted, Storm "sleeps" for a while before calling
>> next() again, to avoid busy waiting. The sleeping time is increased for
>> consecutive "empty" next() calls and reset the first time next() emits
>> records again.
>>
>> I like this interface because it is very simple, and I would prefer it
>> over an interface with many methods.
>>
>>
>> -Matthias
>>
>>
>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
>>> Hi,
>>> in the process of reworking the Streaming Operator model I'm also
>>> reworking the sources in order to get rid of the loop in each source.
>>> Right now, the interface for sources (SourceFunction) has one method:
>>> run(). This is called when the source starts and can output elements at
>>> any time using the Collector interface. This does not give the Task that
>>> runs the source much control over suspending operation, for example to
>>> perform checkpoints.
>>>
>>> I thought about changing the interface to this:
>>>
>>> interface SourceFunction<T>  {
>>>   boolean reachedEnd();
>>>   T next();
>>> }
>>>
>>> This is similar to the batch API and also to what Stephan proposes in
>>> his pull request. I think this will not work for streaming because
>>> sources might not have new elements to emit at the moment but might
>>> have something to emit in the future. This is problematic because
>>> streaming topologies are usually running indefinitely. In that case,
>>> the reachedEnd() and next() calls would have to block (until a new
>>> element arrives). This again does not give the task the power to
>>> suspend operation at will.
>>>
>>> I propose a three function interface:
>>>
>>> interface SourceFunction<T> {
>>>   boolean reachedEnd();
>>>   boolean hasNext();
>>>   T next();
>>> }
>>>
>>> where the contract for the source is as follows:
>>>  - reachedEnd() == true => stop the source
>>>  - hasNext() == true => call next() to retrieve next element
>>>  - hasNext() == false => call again at some later point
>>>  - next() => retrieve the next element, throw an exception if no
>>>    element is available
>>>
>>> I thought about allowing next() to return NULL to signal that no
>>> element is available at the moment. This will not work because a
>>> source might want to return NULL as an element.
>>>
>>> What do you think? Any other ideas about solving this?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>
>>
> 
