[ https://issues.apache.org/jira/browse/FLINK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066473#comment-16066473 ]
Aljoscha Krettek commented on FLINK-6936:
-----------------------------------------

I see. I have a few quick comments, but no completely satisfactory solution with regard to state.

You can try using {{OperatorStateStore.getUnionListState()}} [javadoc|https://github.com/apache/flink/blob/2ef4900aa279e75844a9f8536cfe007c2542187d/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java#L77-L77] for the state that is broadcast. With this type of state, the checkpointed state of all parallel operator instances is collected, and the full union is sent to every operator instance when restoring. In your case, where the state is the same on all operator instances, you would checkpoint it only on operator instance 0 and thereby get the same state on all operator instances when restoring (independent of any change in parallelism).

Regarding data partitioning, couldn't you use something like this?
{code}
// rebalance(): each orderA element goes to exactly one parallel instance;
// broadcast(): every orderB element is sent to all parallel instances.
orderA.rebalance().connect(orderB.broadcast())
  .process(new CommonStreamJoin[Order, Order, Order2](
    new JoinFunction[Order, Order, Order2] {
      override def join(left: Order, right: Order): Order2 =
        Order2(left.user, right.user, left.product, right.product, left.amount, right.amount)
    },
    60000, 1000))
{code}
{{CommonStreamJoin}} would have to be a {{CoProcessFunction}}, and you could get rid of the two other UDFs that only do packing/unpacking and routing. (This doesn't work if you want other multicast patterns, of course, but it does work for the example you showed, where one stream is randomly partitioned and the other is broadcast.)

> Add multiple targets support for custom partitioner
> ---------------------------------------------------
>
>                 Key: FLINK-6936
>                 URL: https://issues.apache.org/jira/browse/FLINK-6936
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>            Priority: Minor
>
> The current user-facing Partitioner only allows returning one target.
> {code:java}
> @Public
> public interface Partitioner<K> extends java.io.Serializable, Function {
> 	/**
> 	 * Computes the partition for the given key.
> 	 *
> 	 * @param key The key.
> 	 * @param numPartitions The number of partitions to partition into.
> 	 * @return The partition index.
> 	 */
> 	int partition(K key, int numPartitions);
> }
> {code}
> Actually, this function should be able to return multiple partitions; the single-target signature may be a historical legacy.
> There are at least three possible approaches to solve this:
> # Make the `protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner)` method in DataStream public, so that users can directly define a StreamPartitioner.
> # Change the `partition` method in the Partitioner interface to return an int array instead of a single int value.
> # Add a new `multicast` method to DataStream and provide a MultiPartitioner interface which returns an int array.
> Considering API consistency, the third approach seems to be an acceptable choice. [~aljoscha], what do you think?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
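To make the union-list-state suggestion in the comment above concrete, here is a minimal plain-Java simulation of the redistribution semantics only. It does not use Flink: {{UnionListStateSim}}, {{checkpoint}} and {{restoreUnion}} are hypothetical names, and in a real job the state would come from {{OperatorStateStore.getUnionListState()}}. The sketch assumes the broadcast state is a list of strings that is identical on every instance, so only instance 0 writes it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java simulation of union list state redistribution (no Flink dependency).
 * All names are hypothetical; this models the semantics, not Flink's API.
 */
class UnionListStateSim {

    /** Snapshot at the given parallelism: only instance 0 writes the shared state. */
    static List<List<String>> checkpoint(int parallelism, List<String> sharedState) {
        List<List<String>> perInstance = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            if (i == 0) {
                // The state is identical everywhere, so writing it once is enough.
                perInstance.add(new ArrayList<>(sharedState));
            } else {
                // The other instances contribute nothing to the checkpoint.
                perInstance.add(Collections.emptyList());
            }
        }
        return perInstance;
    }

    /** Restore with union semantics: every new instance receives the union of all lists. */
    static List<List<String>> restoreUnion(List<List<String>> checkpointed, int newParallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> part : checkpointed) {
            union.addAll(part);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    public static void main(String[] args) {
        // Checkpoint at parallelism 3, then restore at parallelism 5.
        List<List<String>> snapshot = checkpoint(3, List.of("rule-a", "rule-b"));
        List<List<String>> restored = restoreUnion(snapshot, 5);
        for (int i = 0; i < restored.size(); i++) {
            System.out.println("instance " + i + " -> " + restored.get(i));
        }
    }
}
```

Running {{main}} checkpoints at parallelism 3 and restores at parallelism 5, and each of the five instances prints {{[rule-a, rule-b]}}. Had every instance written the list, the union would contain each entry three times, which is why writing it only from instance 0 matters.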
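The routing behind {{orderA.rebalance().connect(orderB.broadcast())}} can be checked the same way. The following plain-Java sketch has no Flink dependency ({{BroadcastJoinSim}} and {{joinAcrossInstances}} are hypothetical names): it sends each left element to exactly one instance round-robin, copies every right element to all instances, and joins locally on each instance. It ignores the two numeric parameters passed to {{CommonStreamJoin}} and any windowing, so it only shows that every (left, right) pair is produced exactly once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Plain-Java simulation of rebalance() + broadcast() routing (no Flink dependency).
 * Hypothetical names; time windows are not modeled.
 */
class BroadcastJoinSim {

    /** Returns every (left|right) pair emitted across all parallel instances. */
    static List<String> joinAcrossInstances(List<String> left, List<String> right, int parallelism) {
        List<List<String>> leftPerInstance = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            leftPerInstance.add(new ArrayList<>());
        }
        // rebalance(): round-robin, exactly one target per element.
        // (This sketch starts at instance 0; the starting instance does not matter here.)
        for (int n = 0; n < left.size(); n++) {
            leftPerInstance.get(n % parallelism).add(left.get(n));
        }
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            // broadcast(): every instance holds the full right side.
            for (String l : leftPerInstance.get(i)) {
                for (String r : right) {
                    pairs.add(l + "|" + r);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> a = List.of("a1", "a2", "a3", "a4", "a5");
        List<String> b = List.of("b1", "b2");
        List<String> pairs = joinAcrossInstances(a, b, 3);
        Set<String> distinct = new HashSet<>(pairs);
        System.out.println(pairs.size() + " pairs, " + distinct.size() + " distinct");
    }
}
```

With five left elements, two right elements and parallelism 3, it prints {{10 pairs, 10 distinct}}: no pair is missed and none is duplicated, which is why a single {{CoProcessFunction}} suffices for this particular pattern.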
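The third approach from the issue can be sketched as an interface whose {{partition}} method returns an int array. {{MultiPartitioner}} is only the name proposed in the issue, not an existing Flink interface, and the {{multicast}} helper below is a hypothetical stand-in for the proposed {{DataStream}} method: it simply routes each record to every channel the partitioner returns. The sample partitioners ({{HASH_ONE}}, {{ALL}}, {{WITH_NEIGHBOR}}) are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Demo of the hypothetical MultiPartitioner from approach 3 (not a Flink API). */
class MultiPartitionerDemo {

    /** One target per key, equivalent to today's single-int Partitioner. */
    static final MultiPartitioner<Integer> HASH_ONE =
            (key, n) -> new int[] {Math.floorMod(key.hashCode(), n)};

    /** Broadcast expressed through the same interface: every partition. */
    static final MultiPartitioner<Integer> ALL = (key, n) -> {
        int[] targets = new int[n];
        for (int i = 0; i < n; i++) {
            targets[i] = i;
        }
        return targets;
    };

    /** A custom fan-out: the key's own partition plus its right neighbor. */
    static final MultiPartitioner<Integer> WITH_NEIGHBOR = (key, n) -> {
        int p = Math.floorMod(key.hashCode(), n);
        return n == 1 ? new int[] {p} : new int[] {p, (p + 1) % n};
    };

    /** Hypothetical stand-in for a DataStream multicast: route each record to all returned channels. */
    static <T> List<List<T>> multicast(List<T> records, MultiPartitioner<T> partitioner, int n) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            channels.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int target : partitioner.partition(record, n)) {
                channels.get(target).add(record);
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(WITH_NEIGHBOR.partition(7, 4)));
        System.out.println(multicast(List.of(0, 1, 2, 3), WITH_NEIGHBOR, 4));
    }
}

/** Approach 3 from the issue: partition() returns all target indices in [0, numPartitions). */
interface MultiPartitioner<K> extends java.io.Serializable {
    int[] partition(K key, int numPartitions);
}
```

For example, {{WITH_NEIGHBOR.partition(7, 4)}} returns {{[3, 0]}}, and multicasting the records 0 to 3 over four channels yields {{[[0, 3], [0, 1], [1, 2], [2, 3]]}}. Both single-target partitioning and broadcast fit the same signature, while a neighbor fan-out like this is the kind of pattern the {{rebalance()}}/{{broadcast()}} combination from the comment cannot express.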