Xiaowei Jiang created FLINK-4855:
------------------------------------
Summary: Add partitionedKeyBy to DataStream
Key: FLINK-4855
URL: https://issues.apache.org/jira/browse/FLINK-4855
Project: Flink
Issue Type: Improvement
Components: DataStream API
Reporter: Xiaowei Jiang
Assignee: MaGuowei
After we do any interesting operation (e.g. reduce) on a KeyedStream, the
result becomes a DataStream. In many cases, the output still has the same or
compatible keys as the KeyedStream (logically). But to do further operations
on these keys, we are forced to use keyBy again. This works semantically, but
is costly in two respects. First, it destroys the possibility of chaining,
which is one of the most important optimization techniques. Second, keyBy
greatly expands the connected components of tasks, which has implications for
failover optimization.
To address this shortcoming, we propose a new operator partitionedKeyBy.
DataStream {
    public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
}
Semantically, DataStream.partitionedKeyBy(key) is equivalent to
DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task ID
as an extra field. This guarantees that records from different tasks will
never produce the same keys.
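The composite-key idea can be sketched in plain Java (this is an illustrative model, not Flink code; the names CompositeKey and partitionedKey are hypothetical, and the subtask index is passed in explicitly where Flink would supply it from the runtime context):

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical sketch of the proposed semantics: partitionedKeyBy(key)
// behaves like keyBy(key + taskId). Pairing the user's key with the
// producing subtask's index guarantees that records from different
// tasks never collide on the same key.
public class PartitionedKeySketch {

    // Stand-in for the internal "partitionedKey": user key + subtask index.
    static final class CompositeKey<K> {
        final K userKey;
        final int subtaskIndex;
        CompositeKey(K userKey, int subtaskIndex) {
            this.userKey = userKey;
            this.subtaskIndex = subtaskIndex;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) return false;
            CompositeKey<?> other = (CompositeKey<?>) o;
            return Objects.equals(userKey, other.userKey)
                && subtaskIndex == other.subtaskIndex;
        }
        @Override public int hashCode() {
            return Objects.hash(userKey, subtaskIndex);
        }
        @Override public String toString() {
            return "(" + userKey + ", task=" + subtaskIndex + ")";
        }
    }

    // Wraps a plain key extractor into one that also tags the task index.
    static <T, K> Function<T, CompositeKey<K>> partitionedKey(
            Function<T, K> key, int subtaskIndex) {
        return record -> new CompositeKey<>(key.apply(record), subtaskIndex);
    }

    public static void main(String[] args) {
        Function<String, String> firstChar = s -> s.substring(0, 1);
        // Same user key "a", but produced by different subtasks.
        CompositeKey<String> k0 = partitionedKey(firstChar, 0).apply("apple");
        CompositeKey<String> k1 = partitionedKey(firstChar, 1).apply("avocado");
        System.out.println(k0.equals(k1)); // false: the task index disambiguates
        System.out.println(k0);            // (a, task=0)
    }
}
```

Because equal composite keys imply the same producing subtask, a downstream keyed operator can consume the stream without a network shuffle, which is what makes the chaining described below possible.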
With this, it's possible to do
ds.keyBy(key1).reduce(func1)
.partitionedKeyBy(key1).reduce(func2)
.partitionedKeyBy(key2).reduce(func3);
Most importantly, in certain cases, we will be able to chain these into a
single vertex.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)