[ https://issues.apache.org/jira/browse/FLINK-18830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17172797#comment-17172797 ]
liupengcheng edited comment on FLINK-18830 at 8/8/20, 1:08 AM:
---------------------------------------------------------------

First, I think we can create a new function interface, e.g., DeclarativeJoinFunction, which provides more functionality, such as checking the join predicate result (true/false) and returning the joined element that corresponds to each predicate outcome. It may look like this:

{code:java}
public interface DeclarativeJoinFunction<IN1, IN2, OUT> {
    // Check the join predicate result.
    boolean joinPredicate(IN1 left, IN2 right);

    // Return the fully joined element when the join predicate returns true.
    OUT full(IN1 left, IN2 right);

    // Return the joined element whose right side is padded with nulls,
    // e.g. for a left outer join when no match is found on the right.
    OUT padLeft(IN1 left);

    // Return the joined element whose left side is padded with nulls,
    // e.g. for a right outer join when no match is found on the left.
    OUT padRight(IN2 right);
}
{code}

Then, based on this interface, we can also create RichComplexFlatJoinFunction, so that we are able to support outer joins in `JoinCoGroupFunction` and `FlatJoinCoGroupFunction`. For example, for a left outer join, if the second iterator is empty or no match can be found in it, we can do the following:

{code:java}
for (T1 val1 : first) {
    boolean rightMatched = false; // reset for each left element
    for (T2 val2 : second) {
        // Call the real join function, e.g. a RichDeclarativeJoinFunction.
        if (declarativeJoinFunction.joinPredicate(val1, val2)) {
            out.collect(declarativeJoinFunction.full(val1, val2));
            rightMatched = true; // at least one match found
        }
    }
    if (!rightMatched) {
        out.collect(declarativeJoinFunction.padLeft(val1));
    }
}
{code}

> JoinCoGroupFunction and FlatJoinCoGroupFunction work incorrectly for outer
> join when one side of coGroup is empty
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18830
>                 URL: https://issues.apache.org/jira/browse/FLINK-18830
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: liupengcheng
>            Priority: Major
>
> Currently, the `JoinCoGroupFunction` and `FlatJoinCoGroupFunction` in
> JoinedStreams don't respect the join type: they are implemented as a join
> within a two-level loop. However, this is incorrect for an outer join when
> one side of the coGroup is empty.
> ```
> public void coGroup(Iterable<T1> first, Iterable<T2> second,
>         Collector<T> out) throws Exception {
>     for (T1 val1 : first) {
>         for (T2 val2 : second) {
>             wrappedFunction.join(val1, val2, out);
>         }
>     }
> }
> ```
> The above code is the current implementation. Suppose the first input is
> non-empty and the second input is an empty iterator: then the join
> function (`wrappedFunction`) is never called, so no data is emitted for a
> left outer join.
> So I propose to take the join type into account here and handle this case.
> For example, for a left outer join, we can emit a record with the right side
> set to null when the right side is empty or contains no match.

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)
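To make the proposal concrete, here is a minimal, self-contained sketch in plain Java (no Flink dependency) of a join-type-aware replacement for the two-level loop. It reuses the `DeclarativeJoinFunction` interface proposed in the comment; `OuterJoinCoGroup`, `JoinType`, and `stringEquiJoin` are hypothetical names introduced only for illustration, and `java.util.function.Consumer` stands in for Flink's `Collector`. It also assumes the wrapper knows the join type, which the `coGroup` signature quoted above does not carry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: OuterJoinCoGroup, JoinType and stringEquiJoin are hypothetical,
// not Flink APIs. Consumer<T> stands in for Flink's Collector<T>.
class OuterJoinCoGroup {

    // The interface proposed in the comment above (not part of Flink).
    interface DeclarativeJoinFunction<IN1, IN2, OUT> {
        boolean joinPredicate(IN1 left, IN2 right);
        OUT full(IN1 left, IN2 right);
        OUT padLeft(IN1 left);   // right side padded with nulls
        OUT padRight(IN2 right); // left side padded with nulls
    }

    // Hypothetical join type that the coGroup wrapper would have to know about.
    enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

    // Join-type-aware version of the two-level loop, for one key group.
    static <T1, T2, T> void coGroup(Iterable<T1> first, Iterable<T2> second,
            DeclarativeJoinFunction<T1, T2, T> fn, JoinType joinType, Consumer<T> out) {
        // Copy the right side so unmatched right elements can be tracked by index.
        List<T2> rights = new ArrayList<>();
        for (T2 val2 : second) {
            rights.add(val2);
        }
        boolean[] rightMatched = new boolean[rights.size()];
        boolean emitUnmatchedLeft = joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER;
        boolean emitUnmatchedRight = joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER;

        for (T1 val1 : first) {
            boolean leftMatched = false; // reset for each left element
            for (int i = 0; i < rights.size(); i++) {
                if (fn.joinPredicate(val1, rights.get(i))) {
                    out.accept(fn.full(val1, rights.get(i)));
                    leftMatched = true;
                    rightMatched[i] = true;
                }
            }
            // Covers both "second is empty" and "no element of second matched".
            if (!leftMatched && emitUnmatchedLeft) {
                out.accept(fn.padLeft(val1));
            }
        }

        if (emitUnmatchedRight) {
            // Covers both "first is empty" and "no element of first matched".
            for (int i = 0; i < rights.size(); i++) {
                if (!rightMatched[i]) {
                    out.accept(fn.padRight(rights.get(i)));
                }
            }
        }
    }

    // Toy join function on strings: equality predicate, "null" marks padding.
    static DeclarativeJoinFunction<String, String, String> stringEquiJoin() {
        return new DeclarativeJoinFunction<String, String, String>() {
            @Override public boolean joinPredicate(String left, String right) { return left.equals(right); }
            @Override public String full(String left, String right) { return left + "," + right; }
            @Override public String padLeft(String left) { return left + ",null"; }
            @Override public String padRight(String right) { return "null," + right; }
        };
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a", "b");

        // Right side empty: the current loop emits nothing; LEFT_OUTER pads instead.
        List<String> leftOuter = new ArrayList<>();
        coGroup(left, Collections.<String>emptyList(), stringEquiJoin(), JoinType.LEFT_OUTER, leftOuter::add);
        System.out.println(leftOuter); // [a,null, b,null]

        List<String> fullOuter = new ArrayList<>();
        coGroup(left, Arrays.asList("b", "c"), stringEquiJoin(), JoinType.FULL_OUTER, fullOuter::add);
        System.out.println(fullOuter); // [a,null, b,b, null,c]
    }
}
```

The right side is copied into a list so that matched right elements can be tracked by index for the `padRight` pass; this assumes a single key group fits in memory. An INNER join under this sketch behaves like the current two-level loop, emitting only `full(...)` results.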