Hi,

First and foremost, I would like to nominate myself for the Golden Shovel award for digging out this topic.

Secondly, I would like to come back to the idea of implementing an N-Ary Stream Operator. This time the motivation comes not from Side Inputs, but from the need to efficiently support multi-joins in SQL without extra network exchanges. I’ve reviewed the design doc proposed by Aljoscha; I quite like it, and I think we could start from that.

Specifically, the end goal is to allow, for example, Blink to:

I. Implement A* multi broadcast join: a single operator chain in which the probe table (source) is read locally (inside the task that is actually doing the join) and then joined with multiple other broadcasted tables.
II. Handle the case where we have 2 or more sources pre-partitioned on the same key. In that case we should also be able to perform all of the table reading and the join inside a single Task.

In order to achieve that, I would propose the following plan:

1. Implement the N-Ary Stream Operator as proposed in the design doc below, but with added support for input selection [1].
   - Initially it can be exposed only via the `StreamTransformation`, without direct access from the `DataStream API`.
2. Allow it to be chained with sources (implemented using the FLIP-27 SourceReader [2]).
3. Think about whether we need to support more complex chaining. Even without this point, the motivating examples (I and II) could be implemented if all of the joins/filtering/mappings are compiled/composed into a single N-Ary Stream Operator (which could be chained with some other single-input operators at the tail). We could also think about supporting chaining of a tree of, for example, TwoInputStreamOperators inside a single Task. However, I’m leaving this as a follow-up, since in that case it’s not so easy to handle the `InputSelection` of multiple operators inside the tree.
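
To make the role of input selection in example I more concrete, here is a minimal sketch in plain Java. It is not Flink's actual API: `NAryOperator`, `BroadcastJoin`, `run`, and the `int`-returning `nextSelection()` are hypothetical names used only for illustration, and the inputs are assumed to be bounded in-memory lists. The operator asks the driver to drain every broadcast (build) input first and only then to read the probe input, so the whole multi-way join happens in one operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only; none of these types are Flink classes.
class MultiBroadcastJoinSketch {

    /** Assumed N-ary operator contract: one callback for all inputs plus input selection. */
    interface NAryOperator {
        void processElement(int inputIndex, String key, String value, List<String> out);
        void endInput(int inputIndex);
        /** Index of the input to read next, or -1 once every input has ended. */
        int nextSelection();
    }

    /** Input 0 is the probe side; inputs 1..N-1 are broadcast build sides. */
    static final class BroadcastJoin implements NAryOperator {
        private final List<Map<String, String>> buildTables = new ArrayList<>();
        private final boolean[] finished;

        BroadcastJoin(int numInputs) {
            finished = new boolean[numInputs];
            for (int i = 1; i < numInputs; i++) {
                buildTables.add(new HashMap<>());
            }
        }

        @Override
        public void processElement(int inputIndex, String key, String value, List<String> out) {
            if (inputIndex > 0) {
                // Build side: remember the record for later lookups.
                buildTables.get(inputIndex - 1).put(key, value);
                return;
            }
            // Probe side: inner join against every build table.
            StringBuilder joined = new StringBuilder(key).append(':').append(value);
            for (Map<String, String> table : buildTables) {
                String match = table.get(key);
                if (match == null) {
                    return; // no match in one table, so the record is dropped
                }
                joined.append(',').append(match);
            }
            out.add(joined.toString());
        }

        @Override
        public void endInput(int inputIndex) {
            finished[inputIndex] = true;
        }

        @Override
        public int nextSelection() {
            // Read all build inputs to completion before touching the probe input.
            for (int i = 1; i < finished.length; i++) {
                if (!finished[i]) {
                    return i;
                }
            }
            return finished[0] ? -1 : 0;
        }
    }

    /** Toy driver standing in for the task's input loop; each record is {key, value}. */
    static List<String> run(List<List<String[]>> inputs) {
        BroadcastJoin op = new BroadcastJoin(inputs.size());
        int[] pos = new int[inputs.size()];
        List<String> out = new ArrayList<>();
        int selected;
        while ((selected = op.nextSelection()) >= 0) {
            List<String[]> input = inputs.get(selected);
            if (pos[selected] < input.size()) {
                String[] record = input.get(pos[selected]++);
                op.processElement(selected, record[0], record[1], out);
            } else {
                op.endInput(selected);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String[]>> inputs = Arrays.asList(
                Arrays.asList(new String[] {"1", "a"}, new String[] {"2", "b"}, new String[] {"3", "c"}),
                Arrays.asList(new String[] {"1", "x"}, new String[] {"2", "y"}),
                Arrays.asList(new String[] {"2", "p"}, new String[] {"1", "q"}));
        System.out.println(run(inputs)); // prints [1:a,x,q, 2:b,y,p]
    }
}
```

In this toy version the selection logic lives in a single operator, which is why it stays simple. With a chained tree of two-input operators, each operator would have its own selection, which is the difficulty mentioned in point 3.
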

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hi,
>> yes, I see operators of this style as very much an internal thing. You are
>> probably talking about use cases for OneInputOperator and TwoInputOperator
>> where users have a very specific need and require access to the low-level
>> details such as watermarks, state and timers to get stuff done. Maybe we
>> could have a wrapper for these so that users can still use them but
>> internally we wrap them in an N-Ary Operator.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org> wrote:
>> Hey,
>>
>> Some initial feedback from my side:
>>
>> I think this is a very important problem to deal with as a lot of applications
>> depend on it. I like the proposed runtime model and that is probably a
>> good way to handle this task, it is very clear what is happening.
>>
>> My main concern is how to handle this from the API and UDFs. What you
>> proposed seems like a very internal thing from the API perspective and I
>> would be against exposing it in the way you wrote in your example. We
>> should make all effort to streamline this with the functional style
>> operators in some way. (so in that sense the way broadcast sets are handled
>> is pretty nice) Maybe we could extend ds.connect() to many streams
>>
>> But in any case this is an awesome initiative :)
>>
>> Cheers,
>> Gyula
>>
>>
>> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 21 Apr 2016, 15:56):
>>
>>> Hi Team,
>>> I'm currently thinking about how we can bring the broadcast set/broadcast
>>> input feature from the DataSet API to the DataStream API. I think this
>>> would be a valuable addition since it would enable use cases that join
>>> streams with constant (or slowly changing) side information.
>>>
>>> For this purpose, I think that we need to change the way we handle stream
>>> operators. The new model would have one unified operator that handles all
>>> cases and allows adding inputs after the operator was constructed, thus
>>> allowing the specification of broadcast inputs.
>>>
>>> I wrote up this preliminary document detailing the reason why we need such
>>> a new operator for broadcast inputs and also what the API of such an
>>> operator would be. It also quickly touches on the required changes of
>>> existing per-operation stream operations such as StreamMap:
>>>
>>>
>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>
>>> Please have a look if you're interested. Feedback/insights are very
>>> welcome. :-)
>>>
>>> Cheers,
>>> Aljoscha
>>>