[ https://issues.apache.org/jira/browse/FLINK-31654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiangjie Qin updated FLINK-31654:
---------------------------------
Description:
Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well with batch jobs. The chaining strategy of the StreamOperators applied to a KeyedStream is always overridden to HEAD. This is because in batch execution mode the records have to be sorted by key before they are fed to the stateful operators. The runtime relies on the shuffle to do the sort, so a shuffle is needed for the stateful operators.

However, for {{DataStreamUtils.reinterpretAsKeyedStream()}} this results in unexpected behavior: it breaks the operator chain and defeats the purpose of reinterpreting the stream instead of calling {{keyBy}}.

To fix this issue, we need to do the following for {{reinterpretAsKeyedStream()}}:
# Add a sort operator instead of relying on the shuffle to do the sort.
# Stop overriding the chaining strategy specified by the user for the operators applied to the resulting KeyedStream.

was:
Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well with batch jobs. There are two issues:
# The input to the downstream operator will not be sorted, so users need to sort the records themselves.
# The resulting {{KeyedStream}} will still override the chaining strategy of the downstream operator to {{HEAD}}. This breaks the operator chain and defeats the purpose of reinterpreting the stream instead of calling {{keyBy}}.

This ticket intends to address the second issue.


> DataStreamUtils.reinterpretAsKeyedStream() should not override the user
> specified chaining strategy.
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31654
>                 URL: https://issues.apache.org/jira/browse/FLINK-31654
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.17.0, 1.14.6, 1.16.1, 1.15.4
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>            Priority: Major
>             Fix For: 1.16.2, 1.18.0, 1.17.1
>
>
> Currently {{DataStreamUtils.reinterpretAsKeyedStream()}} does not work well
> with batch jobs. The chaining strategy of the StreamOperators applied to a
> KeyedStream is always overridden to HEAD. This is because in batch execution
> mode the records have to be sorted by key before they are fed to the stateful
> operators. The runtime relies on the shuffle to do the sort, so a shuffle is
> needed for the stateful operators.
> However, for {{DataStreamUtils.reinterpretAsKeyedStream()}} this results in
> unexpected behavior: it breaks the operator chain and defeats the purpose of
> reinterpreting the stream instead of calling {{keyBy}}.
> To fix this issue, we need to do the following for {{reinterpretAsKeyedStream()}}:
> # Add a sort operator instead of relying on the shuffle to do the sort.
> # Stop overriding the chaining strategy specified by the user for the
> operators applied to the resulting KeyedStream.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
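The ticket states that in batch execution mode records must be sorted by key before they reach a stateful operator, and the earlier description notes that the input after {{reinterpretAsKeyedStream()}} is not sorted. The following is a minimal, Flink-independent Java sketch of why that matters, assuming a keyed operator that keeps state for only one key at a time and emits a key's result when the key changes. The names {{SortedKeyedSum}}, {{Event}}, {{sortByKey}} and {{sumPerKey}} are hypothetical; this is not Flink's sort operator or runtime code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/**
 * Conceptual sketch, not Flink code: why a keyed, stateful operator in batch
 * mode wants its input sorted (grouped) by key. All names are hypothetical.
 */
public class SortedKeyedSum {

    /** Hypothetical input record: a key plus a numeric value. */
    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for a "sort operator": a stable sort of the records by key. */
    static List<Event> sortByKey(List<Event> input) {
        List<Event> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing((Event e) -> e.key));
        return sorted;
    }

    /**
     * Stand-in for a keyed "sum" operator that keeps state for one key at a time.
     * It is correct only if all records of a key arrive contiguously (sorted input).
     */
    static List<Map.Entry<String, Long>> sumPerKey(List<Event> input) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        String currentKey = null;
        long sum = 0;
        for (Event e : input) {
            if (currentKey != null && !currentKey.equals(e.key)) {
                // Key changed: emit the finished key's result and discard its state.
                out.add(Map.entry(currentKey, sum));
                sum = 0;
            }
            currentKey = e.key;
            sum += e.value;
        }
        if (currentKey != null) {
            out.add(Map.entry(currentKey, sum)); // flush the last key
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("b", 2), new Event("a", 1), new Event("b", 10), new Event("a", 4));
        // Sorted input: exactly one result per key.
        System.out.println(sumPerKey(sortByKey(input))); // [a=5, b=12]
        // Unsorted input: results for the same key come out fragmented.
        System.out.println(sumPerKey(input)); // [b=2, a=1, b=10, a=4]
    }
}
```

The second line of output shows that a one-key-at-a-time operator fed unsorted input emits partial results per run of equal keys, which is why the first fix item adds an explicit sort step rather than assuming the reinterpreted stream is already grouped by key.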