Thanks for the info.

I am just a little bit "disappointed". The whole rewrite to the new
interface was unnecessary... We need to "revert" everything again...
I can also stop working on the new StormSpoutWrapper and
StormSpoutCollector implementation in this case...

Let's see how it goes. But from my point of view, the simplest way might
even be to use the old state (including the clean up commit from Peter
before his rebase) from my original pull request and work from there
after the changed interface is in the master.

Any other thoughts on this?

(I guess, the current reworking was a little bit too eager.)

@Marton: Can you have a look at the Travis build? It failed with a weird
compilation error. It builds locally, and I cannot reproduce the error...
(Maybe just trigger the build again.)


-Matthias


On 06/03/2015 09:09 PM, Márton Balassi wrote:
> Thanks for the updates, Matthias.
> 
> Both of your questions now appear in a different context, because we have decided to go
> back to the run()/cancel() type of source interface - but with a slightly
> changed signature to enable "transactional" operator state checkpointing.
> You can check out the new source interface here [1] which is part of PR 755.
> 
> We hope that PR 755 will be in the master in the following days as it is a
> release blocker, so you can plan with those interfaces. As for the
> FiniteSpoutWrapper, I think this makes your job easier, and you definitely
> need the isRunning flag and the cancel method.
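
To make the run()/cancel() pattern concrete, here is a minimal Java sketch of a source that loops on an isRunning flag. SketchSource and CountingSource are hypothetical names made up for illustration; this is not the SourceFunction interface from PR 755, whose exact signature should be taken from [1].

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a run()/cancel() style source (NOT Flink's SourceFunction).
interface SketchSource<T> {
    void run(Consumer<T> out);
    void cancel();
}

// Emits 0, 1, 2, ... until cancel() flips the flag.
class CountingSource implements SketchSource<Long> {
    // volatile: cancel() is usually called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void run(Consumer<Long> out) {
        long next = 0;
        while (isRunning) {
            out.accept(next++);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

A finite spout wrapper could presumably follow the same shape and simply leave the loop on its own once the spout is exhausted, while cancel() still allows an early stop.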
> 
> [1]
> https://github.com/StephanEwen/incubator-flink/blob/stream_sources/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
> 
> 
> On Wed, Jun 3, 2015 at 7:51 PM, Matthias J. Sax <
> mj...@informatik.hu-berlin.de> wrote:
> 
>> I just pushed my changes to Marton's "storm" branch.
>>
>> It is still open how to proceed with the following (please give feedback):
>>
>> StormSpoutWrapper:
>>   - do we still need "isRunning" and "cancel()"? The new API should make
>> them obsolete from my point of view.
>>   - I would avoid "busy wait" in "next()" and apply a "not-emit" penalty
>> within the while-loop:
>>
>>>       long sleep = 1;
>>>       while(!stormCollector.hasNext()) {
>>>               Thread.sleep(sleep);
>>>               sleep *= 2;
>>>               spout.nextTuple();
>>>       }
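
One caveat with the loop as quoted: the sleep time doubles without any upper bound. A possible variant with a cap could look like the following sketch. The Backoff class, its parameters, and the use of BooleanSupplier/Runnable in place of stormCollector.hasNext() and spout.nextTuple() are assumptions for illustration, not code from the branch.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper; hasNext and poll stand in for the collector and the spout.
class Backoff {
    // Polls until hasNext reports true, sleeping between unsuccessful polls.
    // The sleep time doubles after every attempt but never exceeds maxMillis.
    // Returns the number of polls that were made.
    static int pollWithBackoff(BooleanSupplier hasNext, Runnable poll,
                               long initialMillis, long maxMillis) {
        long sleep = initialMillis;
        int polls = 0;
        while (!hasNext.getAsBoolean()) {
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return polls;
            }
            sleep = Math.min(sleep * 2, maxMillis);
            poll.run();
            polls++;
        }
        return polls;
    }
}
```

The cap keeps the worst-case latency after a long idle phase bounded by maxMillis; whether that trade-off is right for the wrapper would need to be discussed.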
>>
>> StormFiniteSpoutWrapper:
>>   - remove member variable "isDefined" --> this is redundant information
>> and might cause bugs...
>>   - can we remove the "tupleEmitted" flag? Maybe we can implement it
>> without it (not sure though)
>>
>>
>> I am also working on a new implementation of StormSpoutWrapper and
>> StormSpoutCollector. I will push it into my own repository once it is
>> finished and tell you. It could replace the current implementation without the
>> "nasty" buffering Queue (which I don't like). However, we need to
>> discuss this alternative implementation first.
>>
>>
>> -Matthias
>>
>>
>> On 06/03/2015 03:32 PM, Szabó Péter wrote:
>>> ---------- Forwarded message ----------
>>> From: Szabó Péter <nemderogator...@gmail.com>
>>> Date: 2015-06-03 15:31 GMT+02:00
>>> Subject: Re: Discussion: Storm Comparability Layer
>>> To: Márton Balassi <balassi.mar...@gmail.com>
>>>
>>>
>>> Hey, Matthias,
>>>
>>> Of course, you can remove my last commit. I just wanted to remove the
>>> failing tests, and some unnecessary comments. Please do the latter in
>>> your commit as well.
>>>
>>> As for StormSpoutCollector, I used Queue with LinkedList implementation,
>>> because the list we keep is a queue in nature: we put records into it, and
>>> remove the head from time to time. The collector implements Iterator,
>>> because I wanted to use something like next() and hasNext() in the
>>> StormSpoutWrapper. I think emphasizing this iterator-nature makes the code
>>> more readable.
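
The design described here can be sketched roughly as follows. BufferingCollector and emit() are hypothetical names chosen for illustration and do not reflect the actual StormSpoutCollector code; the sketch only shows a Queue-typed field backed by a LinkedList and exposed through Iterator.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical buffer: records are appended at the tail and consumed from the head.
class BufferingCollector<T> implements Iterator<T> {
    // Declared as Queue to document the FIFO usage; LinkedList is the implementation.
    private final Queue<T> buffer = new LinkedList<>();

    // Called when the spout emits a tuple.
    void emit(T tuple) {
        buffer.add(tuple);
    }

    @Override
    public boolean hasNext() {
        return !buffer.isEmpty();
    }

    @Override
    public T next() {
        // remove() throws NoSuchElementException on an empty queue,
        // which matches the Iterator contract.
        return buffer.remove();
    }
}
```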
>>>
>>> Peter
>>>
>>> 2015-06-03 14:16 GMT+02:00 Márton Balassi <balassi.mar...@gmail.com>:
>>>
>>>> Hey Matthias,
>>>>
>>>> We can undo Peter's commit if that helps you and have yours instead. You
>>>> can simply remove that commit in a rebase. Besides this, let us push to the
>>>> same branch while trying not to break the history; I will squash the commits
>>>> once again if it gets too bulky.
>>>>
>>>> I would like to bring the discussion to the mailing list, so the community
>>>> sees that you are actively working on this. Are you OK with reposting
>>>> this thread to the dev mailing list?
>>>>
>>>> On Wed, Jun 3, 2015 at 2:09 PM, Matthias J. Sax <
>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I just saw that Peter pushed a new commit. It makes it hard for me to
>>>>> push my changes. Can we undo the last commit?
>>>>>
>>>>> If I get it right, it only removes StormFiniteSpoutWrapper and disables
>>>>> the failing tests. Do we want to delete StormFiniteSpoutWrapper? I would
>>>>> rather keep it.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 06/03/2015 01:58 PM, Matthias J. Sax wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I have a few questions about the current status ("storm" branch from
>>>>>> Marton).
>>>>>>
>>>>>> StormSpoutCollector:
>>>>>>   - is there any specific advantage in using a Queue instead of
>>>>>> LinkedList for the internal buffer?
>>>>>>   - Why are we implementing the Iterator interface and marking
>>>>>> flinkCollectionDelegates as private?
>>>>>>     -> I would rather drop the interface and make the variable
>>>>>> "package private" to access it directly (avoids "unnecessary" method calls)
>>>>>>
>>>>>> StormSpoutWrapper:
>>>>>>   - do we still need "isRunning" and "cancel()"? The new API should make
>>>>>> them obsolete from my point of view.
>>>>>>   - I would avoid "busy wait" in "next()" and apply a "not-emit" penalty
>>>>>> within the while-loop:
>>>>>>
>>>>>>>      long sleep = 1;
>>>>>>>      while(!stormCollector.hasNext()) {
>>>>>>>              Thread.sleep(sleep);
>>>>>>>              sleep *= 2;
>>>>>>>              spout.nextTuple();
>>>>>>>      }
>>>>>>
>>>>>> StormFiniteSpoutWrapper:
>>>>>>   - remove member variable "isDefined" --> this is redundant information
>>>>>> and might cause bugs...
>>>>>>   - can we remove the "tupleEmitted" flag? Maybe we can implement it
>>>>>> without it (not sure though)
>>>>>>
>>>>>>
>>>>>> I am also working on a new implementation of StormSpoutOutputWrapper. I
>>>>>> will push it into my own repository once it is finished and tell you. It
>>>>>> could replace the current implementation without the "nasty" buffering Queue
>>>>>> (which I don't like). However, we need to discuss this alternative
>>>>>> implementation first.
>>>>>>
>>>>>> Things I would like to push:
>>>>>>
>>>>>> I fixed the following tests (they were already fixed in my branch but not
>>>>>> merged by Marton):
>>>>>>  - StormBoltWrapperTest
>>>>>>  - StormSpoutWrapperTest
>>>>>>  - StormFiniteSpoutWrapperTest
>>>>>>  - Added new Test class InfiniteTestSpout
>>>>>>
>>>>>> I also stepped through the whole code, removed "unused" tags (which are not
>>>>>> necessary for public methods), corrected a few spelling mistakes in
>>>>>> comments, and did some other minor "improvements".
>>>>>>
>>>>>> Additionally, I "merged" my changes (after my rebase) that are different
>>>>>> to Peter's changes. Peter and I discussed some of the rebase differences
>>>>>> and I "merged" my and his changes (we both agreed how to resolve the
>>>>>> differences already).
>>>>>>
>>>>>> If it is ok, I will push it directly into Marton's git repository.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
> 

