Thanks, Marton, for the summary.

The PollingStreamSource essentially follows the API suggested by Aljoscha,
and will probably use a backoff sleep time internally (as Matthias pointed
out), so we really have arrived at a mix of techniques ;-)
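
To make the polling variant a bit more concrete, here is a rough Java sketch of
how a task might drive a PollingStreamSource with a backoff sleep. The three
method names come from Marton's summary quoted below; the exact signatures, the
PollingDriver and EveryOtherPollSource classes, and the sleep bounds are
assumptions made up for illustration, not what will necessarily end up in Flink.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

// Assumed shape of the polling interface; only the method names come from the thread.
interface PollingStreamSource<T> {
    boolean reachedEnd();   // true only when no more data will ever arrive
    boolean hasNext();      // true when an element is available right now; must not block
    T next();               // returns the next element; throws if none is available
}

// Hypothetical driver loop: polls the source and backs off while it is idle.
final class PollingDriver {
    static final long MIN_SLEEP_MS = 1;   // assumed bounds, chosen arbitrarily
    static final long MAX_SLEEP_MS = 64;

    public static <T> List<T> drain(PollingStreamSource<T> source) throws InterruptedException {
        List<T> out = new ArrayList<>();
        long sleepMs = MIN_SLEEP_MS;
        while (!source.reachedEnd()) {
            // A checkpoint could be taken here, between two emitted elements.
            if (source.hasNext()) {
                out.add(source.next());
                sleepMs = MIN_SLEEP_MS;                          // reset backoff after a hit
            } else {
                Thread.sleep(sleepMs);                           // idle: avoid busy waiting
                sleepMs = Math.min(sleepMs * 2, MAX_SLEEP_MS);   // grow backoff, capped
            }
        }
        return out;
    }
}

// Hypothetical demo source that reports "nothing available" on every other poll.
final class EveryOtherPollSource implements PollingStreamSource<Integer> {
    private final Queue<Integer> pending;
    private int polls = 0;

    public EveryOtherPollSource(List<Integer> elements) {
        this.pending = new ArrayDeque<>(elements);
    }

    @Override
    public boolean reachedEnd() {
        return pending.isEmpty();
    }

    @Override
    public boolean hasNext() {
        // Polls 0, 2, 4, ... are idle; polls 1, 3, 5, ... see an element.
        return !pending.isEmpty() && (polls++ % 2 == 1);
    }

    @Override
    public Integer next() {
        Integer element = pending.poll();
        if (element == null) {
            throw new NoSuchElementException("no element available");
        }
        return element;
    }
}
```

Calling PollingDriver.drain(new EveryOtherPollSource(...)) collects the elements
in order while sleeping briefly on each idle poll. Since none of the source
methods block, the only place the loop waits is the driver's own sleep, which
is where the task keeps control over checkpoint timing.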

On Mon, May 11, 2015 at 4:12 PM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> We had a conversation with Stephan, Aljoscha, Gyula and Paris and converged
> on the following outline for the streaming source interface. The question
> is tricky because we need to coordinate between the actual source
> computation and triggering the checkpointing of the state of the source.
>
> We should provide two interfaces; let us refer to them as StreamSource and
> PollingStreamSource for now.
>
> StreamSource has two methods, next() and reachedEnd(), as outlined in
> Stephan's recent pull request. [1] The main idea here is that reachedEnd()
> should only return true when no more data will arrive and the source can
> be closed. As a consequence, when no data is currently available but might
> arrive in the future, the methods of the interface can be blocking
> operations. To properly coordinate this with checkpointing the state of the
> sources, these blocking calls should be able to forward
> InterruptedExceptions, so that the checkpoint can be taken when triggered.
> This puts expectations on the user code.
>
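
A minimal Java sketch of what "forwarding InterruptedExceptions" could mean for
a blocking StreamSource. Only the two method names are taken from the summary;
the signatures, the QueueBackedSource class and its offer()/close() helpers are
hypothetical and only serve the illustration.

```java
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Assumed shape of the blocking interface; only the method names come from the thread.
interface StreamSource<T> {
    boolean reachedEnd() throws InterruptedException;   // may block until it knows the answer
    T next() throws InterruptedException;               // may block until an element arrives
}

// Hypothetical source fed by another thread through a queue.
// An empty Optional in the queue marks the end of the stream.
final class QueueBackedSource<T> implements StreamSource<T> {
    private final BlockingQueue<Optional<T>> queue = new LinkedBlockingQueue<>();
    private Optional<T> lookahead;   // null until the next queue entry has been fetched

    public void offer(T element) {
        queue.add(Optional.of(element));
    }

    public void close() {
        queue.add(Optional.empty());
    }

    @Override
    public boolean reachedEnd() throws InterruptedException {
        if (lookahead == null) {
            // Blocks while nothing is queued. If the thread is interrupted (for
            // example to trigger a checkpoint), take() throws InterruptedException
            // and we simply let it propagate instead of swallowing it.
            lookahead = queue.take();
        }
        return !lookahead.isPresent();
    }

    @Override
    public T next() throws InterruptedException {
        if (reachedEnd()) {
            throw new NoSuchElementException("source is exhausted");
        }
        T element = lookahead.get();
        lookahead = null;
        return element;
    }
}
```

The expectation on user code is visible in reachedEnd(): the blocking call must
not catch and drop the interrupt, otherwise the task cannot regain control while
the source is waiting for data.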
> To provide an option where this behavior is not enforceable, we would
> introduce PollingStreamSource, which in addition to the next() and
> reachedEnd() methods would have a hasNext() method that returns true
> when data is currently available. Here all of the methods are expected to
> return almost immediately (so that checkpointing is not delayed too much
> when triggered), but user code is not expected to handle interrupts.
>
> [1] https://github.com/apache/flink/pull/643
>
> On Mon, May 11, 2015 at 10:37 AM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > I would not go in this direction. Returning lists is messy, I think. I
> > would stick with hasNext() and next() returning a single element.
> >
> > On Mon, May 11, 2015 at 10:20 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > We could also change next() to return List<T> and say that the method
> > > must not sit and wait but simply return stuff that is available
> > > without waiting while also being able to not return anything for the
> > > moment.
> > >
> > > On Fri, May 8, 2015 at 12:05 PM, Matthias J. Sax
> > > <mj...@informatik.hu-berlin.de> wrote:
> > > > You are right. That is why I already pointed this out:
> > > >
> > > >> -> You could force the UDF to return each time, by disallowing
> > > >> consecutive calls to Collector.out(...).
> > > >
> > > > The Storm design would avoid the "NULL-Problem" Aljoscha mentioned, too.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 05/08/2015 10:59 AM, Gyula Fóra wrote:
> > > > >> I think the problem with this void next() approach is exactly the way it
> > > > >> works:
> > > > >>
> > > > >> "Using this interface, "next()" can loop internally as long
> > > > >> as tuples are available and return if there is (currently) no input."
> > > > >>
> > > > >> We don't want the user to loop internally in next(), because then we have
> > > > >> almost the same problem as now with run(). We want to take snapshots
> > > > >> between two produced source elements, at roughly the same time at all the
> > > > >> sources, so we cannot afford to wait for some random user behaviour to
> > > > >> finish.
> > > >>
> > > >>
> > > >> On Fri, May 8, 2015 at 10:47 AM, Matthias J. Sax <
> > > >> mj...@informatik.hu-berlin.de> wrote:
> > > >>
> > > >>> Did you consider the Storm way to handle this?
> > > >>>
> > > > >>> Storm offers a method "void next()" that uses a collector object to emit
> > > > >>> new tuples. Using this interface, "next()" can loop internally as long
> > > > >>> as tuples are available and return if there is (currently) no input.
> > > > >>> From what I have seen, people tend to emit a single tuple and leave next()
> > > > >>> immediately, because Storm calls next() in an infinite loop anyway.
> > > > >>> -> You could force the UDF to return each time, by disallowing
> > > > >>> consecutive calls to Collector.out(...).
> > > >>>
> > > > >>> If next() is called by the system and it returns, it is easy to check if
> > > > >>> the out(..) method of the collector object was called at least once. If
> > > > >>> no record was emitted, Storm "sleeps" for a while before calling
> > > > >>> next() again, to avoid busy waiting. The sleeping time is increased for
> > > > >>> consecutive "empty" next() calls and reset the first time next() emits
> > > > >>> records again.
> > > >>>
> > > > >>> I like this interface because it's very simple, and I would prefer it over
> > > > >>> an interface with many methods.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>>
> > > >>> On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> > > >>>> Hi,
> > > > >>>> in the process of reworking the Streaming Operator model I'm also
> > > > >>>> reworking the sources in order to get rid of the loop in each source.
> > > > >>>> Right now, the interface for sources (SourceFunction) has one method:
> > > > >>>> run(). This is called when the source starts and can just output
> > > > >>>> elements at any time using the Collector interface. This does not give
> > > > >>>> the Task that runs the source a lot of control in suspending operation
> > > > >>>> for performing checkpoints or some such thing.
> > > >>>>
> > > >>>> I thought about changing the interface to this:
> > > >>>>
> > > >>>> interface SourceFunction<T>  {
> > > >>>>   boolean reachedEnd();
> > > >>>>   T next();
> > > >>>> }
> > > >>>>
> > > > >>>> This is similar to the batch API and also to what Stephan proposes in
> > > > >>>> his pull request. I think this will not work for streaming because
> > > > >>>> sources might not have new elements to emit at the moment but might
> > > > >>>> have something to emit in the future. This is problematic because
> > > > >>>> streaming topologies are usually running indefinitely. In that case,
> > > > >>>> reachedEnd() and next() would have to be blocking (until a new
> > > > >>>> element arrives). This again does not give the task the power to
> > > > >>>> suspend operation at will.
> > > >>>>
> > > >>>> I propose a three function interface:
> > > >>>>
> > > >>>> interface SourceFunction<T> {
> > > > >>>>   boolean reachedEnd();
> > > > >>>>   boolean hasNext();
> > > >>>>   T next();
> > > >>>> }
> > > >>>>
> > > >>>> where the contract for the source is as follows:
> > > >>>>  - reachedEnd() == true => stop the source
> > > >>>>  - hasNext() == true => call next() to retrieve next element
> > > >>>>  - hasNext() == false => call again at some later point
> > > > >>>>  - next() => retrieve next element, throw exception if no element available
> > > >>>>
> > > >>>> I thought about allowing next() to return NULL to signal that no
> > > >>>> element is available at the moment. This will not work because a
> > > >>>> source might want to return NULL as an element.
> > > >>>>
> > > >>>> What do you think? Any other ideas about solving this?
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Aljoscha
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> >
>
