Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5922#discussion_r186341293

    --- Diff: docs/dev/stream/state/broadcast_state.md ---
    @@ -0,0 +1,279 @@
    +---
    +title: "The Broadcast State Pattern"
    +nav-parent_id: streaming_state
    +nav-pos: 2
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements. See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership. The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License. You may obtain a copy of the License at
    +
    + http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied. See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* ToC
    +{:toc}
    +
    +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the
    +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks.
    +
    +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases
    +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally
    +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a
    +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all
    +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest
    +of operator states in that:
    + 1. it has a map format,
    + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and
    + 3. such an operator can have *multiple broadcast states* with different names.
    +
    +## Provided APIs
    +
    +To show the provided APIs, we will start with an example before presenting their full functionality. As our running
    +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs
    +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that
    +the set of interesting patterns evolves over time.
    +
    +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other
    +stream will contain the `Rules`.
    +
    +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will
    +make sure that elements of the same color end up on the same physical machine.
    +
    +{% highlight java %}
    +// key the shapes by color
    +KeyedStream<Item, Color> colorPartitionedStream = shapeStream
    +                        .keyBy(new KeySelector<Shape, Color>(){...});
    +{% endhighlight %}
    +
    +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks
    +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast
    +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules
    +will be stored.
    +
    +{% highlight java %}
    +
    +// a map descriptor to store the name of the rule (string) and the rule itself.
    +MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
    +			"RulesBroadcastState",
    +			BasicTypeInfo.STRING_TYPE_INFO,
    +			TypeInformation.of(new TypeHint<Rule>() {})
    +		);
    +
    +// broadcast the rules and create the broadcast state
    +BroadcastStream<Rule> ruleBroadcastStream = ruleStream
    +                        .broadcast(ruleStateDescriptor);
    +{% endhighlight %}
    +
    +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to:
    + 1) connect the two streams and
    + 2) specify our match detecting logic.
    +
    +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the
    +non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on
    +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic.
    +The exact type of the function depends on the type of the non-broadcasted stream:
    + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`.
    + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`.
    +
    + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls:
    +
    +<div class="alert alert-info">
    + <strong>Attention:</strong> The connect should be called on the non-broadcasted stream, with the `BroadcastStream`
    --- End diff --

    What happens if the user calls connect on the wrong (broadcasted) stream? Is there an exception? If so it might make sense to clarify that here.
---