You are right. That is why I already pointed this out:

> -> You could force the UDF to return each time, by disallowing
> consecutive calls to Collector.out(...).
The Storm design would avoid the "NULL-Problem" Aljoscha mentioned, too.

-Matthias

On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> I think the problem with this void next() approach is exactly the way it
> works:
>
> "Using this interface, "next()" can loop internally as long
> as tuples are available and return if there is (currently) no input."
>
> We don't want the user to loop internally in next() because then we have
> almost the same problem as now with run(). We want to do snapshots
> between two produced source elements, at roughly the same time at all the
> sources, so we cannot afford to wait for some random user behaviour to
> finish.
>
>
> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
>
>> Did you consider the Storm way to handle this?
>>
>> Storm offers a method "void next()" that uses a collector object to emit
>> new tuples. Using this interface, "next()" can loop internally as long
>> as tuples are available and return if there is (currently) no input.
>> From what I have seen, people tend to emit a single tuple and leave
>> next() immediately, because Storm calls next() in an infinite loop anyway.
>> -> You could force the UDF to return each time, by disallowing
>> consecutive calls to Collector.out(...).
>>
>> If next() is called by the system and it returns, it is easy to check if
>> the out(..) method of the collector object was called at least once. If
>> no record was emitted, Storm "sleeps" for a while before calling
>> next() again, to avoid busy waiting. The sleeping time is increased for
>> consecutive "empty" next() calls and reset the first time next() emits
>> records again.
>>
>> I like this interface, because it's very simple and would prefer it over
>> an interface with many methods.
>>
>>
>> -Matthias
>>
>>
>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
>>> Hi,
>>> in the process of reworking the Streaming Operator model I'm also
>>> reworking the sources in order to get rid of the loop in each source.
>>> Right now, the interface for sources (SourceFunction) has one method:
>>> run(). This is called when the source starts and can just output
>>> elements at any time using the Collector interface. This does not give
>>> the Task that runs the source a lot of control in suspending operation
>>> for performing checkpoints or some such thing.
>>>
>>> I thought about changing the interface to this:
>>>
>>> interface SourceFunction<T> {
>>>   boolean reachedEnd();
>>>   T next();
>>> }
>>>
>>> This is similar to the batch API and also to what Stephan proposes in
>>> his pull request. I think this will not work for streaming because
>>> sources might not have new elements to emit at the moment but might
>>> have something to emit in the future. This is problematic because
>>> streaming topologies are usually running indefinitely. In that case,
>>> reachedEnd() and next() would have to be blocking (until a new
>>> element arrives). This again does not give the task the power to
>>> suspend operation at will.
>>>
>>> I propose a three-function interface:
>>>
>>> interface SourceFunction<T> {
>>>   boolean reachedEnd();
>>>   boolean hasNext();
>>>   T next();
>>> }
>>>
>>> where the contract for the source is as follows:
>>> - reachedEnd() == true => stop the source
>>> - hasNext() == true => call next() to retrieve next element
>>> - hasNext() == false => call again at some later point
>>> - next() => retrieve next element, throw exception if no element
>>>   available
>>>
>>> I thought about allowing next() to return NULL to signal that no
>>> element is available at the moment. This will not work because a
>>> source might want to return NULL as an element.
>>>
>>> What do you think? Any other ideas about solving this?
>>>
>>> Cheers,
>>> Aljoscha
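
To make the "return after each emit" idea concrete, here is a rough Java sketch. All names in it (Collector, StormStyleSource, SingleEmitCollector, SourceDriver) and the back-off constants are made up for illustration and are neither Storm nor Flink API. The doubling back-off is only my assumption of how "increase the sleeping time" could look; Storm's actual wait strategy may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical collector interface, named after the Collector.out(...) call
// discussed in this thread. Not an existing Flink or Storm type.
interface Collector<T> {
    void out(T element);
}

// Hypothetical Storm-style source: emit at most one element, then return.
interface StormStyleSource<T> {
    void next(Collector<T> collector);
}

// Collector that rejects a second out(...) within the same next() call,
// which forces the UDF to return after every emitted element.
final class SingleEmitCollector<T> implements Collector<T> {
    private final List<T> sink;
    private boolean emitted;

    SingleEmitCollector(List<T> sink) {
        this.sink = sink;
    }

    void reset() {
        emitted = false;
    }

    boolean hasEmitted() {
        return emitted;
    }

    @Override
    public void out(T element) {
        if (emitted) {
            throw new IllegalStateException("only one out(...) per next() call");
        }
        emitted = true;
        sink.add(element);
    }
}

final class SourceDriver {
    // Assumed back-off bounds, chosen only for this sketch.
    static final long MIN_SLEEP_MS = 1;
    static final long MAX_SLEEP_MS = 64;

    // Calls next() the given number of times and returns the back-off that
    // was in effect after the last call (0 if that call emitted an element).
    static <T> long drive(StormStyleSource<T> source,
                          SingleEmitCollector<T> collector,
                          int calls) {
        long sleepMs = 0;
        for (int i = 0; i < calls; i++) {
            // The task is in control between two next() calls, so this is
            // the point where a checkpoint/snapshot could be taken.
            collector.reset();
            source.next(collector);
            if (collector.hasEmitted()) {
                sleepMs = 0; // reset back-off as soon as data flows again
            } else {
                // Empty call: sleep, doubling the wait up to the maximum.
                sleepMs = (sleepMs == 0) ? MIN_SLEEP_MS
                                         : Math.min(sleepMs * 2, MAX_SLEEP_MS);
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return sleepMs;
                }
            }
        }
        return sleepMs;
    }
}
```

The task regains control after every single element, so it never has to wait for user code that loops inside next(), and "nothing to emit right now" is signalled by simply not calling out(...), so NULL remains a legal element.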