Thanks for the updates, Matthias.

Both of your questions now appear in a different context, because we have
decided to go back to the run()/cancel() type of source interface, but with a
slightly changed signature to enable "transactional" operator state checkpointing.
You can check out the new source interface here [1] which is part of PR 755.

We hope that PR 755 will be merged into master in the next few days, as it is a
release blocker, so you can plan with those interfaces. As for the
FiniteSpoutWrapper, I think this makes your job easier, and you will definitely
need the isRunning flag and the cancel method.
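
To illustrate why the flag and the cancel method are needed with a
run()/cancel() style source, here is a rough sketch. All names in it
(SketchSource, SketchFiniteSpout, SketchFiniteSpoutWrapper, CountingSpout) are
made up for illustration; this is not the actual SourceFunction signature from
[1] and it leaves out checkpointing entirely, so please check the linked file
for the real interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a run()/cancel() style source (NOT the Flink interface).
interface SketchSource<T> {
    void run(Consumer<T> out) throws Exception;
    void cancel();
}

// Hypothetical stand-in for a finite spout: it can report that it is exhausted.
interface SketchFiniteSpout<T> {
    boolean reachedEnd();
    T nextTuple(); // may return null if nothing was emitted
}

// The wrapper owns the emit loop, so it needs a flag that cancel() can flip.
class SketchFiniteSpoutWrapper<T> implements SketchSource<T> {
    private final SketchFiniteSpout<T> spout;
    // volatile: cancel() is typically called from a different thread than run()
    private volatile boolean isRunning = true;

    SketchFiniteSpoutWrapper(SketchFiniteSpout<T> spout) {
        this.spout = spout;
    }

    @Override
    public void run(Consumer<T> out) {
        // Stop either when cancelled or when the finite spout is exhausted.
        while (isRunning && !spout.reachedEnd()) {
            T tuple = spout.nextTuple();
            if (tuple != null) {
                out.accept(tuple);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

// Toy spout that emits 0, 1, ..., limit - 1 and then reports the end.
class CountingSpout implements SketchFiniteSpout<Integer> {
    private final int limit;
    private int next = 0;

    CountingSpout(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean reachedEnd() {
        return next >= limit;
    }

    @Override
    public Integer nextTuple() {
        return next++;
    }
}

class RunCancelSketch {
    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        SketchFiniteSpoutWrapper<Integer> wrapper =
                new SketchFiniteSpoutWrapper<>(new CountingSpout(3));
        wrapper.run(seen::add);
        System.out.println(seen);
    }
}
```

With an infinite spout the reachedEnd() check would simply go away, and the
isRunning flag would be the only way to leave the loop.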

[1]
https://github.com/StephanEwen/incubator-flink/blob/stream_sources/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java


On Wed, Jun 3, 2015 at 7:51 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> I just pushed my changes to Marton's "storm" branch.
>
> It is still open how to proceed with the following (please give feedback):
>
> StormSpoutWrapper:
>   - do we still need "isRunning" and "cancel()"? The new API should make
> them obsolete from my point of view.
>  - I would avoid "busy wait" in "next()" and apply a "not-emit" penalty
> within the while-loop:
>
> >       long sleep = 1;
> >       while(!stormCollector.hasNext()) {
> >               Thread.sleep(sleep);
> >               sleep *= 2;
> >               spout.nextTuple();
> >       }
>
> StormFiniteSpoutWrapper:
>   - remove member variable "isDefined" --> this is redundant information
> and might cause bugs...
>  - can we remove the "tupleEmitted" flag? Maybe we can implement it
> without it (not sure though)
>
>
> I am also working on a new implementation of StormSpoutWrapper and
> StormSpoutCollector. I will push it into my own repository when finished
> and tell you. It could replace the current implementation without the
> "nasty" buffering Queue (which I don't like). However, we need to
> discuss this alternative implementation first.
>
>
> -Matthias
>
>
> On 06/03/2015 03:32 PM, Szabó Péter wrote:
> > ---------- Forwarded message ----------
> > From: Szabó Péter <nemderogator...@gmail.com>
> > Date: 2015-06-03 15:31 GMT+02:00
> > Subject: Re: Discussion: Storm Comparability Layer
> > To: Márton Balassi <balassi.mar...@gmail.com>
> >
> >
> > Hey, Matthias,
> >
> > Of course, you can remove my last commit. I just wanted to remove the
> > failing tests, and some unnecessary comments. Please do the latter in
> > your commit as well.
> >
> > As for StormSpoutCollector, I used a Queue with a LinkedList implementation,
> > because the list we keep is a queue in nature: we put records into it, and
> > remove the head from time to time. The collector implements Iterator,
> > because I wanted to use something like next() and hasNext() in the
> > StormSpoutWrapper. I think emphasizing this iterator nature makes the code
> > more readable.
> >
> > Peter
> >
> > 2015-06-03 14:16 GMT+02:00 Márton Balassi <balassi.mar...@gmail.com>:
> >
> >> Hey Matthias,
> >>
> >> We can undo Peter's commit if that helps you and have yours instead. You
> >> can simply remove that commit in a rebase. Besides this, let us push to the
> >> same branch while trying not to break the history. I will squash the commits
> >> once again if it gets too bulky.
> >>
> >> I would like to bring the discussion to the mailing list, so the community
> >> can see that you are actively working on this. Are you OK with reposting
> >> this thread to the dev mailing list?
> >>
> >> On Wed, Jun 3, 2015 at 2:09 PM, Matthias J. Sax <
> >> mj...@informatik.hu-berlin.de> wrote:
> >>
> >>> Hi,
> >>>
> >>> I just saw, that Peter pushed a new commit. It makes it hard for me to
> >>> push my changes. Can we undo the last commit?
> >>>
> >>> If I get it right, it removes StormFiniteSpoutWrapper and disables
> >>> failing tests only. Do we want to delete StormFiniteSpoutWrapper? I would
> >>> rather keep it.
> >>>
> >>> -Matthias
> >>>
> >>> On 06/03/2015 01:58 PM, Matthias J. Sax wrote:
> >>>> Hi,
> >>>>
> >>>> I have a few questions about the current status ("storm" branch from
> >>>> Marton).
> >>>>
> >>>> StormSpoutCollector:
> >>>>   - is there any specific advantage in using a Queue instead of
> >>>> LinkedList for the internal buffer?
> >>>>   - Why are we implementing the Iterator interface and marking
> >>>> flinkCollectionDelegates as private?
> >>>>     -> I would rather drop the interface and make the variable "package
> >>>> private" to access it directly (avoids "unnecessary" method calls)
> >>>>
> >>>> StormSpoutWrapper:
> >>>>   - do we still need "isRunning" and "cancel()"? The new API should make
> >>>> them obsolete from my point of view.
> >>>>   - I would avoid "busy wait" in "next()" and apply a "not-emit" penalty
> >>>> within the while-loop:
> >>>>
> >>>>>      long sleep = 1;
> >>>>>      while(!stormCollector.hasNext()) {
> >>>>>              Thread.sleep(sleep);
> >>>>>              sleep *= 2;
> >>>>>              spout.nextTuple();
> >>>>>      }
> >>>>
> >>>> StormFiniteSpoutWrapper:
> >>>>   - remove member variable "isDefined" --> this is redundant information
> >>>> and might cause bugs...
> >>>>   - can we remove the "tupleEmitted" flag? Maybe we can implement it
> >>>> without it (not sure though)
> >>>>
> >>>>
> >>>> I am also working on a new implementation of StormSpoutOutputWrapper. I
> >>>> will push it into my own repository when finished and tell you. It could
> >>>> replace the current implementation without the "nasty" buffering Queue
> >>>> (which I don't like). However, we need to discuss this alternative
> >>>> implementation first.
> >>>>
> >>>> Things I would like to push:
> >>>>
> >>>> I fixed the following tests (they were already fixed in my branch but not
> >>>> merged by Marton):
> >>>>  - StormBoltWrapperTest
> >>>>  - StormSpoutWrapperTest
> >>>>  - StormFiniteSpoutWrapperTest
> >>>>  - Added new Test class InfiniteTestSpout
> >>>>
> >>>> I also stepped through the whole code, removed "unused" tags (which are not
> >>>> necessary for public methods), corrected a few spelling mistakes in
> >>>> comments, and did some other minor "improvements".
> >>>>
> >>>> Additionally, I "merged" my changes (after my rebase) that are different
> >>>> from Peter's changes. Peter and I discussed some of the rebase differences
> >>>> and I "merged" my and his changes (we both already agreed on how to resolve
> >>>> the differences).
> >>>>
> >>>> If it is ok, I will push it directly into Marton's git repository.
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>
> >>>
> >>
> >
>
>
