Thanks for the updates, Matthias. Both of your questions get an other context, because we have decided to go back to the run()/cancel() type of source interface - but with a slightly changed signature to enable "transactional" operator state checkpointing. You can check out the new source interface here [1] which is part of PR 755.
We hope that PR 755 will be in the master in the following days as it is a release blocker, so you can plan with those interfaces. As far as the FiniteSpoutWrapper I think this makes your job easier and you definitely need the isRunning flag and the cancel method. [1] https://github.com/StephanEwen/incubator-flink/blob/stream_sources/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java On Wed, Jun 3, 2015 at 7:51 PM, Matthias J. Sax < mj...@informatik.hu-berlin.de> wrote: > I just pushed my changes to Marton's "storm" branch. > > It is still open how to process with the following (please give feedback): > > StormSpoutWrapper: > - do we still need "isRunning" and "cancel()"? The new API should make > them obsolete from my point of view. > - I would avoid "busy wait" in "next()" and apply a "not-emit" penalty > within the while-loop: > > > long sleep = 1; > > while(!stormCollector.hasNext()) { > > Thread.sleep(sleep); > > sleep *= 2; > > spout.nextTuple(); > > } > > StormFiniteSpoutWrapper: > - remove member variable "isDefined" --> this is redundant information > and might cause bugs... > - can we remove the "tupleEmitted" flag? Maybe we can implement it > without it (nor sure though) > > > I am also working on a new implementation of StromSpoutWrapper and > StormSpoutCollector. I will push it into my own repository if finished > and tell you. It could replace the current implementation without the > "nasty" buffering Queue (which I don't like). However, we need to > discuss this alternative implementation first. > > > -Matthias > > > On 06/03/2015 03:32 PM, Szabó Péter wrote: > > ---------- Forwarded message ---------- > > From: Szabó Péter <nemderogator...@gmail.com> > > Date: 2015-06-03 15:31 GMT+02:00 > > Subject: Re: Discussion: Storm Comparability Layer > > To: Márton Balassi <balassi.mar...@gmail.com> > > > > > > Hey, Matthias, > > > > Of course, you can remove my last commit. I just wanted to remove the > > failing tests, and some unnecessary comments. Please do the latter it in > > your commit as well. > > > > As for StormSpoutCollector, I used Queue with LinkedList implementation, > > because the list we keep is a queue in nature: we put records into it, > and > > remove the head from time to time. The collector implements iterator, > > because I wanted to use something like next() and hasNext() in the > > StormSpoutWrapper. I think emphasizing this iterator-nature makes the > code > > more readable. > > > > Peter > > > > 2015-06-03 14:16 GMT+02:00 Márton Balassi <balassi.mar...@gmail.com>: > > > >> Hey Matthias, > >> > >> We can undo Peter's commit if that helps you and have yours instead. You > >> can simply remove that commit in a rebase. Besides this let us push to > the > >> same branch with trying not to break the history, I will squash the > commits > >> once again if it gets too bulky. > >> > >> I would like to bring the discussion to the mailing list, so the > cummunity > >> is seeing that you are actively working on this. Are you OK with > reposting > >> this thread to the dev mailing list? > >> > >> On Wed, Jun 3, 2015 at 2:09 PM, Matthias J. Sax < > >> mj...@informatik.hu-berlin.de> wrote: > >> > >>> Hi, > >>> > >>> I just saw, that Peter pushed a new commit. It makes it hard for me to > >>> push my changes. Can we undo the last commit? > >>> > >>> If I get it right, it removes StormFiniteSpoutWrapper and disables > >>> failing test only. Do we want to delete StormFiniteSpoutWrapper? I > would > >>> rather keep it. > >>> > >>> -Matthias > >>> > >>> On 06/03/2015 01:58 PM, Matthias J. Sax wrote: > >>>> Hi, > >>>> > >>>> I have a few questions about the current status ("storm" branch from > >>>> Marton). > >>>> > >>>> StormSpoutCollector: > >>>> - is there any specify advantage in using a Queue instead of > >>>> LinkedList for the internal buffer? > >>>> - Why are us implementing Iterator interface and mark > >>>> flinkCollectionDelegates as private? > >>>> -> I would rather drop the interface and make the variable > "package > >>>> private" to access it directly (avoids "unnecessary" method calls) > >>>> > >>>> StormSpoutWrapper: > >>>> - do we still need "isRunning" and "cancel()"? The new API should > make > >>>> them obsolete from my point of view. > >>>> - I would avoid "busy wait" in "next()" and apply a "not-emit" > penalty > >>>> within the while-loop: > >>>> > >>>>> long sleep = 1; > >>>>> while(!stormCollector.hasNext()) { > >>>>> Thread.sleep(sleep); > >>>>> sleep *= 2; > >>>>> spout.nextTuple(); > >>>>> } > >>>> > >>>> StormFiniteSpoutWrapper: > >>>> - remove member variable "isDefined" --> this is redundant > information > >>>> and might cause bugs... > >>>> - can we remove the "tupleEmitted" flag? Maybe we can implement it > >>>> without it (nor sure though) > >>>> > >>>> > >>>> I am also working on a new implementation of StormSpoutOutputWrapper. > I > >>>> will push it into my own repository if finished and tell you. It could > >>>> replace the current implementation without the "nasty" buffering Queue > >>>> (which I don't like). However, we need to discuss this alternative > >>>> implementation first. > >>>> > >>>> Things I would like to push: > >>>> > >>>> I fixed the following tests (was already fixed in my branch but not > >>>> merged by Marton): > >>>> - StormBoltWrapperTest > >>>> - StormSpoutWrapperTest > >>>> - StormFiniteSpoutWrapperTest > >>>> - Added new Test class InfiniteTestSpout > >>>> > >>>> I also step throw the hole code, removed "unused" tag (which are not > >>>> necessary for public methods), corrected a few spelling mistakes is > >>>> comments, and did some other minor "improvements". > >>>> > >>>> Additionally, I "merged" my changes (after my rebase) that are > different > >>>> to Peters changes. Peter and I discussed some of the rebase > differences > >>>> and I "merged" my and his changes (we both agreed how to resolve the > >>>> differenced already). > >>>> > >>>> If it is ok, I will push it directly into Marton's git repository. > >>>> > >>>> > >>>> > >>>> -Matthias > >>>> > >>> > >>> > >> > > > >