Sorry Stephan, I meant it slightly differently. I see your point:

DataStream source = ...
SingleInputOperator mapper = source.map(...)
mapper.addInput()

So addInput would be a method of the operator, not the stream.

Aljoscha Krettek <aljos...@apache.org> wrote (on Tue, 7 Jul 2015, 16:12):

> I think this would be good, yes. I was just about to open an issue for
> changing the Streaming Iteration API. :D
>
> Then we should also make the implementation very straightforward and
> simple. Right now, the implementation of the iterations is all over the
> place.
>
> On Tue, 7 Jul 2015 at 15:57 Gyula Fóra <gyf...@apache.org> wrote:
>
> > Hey,
> >
> > Along with the suggested changes to the streaming API structure, I think
> > we should also rework the "iteration" API. Currently the iteration API
> > tries to mimic the syntax of the batch API, while the runtime behaviour
> > is quite different.
> >
> > What we create instead of iterations is really just cyclic streams (loops
> > in the streaming job), so the API should make this behaviour intuitive.
> >
> > I suggest removing the explicit iterate call and instead adding a method
> > to the StreamOperators that allows connecting feedback inputs (creating
> > loops). It would look like this:
> >
> > A mapper that does nothing but iterates over some filtered input:
> >
> > *Current API:*
> > DataStream source = ..
> > IterativeDataStream it = source.iterate()
> > DataStream mapper = it.map(noOpMapper)
> > DataStream feedback = mapper.filter(...)
> > it.closeWith(feedback)
> >
> > *Suggested API:*
> > DataStream source = ..
> > DataStream mapper = source.map(noOpMapper)
> > DataStream feedback = mapper.filter(...)
> > mapper.addInput(feedback)
> >
> > The suggested approach would let us define inputs to operators after they
> > are created and implicitly union them with the normal input. I think this
> > is a much clearer approach than what we have now.
> >
> > What do you think?
> >
> > Gyula
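
To make the operator-level addInput idea discussed above a bit more concrete, here is a minimal toy sketch in plain Java. It does not use the Flink API at all: ToyOperator, its addInput and run methods, and the demo values are all hypothetical, and the feedback edge is simplified to a predicate on the operator's own output rather than a real stream. The mapper also decrements its input instead of being a no-op, purely so that the loop terminates. The only point is to show a feedback input being attached after the operator is created and implicitly unioned with the normal input.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Toy model only, not Flink code. All names here are hypothetical. */
public class FeedbackLoopSketch {

    /** An operator whose set of inputs can be extended after it is created. */
    static final class ToyOperator<T> {
        private final Function<T, T> fn;
        // Single queue standing in for the union of the normal and feedback inputs.
        private final Deque<T> pending = new ArrayDeque<>();
        // Feedback edge, simplified to a filter over this operator's own output.
        private Predicate<T> feedbackFilter = null;

        ToyOperator(List<T> source, Function<T, T> fn) {
            this.fn = fn;
            this.pending.addAll(source);
        }

        /** Attaches a feedback input: outputs passing the filter re-enter the operator. */
        void addInput(Predicate<T> feedbackFilter) {
            this.feedbackFilter = feedbackFilter;
        }

        /** Processes elements until both the normal and the feedback input are drained. */
        List<T> run() {
            List<T> out = new ArrayList<>();
            while (!pending.isEmpty()) {
                T result = fn.apply(pending.poll());
                out.add(result);
                if (feedbackFilter != null && feedbackFilter.test(result)) {
                    pending.add(result); // close the loop
                }
            }
            return out;
        }
    }

    static List<Integer> demo() {
        // "source.map(...)": the operator is created first, without any loop.
        ToyOperator<Integer> mapper = new ToyOperator<>(Arrays.asList(3, 1), x -> x - 1);
        // "mapper.addInput(feedback)": the loop is attached afterwards.
        mapper.addInput(x -> x > 0);
        return mapper.run();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 0, 1, 0]
    }
}
```

In this sketch the operator itself owns the feedback edge, which mirrors the point of the reply: addInput lives on the operator rather than on the stream.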