noorall commented on code in PR #25552: URL: https://github.com/apache/flink/pull/25552#discussion_r1904878002
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java: ########## @@ -256,19 +264,22 @@ public String getEdgeId() { return edgeId; } - public boolean existInterInputsKeyCorrelation() { - return existInterInputsKeyCorrelation; + private void configureKeyCorrelation(StreamPartitioner<?> partitioner) { + this.intraInputKeyCorrelation = + !partitioner.isPointwise() || partitioner instanceof ForwardPartitioner; + this.interInputsKeyCorrelation = !partitioner.isPointwise(); } - public boolean existIntraInputKeyCorrelation() { - return existIntraInputKeyCorrelation; + public boolean existInterInputsKeyCorrelation() { + return interInputsKeyCorrelation; } - public void setExistInterInputsKeyCorrelation(boolean existInterInputsKeyCorrelation) { - this.existInterInputsKeyCorrelation = existInterInputsKeyCorrelation; + public boolean existIntraInputKeyCorrelation() { + return intraInputKeyCorrelation; } - public void setExistIntraInputKeyCorrelation(boolean existIntraInputKeyCorrelation) { - this.existIntraInputKeyCorrelation = existIntraInputKeyCorrelation; + public void setIntraInputKeyCorrelation(boolean intraInputKeyCorrelation) { + checkState(interInputsKeyCorrelation, "InterInputsKeyCorrelation must be true"); Review Comment: > Why does it have such an assumption? We want to strictly restrict how this value can be modified, in order to avoid unexpected situations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org