Thanks, Marton, for the summary. The PollingStreamSource essentially follows the API suggested by Aljoscha, and will probably internally use a backoff sleep time (as Matthias pointed out), so we really have arrived at a mix of techniques ;-)
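Here is a minimal Java sketch of how a task might drive a PollingStreamSource with a Storm-style backoff. It assumes the three-method contract from Aljoscha's proposal quoted below. PollingDriver, ScriptedSource, checkpointHook and the minSleepMs/maxSleepMs bounds are hypothetical names for illustration, not Flink API. The hook only marks the point between two elements where a pending checkpoint could be taken.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Three-method contract from Aljoscha's proposal; every call should return promptly. */
interface PollingStreamSource<T> {
    boolean reachedEnd(); // true => stop the source
    boolean hasNext();    // true => an element can be retrieved right now
    T next();             // retrieve the next element; only valid after hasNext() == true
}

/** Hypothetical task-side loop, not actual Flink code. */
final class PollingDriver {
    /** Runs the source to completion and returns the number of empty polls. */
    static <T> int run(PollingStreamSource<T> source, Consumer<T> out,
                       Runnable checkpointHook, long minSleepMs, long maxSleepMs)
            throws InterruptedException {
        long sleepMs = minSleepMs;
        int emptyPolls = 0;
        while (!source.reachedEnd()) {
            // Safe point between two elements: a pending checkpoint could be taken here.
            checkpointHook.run();
            if (source.hasNext()) {
                out.accept(source.next());
                sleepMs = minSleepMs;                        // data arrived: reset the backoff
            } else {
                emptyPolls++;
                Thread.sleep(sleepMs);                       // nothing available: back off
                sleepMs = Math.min(sleepMs * 2, maxSleepMs); // double the sleep up to a cap
            }
        }
        return emptyPolls;
    }
}

/** Hypothetical test source: each "-" entry makes one hasNext() call return false. */
final class ScriptedSource implements PollingStreamSource<String> {
    private final Deque<String> script;

    ScriptedSource(List<String> script) { this.script = new ArrayDeque<>(script); }

    @Override public boolean reachedEnd() { return script.isEmpty(); }

    @Override public boolean hasNext() {
        if ("-".equals(script.peekFirst())) {
            script.pollFirst(); // consume the "nothing available yet" marker
            return false;
        }
        return !script.isEmpty();
    }

    @Override public String next() {
        if (script.isEmpty() || "-".equals(script.peekFirst())) {
            throw new IllegalStateException("no element available");
        }
        return script.pollFirst();
    }
}
```

With the script a, -, -, b and bounds of 1 and 8 ms, the loop emits a and b, sleeps for 1 ms and then 2 ms, and reaches the checkpoint hook four times. Doubling the sleep up to a cap and resetting it after a successful poll follows Matthias's description of Storm. The constants are placeholders.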

On Mon, May 11, 2015 at 4:12 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
> We had a conversation with Stephan, Aljoscha, Gyula and Paris and converged on the following outline for the streaming source interface. The question is tricky because we need to coordinate between the actual source computation and triggering the checkpointing of the state of the source.
>
> We should provide two interfaces; let us refer to them as StreamSource and PollingStreamSource for now.
>
> StreamSource has two methods, next() and reachedEnd(), as outlined in Stephan's recent pull request [1]. The main idea here is that reachedEnd() should only return true when no more data will arrive and the source can be closed. As a consequence, when no data is currently available but might arrive in the future, the methods of the interface can be blocking operations. To properly coordinate this with checkpointing the state of the sources, these blocking calls should be able to forward InterruptedExceptions, so that the checkpoint can be taken when triggered. This puts expectations on the user code.
>
> To provide an option where this behavior is not enforceable, we would introduce PollingStreamSource, which, in addition to the next() and reachedEnd() methods, would have a hasNext() method that returns true when data is currently available. Here all of the methods are expected to return almost immediately (so that checkpointing is not delayed too much when triggered), but user code is not expected to handle interrupts.
>
> [1] https://github.com/apache/flink/pull/643
>
> On Mon, May 11, 2015 at 10:37 AM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > I would not go in this direction. Returning lists is messy, I think.
> > I would stick with hasNext() and next() returning a single element.
> >
> > On Mon, May 11, 2015 at 10:20 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > > We could also change next() to return List<T> and say that the method must not sit and wait but simply return whatever is available without waiting, while also being allowed to return nothing for the moment.
> > >
> > > On Fri, May 8, 2015 at 12:05 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
> > > > You are right. That is why I already pointed this out:
> > > >
> > > >> -> You could force the UDF to return each time, by disallowing consecutive calls to Collector.out(...).
> > > >
> > > > The Storm design would avoid the "NULL-Problem" Aljoscha mentioned, too.
> > > >
> > > > -Matthias
> > > >
> > > > On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> > > >> I think the problem with this void next() approach is exactly the way it works:
> > > >>
> > > >> "Using this interface, "next()" can loop internally as long as tuples are available and return if there is (currently) no input."
> > > >>
> > > >> We don't want the user to loop internally in next(), because then we have almost the same problem as we have now with run(). We want to take snapshots between two produced source elements, at roughly the same time at all the sources, so we cannot afford to wait for some arbitrary user behaviour to finish.
> > > >>
> > > >> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
> > > >>
> > > >>> Did you consider the Storm way to handle this?
> > > >>>
> > > >>> Storm offers a method "void next()" that uses a collector object to emit new tuples. Using this interface, "next()" can loop internally as long as tuples are available and return if there is (currently) no input.
> > > >>> From what I have seen, people tend to emit a single tuple and leave next() immediately, because Storm calls next() in an infinite loop anyway.
> > > >>> -> You could force the UDF to return each time, by disallowing consecutive calls to Collector.out(...).
> > > >>>
> > > >>> If next() is called by the system and it returns, it is easy to check whether the out(...) method of the collector object was called at least once. If no record was emitted, Storm "sleeps" for a while before calling next() again, to avoid busy waiting. The sleeping time is increased for consecutive "empty" next() calls and is reset the first time next() emits records again.
> > > >>>
> > > >>> I like this interface because it is very simple, and I would prefer it over an interface with many methods.
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> > > >>>> Hi,
> > > >>>> in the process of reworking the Streaming Operator model, I'm also reworking the sources in order to get rid of the loop in each source. Right now, the interface for sources (SourceFunction) has one method: run(). This is called when the source starts and can just output elements at any time using the Collector interface. This does not give the Task that runs the source much control over suspending operation to perform checkpoints or similar.
> > > >>>>
> > > >>>> I thought about changing the interface to this:
> > > >>>>
> > > >>>> interface SourceFunction<T> {
> > > >>>>     boolean reachedEnd();
> > > >>>>     T next();
> > > >>>> }
> > > >>>>
> > > >>>> This is similar to the batch API and also to what Stephan proposes in his pull request.
> > > >>>> I think this will not work for streaming, because sources might not have new elements to emit at the moment but might have something to emit in the future. This is problematic because streaming topologies usually run indefinitely. In that case, reachedEnd() and next() would have to block until a new element arrives. This again does not give the task the power to suspend operation at will.
> > > >>>>
> > > >>>> I propose a three-method interface:
> > > >>>>
> > > >>>> interface SourceFunction<T> {
> > > >>>>     boolean reachedEnd();
> > > >>>>     boolean hasNext();
> > > >>>>     T next();
> > > >>>> }
> > > >>>>
> > > >>>> where the contract for the source is as follows:
> > > >>>> - reachedEnd() == true => stop the source
> > > >>>> - hasNext() == true => call next() to retrieve the next element
> > > >>>> - hasNext() == false => call again at some later point
> > > >>>> - next() => retrieve the next element; throw an exception if no element is available
> > > >>>>
> > > >>>> I thought about allowing next() to return NULL to signal that no element is available at the moment. This will not work because a source might want to return NULL as an element.
> > > >>>>
> > > >>>> What do you think? Any other ideas about solving this?
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Aljoscha
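
For comparison, here is a minimal Java sketch of the blocking StreamSource variant from Márton's summary. In this sketch, interrupting the task thread acts as the checkpoint trigger. QueueSource, its end marker, and BlockingDriver are hypothetical names for illustration, not Flink API. The sketch only shows why user code has to let InterruptedException propagate: the task loop catches it, takes the checkpoint, and resumes reading.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Blocking contract: calls may block, but must let InterruptedException through. */
interface StreamSource<T> {
    boolean reachedEnd() throws InterruptedException; // true only when no more data will ever arrive
    T next() throws InterruptedException;
}

/** Hypothetical queue-backed source; finish() enqueues an end marker. */
final class QueueSource implements StreamSource<String> {
    private static final Object END = new Object();
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object head; // taken by reachedEnd() but not yet handed out by next()

    void offer(String element) { queue.add(element); }

    void finish() { queue.add(END); }

    @Override public boolean reachedEnd() throws InterruptedException {
        if (head == null) {
            head = queue.take(); // blocks until an element or the end marker arrives
        }
        return head == END;
    }

    @Override public String next() throws InterruptedException {
        if (head == null) {
            head = queue.take();
        }
        String element = (String) head;
        head = null;
        return element;
    }
}

/** Hypothetical task loop: an interrupt of the task thread means "checkpoint now". */
final class BlockingDriver {
    static <T> void run(StreamSource<T> source, Consumer<T> out, Runnable checkpoint) {
        while (true) {
            try {
                if (source.reachedEnd()) {
                    return;
                }
                out.accept(source.next());
            } catch (InterruptedException e) {
                // take() clears the interrupt status when it throws, so the loop can simply resume.
                checkpoint.run();
            }
        }
    }
}
```

In this variant, reachedEnd() itself has to block until it knows whether another element or the end marker will arrive. That is exactly the situation Aljoscha describes for sources that run indefinitely.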