Hi, you would need to set the co-location constraint in order to ensure that the sub-tasks of operators are deployed to the same machine. It effectively means that subtasks a_i, b_i of operator a and b will be deployed to the same slot. This feature is not super well exposed but you can take a look at [1] to see how it can be used.
[1] https://issues.apache.org/jira/browse/FLINK-9809 Cheers, Till On Fri, Jan 10, 2020 at 9:08 AM Zhijiang <wangzhijiang...@aliyun.com> wrote: > Only chained operators can avoid record serialization cost, but the > chaining mode can not support keyed stream. > If you want to deploy downstream with upstream in the same task manager, > it can avoid network shuffle cost which can still get performance benefits. > As I know @Till Rohrmann has implemented some enhancements in scheduler > layer to support such requirement in release-1.10. You can have a try when > the rc candidate is ready. > > Best, > Zhijiang > > ------------------------------------------------------------------ > From:杨东晓 <laolang...@gmail.com> > Send Time:2020 Jan. 10 (Fri.) 02:10 > To:Congxian Qiu <qcx978132...@gmail.com> > Cc:user <user@flink.apache.org> > Subject:Re: How can I find out which key group belongs to which subtask > > Thanks Congxian! > My purpose is not only make data goes into one same subtask but the > specific subtask which belongs to same taskmanager with upstream record. > The key idea is to avoid shuffling between taskmanagers. > I think the KeyGroupRangeAssignment.java > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java> > explained a lot about how to get keygroup and subtask context that can make > that happen. > Do you know if there are still serialization happening while data > transferred between operator in same taskmanager? > Thanks. > > Congxian Qiu <qcx978132...@gmail.com> 于2020年1月9日周四 上午1:55写道: > Hi > > If you just want to make sure some key goes into the same subtask, does > custom key selector[1] help? > > For the keygroup and subtask information, you can ref to > KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you > can ref to doc[3] > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions > [2] > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java > [3] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism > > Best, > Congxian > > > 杨东晓 <laolang...@gmail.com> 于2020年1月9日周四 上午7:47写道: > Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. > Is there any possible I can find out one key belongs to which key-group > and essentially find out one key-group belongs to which subtask. > The motivation I want to know that is we want to force the data records > from upstream still goes to same taskmanager downstream subtask .Which > means even if we use a keyedstream function we still want no cross jvm > communication happened during run time. > And if we can achieve that , can we also avoid the expensive cost for > record serialization because data is only transferred in same taskmanager > jvm instance? > > Thanks. > > >