@Kostas: This new API is I believe equivalent in expressivity with the current one. We can define nested loops now as well. And I also don't see nested loops much worse generally than simple loops.
Gyula Fóra <gyula.f...@gmail.com> ezt írta (időpont: 2015. júl. 7., K, 16:14): > Sorry Stephan I meant it slightly differently, I see your point: > > DataStream source = ... > SingleInputOperator mapper = source.map(...) > mapper.addInput() > > So the add input would be a method of the operator not the stream. > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. júl. 7., > K, 16:12): > >> I think this would be good yes. I was just about to open an Issue for >> changing the Streaming Iteration API. :D >> >> Then we should also make the implementation very straightforward and >> simple, right now, the implementation of the iterations is all over the >> place. >> >> On Tue, 7 Jul 2015 at 15:57 Gyula Fóra <gyf...@apache.org> wrote: >> >> > Hey, >> > >> > Along with the suggested changes to the streaming API structure I think >> we >> > should also rework the "iteration" api. Currently the iteration api >> tries >> > to mimic the syntax of the batch API while the runtime behaviour is >> quite >> > different. >> > >> > What we create instead of iterations is really just cyclic streams >> (loops >> > in the streaming job), so the API should somehow be intuitive about this >> > behaviour. >> > >> > I suggest to remove the explicit iterate call and instead add a method >> to >> > the StreamOperators that allows to connect feedback inputs (create >> loops). >> > It would look like this: >> > >> > A mapper that does nothing but iterates over some filtered input: >> > >> > *Current API :* >> > DataStream source = .. >> > IterativeDataStream it = source.iterate() >> > DataStream mapper = it.map(noOpMapper) >> > DataStream feedback = mapper.filter(...) >> > it.closeWith(feedback) >> > >> > *Suggested API :* >> > DataStream source = .. >> > DataStream mapper = source.map(noOpMapper) >> > DataStream feedback = mapper.filter(...) >> > mapper.addInput(feedback) >> > >> > The suggested approach would let us define inputs to operators after >> they >> > are created and implicitly union them with the normal input. This is I >> > think a much clearer approach than what we have now. >> > >> > What do you think? >> > >> > Gyula >> > >> >