I would not go in this direction. Returning lists is messy, I think. I would stick with hasNext() and next() returning a single element.
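For comparison, here is a minimal sketch of the List<T> variant proposed in the message quoted below: next() never waits, drains whatever is currently buffered, and may return an empty list. BatchSourceFunction and QueueBatchSource are hypothetical names, and the in-memory queue is an assumption standing in for a real external input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical List<T> variant of the source interface: next() must not block
// and returns everything available right now (possibly nothing).
interface BatchSourceFunction<T> {
    boolean reachedEnd();
    List<T> next();
}

// Hypothetical source backed by an in-memory queue that another thread fills.
class QueueBatchSource<T> implements BatchSourceFunction<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
    private volatile boolean finished = false;

    void offer(T element) {
        buffer.add(element);
    }

    void finish() {
        finished = true;
    }

    @Override
    public boolean reachedEnd() {
        return finished && buffer.isEmpty();
    }

    @Override
    public List<T> next() {
        List<T> batch = new ArrayList<>();
        buffer.drainTo(batch); // takes what is buffered, never waits
        return batch;          // an empty list means "nothing right now"
    }
}
```

Every caller now has to handle both empty and multi-element results, which is the extra bookkeeping objected to above.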
On Mon, May 11, 2015 at 10:20 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> We could also change next() to return List<T> and say that the method must not sit and wait, but simply return whatever is available without waiting, while also being allowed to return nothing for the moment.
>
> On Fri, May 8, 2015 at 12:05 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
> > You are right. That is why I already pointed this out:
> >
> > >> -> You could force the UDF to return each time, by disallowing consecutive calls to Collector.out(...).
> >
> > The Storm design would avoid the "NULL problem" Aljoscha mentioned, too.
> >
> > -Matthias
> >
> > On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> >> I think the problem with this void next() approach is exactly the way it works:
> >>
> >> "Using this interface, "next()" can loop internally as long as tuples are available and return if there is (currently) no input."
> >>
> >> We don't want the user to loop internally in next(), because then we have almost the same problem as we have now with run(). We want to take snapshots between two produced source elements, at roughly the same time at all sources, so we cannot afford to wait for some arbitrary user behaviour to finish.
> >>
> >> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
> >>> Did you consider the Storm way to handle this?
> >>>
> >>> Storm offers a method "void next()" that uses a collector object to emit new tuples. Using this interface, "next()" can loop internally as long as tuples are available and return if there is (currently) no input. From what I have seen, people tend to emit a single tuple and leave next() immediately, because Storm calls next() in an infinite loop anyway.
> >>> -> You could force the UDF to return each time, by disallowing consecutive calls to Collector.out(...).
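A minimal sketch of that collector-based idea, using hypothetical names (CollectorSource, SingleEmitCollector) rather than Storm's or Flink's actual classes: the source pushes into a collector, and the collector rejects a second out(...) within one next() call, so the UDF has to return after each element. Whether anything was emitted is tracked by the collector rather than by the returned value, so null stays a legal element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Storm-style source: instead of returning an element,
// next() pushes at most one element into the collector it is given.
interface CollectorSource<T> {
    void next(SingleEmitCollector<T> out);
}

// Hypothetical collector that rejects a second out(...) call during one
// next() invocation, which forces the UDF to return after each element.
class SingleEmitCollector<T> {
    private final List<T> emitted = new ArrayList<>();
    private boolean calledThisRound = false;

    void out(T element) {
        if (calledThisRound) {
            throw new IllegalStateException("only one element per next() call");
        }
        calledThisRound = true;
        emitted.add(element); // null is an ordinary element here
    }

    // Task side: run one next() call and report whether it emitted anything.
    // Emptiness is tracked by the collector, not by a sentinel value.
    boolean step(CollectorSource<T> source) {
        calledThisRound = false;
        source.next(this);
        return calledThisRound;
    }

    List<T> emitted() {
        return emitted;
    }
}
```

After each step() the task has control again and could snapshot before the next call.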
> >>>
> >>> If next() is called by the system and returns, it is easy to check whether the out(..) method of the collector object was called at least once. If no record was emitted, Storm "sleeps" for a while before calling next() again, to avoid busy waiting. The sleeping time is increased for consecutive "empty" next() calls and reset the first time next() emits records again.
> >>>
> >>> I like this interface because it's very simple, and I would prefer it over an interface with many methods.
> >>>
> >>> -Matthias
> >>>
> >>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> >>>> Hi,
> >>>> in the process of reworking the Streaming Operator model I'm also reworking the sources in order to get rid of the loop in each source. Right now, the interface for sources (SourceFunction) has one method: run(). This is called when the source starts and can just output elements at any time using the Collector interface. This does not give the Task that runs the source much control over suspending operation to perform checkpoints or similar actions.
> >>>>
> >>>> I thought about changing the interface to this:
> >>>>
> >>>> interface SourceFunction<T> {
> >>>>     boolean reachedEnd();
> >>>>     T next();
> >>>> }
> >>>>
> >>>> This is similar to the batch API and also to what Stephan proposes in his pull request. I think this will not work for streaming because sources might not have new elements to emit at the moment but might have something to emit in the future. This is problematic because streaming topologies usually run indefinitely. In that case, reachedEnd() and next() would have to block until a new element arrives. This again does not give the task the power to suspend operation at will.
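To make the blocking problem concrete, here is a minimal sketch of an unbounded, queue-backed source under the two-method interface. BlockingSourceFunction and QueueSource are hypothetical names (the message itself calls the interface SourceFunction). With reachedEnd() always false and nothing buffered, next() can only wait inside BlockingQueue.take(), so the task thread stays parked there until input arrives, short of being interrupted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// The two-method interface from the message above (hypothetical name).
interface BlockingSourceFunction<T> {
    boolean reachedEnd();
    T next();
}

// Hypothetical unbounded source fed by another thread (socket reader, etc.).
class QueueSource<T> implements BlockingSourceFunction<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();

    void offer(T element) {
        buffer.add(element);
    }

    @Override
    public boolean reachedEnd() {
        return false; // an unbounded stream never ends
    }

    @Override
    public T next() {
        try {
            // No element yet, but the stream is not over: under this interface
            // the only option is to park the calling task thread here.
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for input", e);
        }
    }
}
```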
> >>>>
> >>>> I propose a three-function interface:
> >>>>
> >>>> interface SourceFunction<T> {
> >>>>     boolean reachedEnd();
> >>>>     boolean hasNext();
> >>>>     T next();
> >>>> }
> >>>>
> >>>> where the contract for the source is as follows:
> >>>> - reachedEnd() == true => stop the source
> >>>> - hasNext() == true => call next() to retrieve the next element
> >>>> - hasNext() == false => call again at some later point
> >>>> - next() => retrieve the next element; throw an exception if no element is available
> >>>>
> >>>> I thought about allowing next() to return NULL to signal that no element is available at the moment. This will not work because a source might want to return NULL as an element.
> >>>>
> >>>> What do you think? Any other ideas about solving this?
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
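A minimal sketch of the proposed three-method contract together with a task loop that respects it. BufferedSource and SourceDriver are hypothetical, and the doubling sleep on empty polls is only one possible reading of "call again at some later point", borrowed from the Storm backoff described earlier in the thread. Because hasNext() signals availability, next() can return null as an ordinary element.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

// The three-method interface proposed above.
interface SourceFunction<T> {
    boolean reachedEnd();
    boolean hasNext();
    T next();
}

// Hypothetical source whose buffer is filled by another thread. LinkedList is
// used because, unlike the java.util.concurrent queues, it accepts null.
class BufferedSource<T> implements SourceFunction<T> {
    private final LinkedList<T> buffer = new LinkedList<>();
    private boolean finished = false;

    synchronized void offer(T element) { buffer.addLast(element); }
    synchronized void finish() { finished = true; }

    @Override
    public synchronized boolean reachedEnd() { return finished && buffer.isEmpty(); }

    @Override
    public synchronized boolean hasNext() { return !buffer.isEmpty(); }

    @Override
    public synchronized T next() {
        if (buffer.isEmpty()) {
            throw new NoSuchElementException("no element available");
        }
        return buffer.removeFirst();
    }
}

// Hypothetical task loop following the contract. Control returns to the task
// after every element, so it could take a checkpoint between any two elements.
final class SourceDriver {
    static <T> List<T> run(SourceFunction<T> source, long maxSleepMs) {
        List<T> collected = new ArrayList<>();
        long sleepMs = 1;
        while (!source.reachedEnd()) {
            if (source.hasNext()) {
                collected.add(source.next());
                sleepMs = 1; // data is flowing again: reset the backoff
                // a checkpoint could be triggered at this point
            } else {
                try {
                    Thread.sleep(sleepMs); // nothing yet: call again later
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // task cancelled
                    break;
                }
                sleepMs = Math.min(sleepMs * 2, maxSleepMs);
            }
        }
        return collected;
    }
}
```

For example, if another thread offers "a", then null, then "b" and calls finish(), SourceDriver.run(source, 8) returns all three elements in order, including the null.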