[ 
https://issues.apache.org/jira/browse/FLINK-18830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17172797#comment-17172797
 ] 

liupengcheng edited comment on FLINK-18830 at 8/8/20, 1:08 AM:
---------------------------------------------------------------

First, I think we can create a new interface, e.g. DeclarativeJoinFunction,
which provides more functionality, such as checking the join predicate result
(true/false) and returning the joined element that corresponds to that
result.

It might look like this:

{code:java}
public interface DeclarativeJoinFunction<IN1, IN2, OUT> {
    // check the join predicate result
    boolean joinPredicate(IN1 left, IN2 right);

    // return the full joined element when the join predicate returns true
    OUT full(IN1 left, IN2 right);

    // return the joined element whose right side is padded with nulls,
    // e.g. for a left outer join when no match is found on the right
    OUT padLeft(IN1 left);

    // return the joined element whose left side is padded with nulls
    OUT padRight(IN2 right);
}
{code}
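
As a rough illustration of how such an interface could be implemented, here is
a minimal sketch. `KeyedJoin`, the `Entry<Integer, String>` element type and
the comma-separated output format are assumptions made up for this example;
they are not part of Flink or of the proposal itself.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;

public class DeclarativeJoinExample {

    // The proposed interface, repeated here so the sketch compiles on its own.
    public interface DeclarativeJoinFunction<IN1, IN2, OUT> {
        boolean joinPredicate(IN1 left, IN2 right);
        OUT full(IN1 left, IN2 right);
        OUT padLeft(IN1 left);
        OUT padRight(IN2 right);
    }

    // Hypothetical implementation: joins two (key, value) entries on the key
    // and renders the result as "key,leftValue,rightValue".
    public static class KeyedJoin implements
            DeclarativeJoinFunction<Entry<Integer, String>, Entry<Integer, String>, String> {

        @Override
        public boolean joinPredicate(Entry<Integer, String> left, Entry<Integer, String> right) {
            return left.getKey().equals(right.getKey());
        }

        @Override
        public String full(Entry<Integer, String> left, Entry<Integer, String> right) {
            return left.getKey() + "," + left.getValue() + "," + right.getValue();
        }

        @Override
        public String padLeft(Entry<Integer, String> left) {
            // No match on the right: the right side is padded with null.
            return left.getKey() + "," + left.getValue() + ",null";
        }

        @Override
        public String padRight(Entry<Integer, String> right) {
            // No match on the left: the left side is padded with null.
            return right.getKey() + ",null," + right.getValue();
        }
    }

    public static void main(String[] args) {
        KeyedJoin join = new KeyedJoin();
        Entry<Integer, String> left = new SimpleEntry<>(1, "alice");
        Entry<Integer, String> right = new SimpleEntry<>(1, "book");
        System.out.println(join.full(left, right));   // prints 1,alice,book
        System.out.println(join.padLeft(left));       // prints 1,alice,null
    }
}
```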

Then, based on this interface, we can also create RichComplexFlatJoinFunction,
so that we are able to support outer joins in `JoinCoGroupFunction` and
`FlatJoinCoGroupFunction`.

E.g. for a left outer join, if the second iterator is empty or no match can be
found, we can do the following:

{code:java}

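for (T1 val1 : first) {
    // reset rightMatched to false for each left element
    boolean rightMatched = false;

    for (T2 val2 : second) {
        // call the real join, e.g. RichDeclarativeJoinFunction;
        // if any match is found, set rightMatched to true
        if (declarativeJoinFunction.joinPredicate(val1, val2)) {
            out.collect(declarativeJoinFunction.full(val1, val2));
            rightMatched = true;
        }
    }

    // no match on the right side: emit the left element padded with nulls
    if (!rightMatched) {
        out.collect(declarativeJoinFunction.padLeft(val1));
    }
}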
{code}
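
The loop above can also be sketched as a self-contained method that runs
without Flink. This is only an illustration under some assumptions: a plain
`List` stands in for Flink's `Collector`, and `leftOuterCoGroup` and the `EQ`
join function are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LeftOuterCoGroupSketch {

    // The proposed interface, repeated here so the sketch compiles on its own.
    public interface DeclarativeJoinFunction<IN1, IN2, OUT> {
        boolean joinPredicate(IN1 left, IN2 right);
        OUT full(IN1 left, IN2 right);
        OUT padLeft(IN1 left);
        OUT padRight(IN2 right);
    }

    // Left outer join over one co-group. The returned List plays the role
    // of Flink's Collector (an assumption made to avoid the dependency).
    public static <T1, T2, T> List<T> leftOuterCoGroup(
            Iterable<T1> first, Iterable<T2> second,
            DeclarativeJoinFunction<T1, T2, T> fn) {
        List<T> out = new ArrayList<>();
        for (T1 val1 : first) {
            boolean rightMatched = false;
            for (T2 val2 : second) {
                if (fn.joinPredicate(val1, val2)) {
                    out.add(fn.full(val1, val2));
                    rightMatched = true;
                }
            }
            // Empty right side or no match: emit the padded left element.
            if (!rightMatched) {
                out.add(fn.padLeft(val1));
            }
        }
        return out;
    }

    // Hypothetical join function: matches equal integers.
    public static final DeclarativeJoinFunction<Integer, Integer, String> EQ =
            new DeclarativeJoinFunction<Integer, Integer, String>() {
                @Override
                public boolean joinPredicate(Integer left, Integer right) {
                    return left.equals(right);
                }

                @Override
                public String full(Integer left, Integer right) {
                    return left + "=" + right;
                }

                @Override
                public String padLeft(Integer left) {
                    return left + "=null";
                }

                @Override
                public String padRight(Integer right) {
                    return "null=" + right;
                }
            };

    public static void main(String[] args) {
        // Right side empty: every left element is still emitted.
        System.out.println(leftOuterCoGroup(
                Arrays.asList(1, 2), Collections.<Integer>emptyList(), EQ));
        // prints [1=null, 2=null]

        // Partial match: only the unmatched left element is padded.
        System.out.println(leftOuterCoGroup(
                Arrays.asList(1, 2), Arrays.asList(2, 3), EQ));
        // prints [1=null, 2=2]
    }
}
```

A right or full outer join would additionally need to track which right-side
elements were matched and emit `padRight` for the rest; that part is left out
of this sketch.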




> JoinCoGroupFunction and FlatJoinCoGroupFunction work incorrectly for outer 
> join when one side of coGroup is empty
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18830
>                 URL: https://issues.apache.org/jira/browse/FLINK-18830
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: liupengcheng
>            Priority: Major
>
> Currently, the `JoinCoGroupFunction` and `FlatJoinCoGroupFunction` in
> JoinedStreams don't respect the join type; they are implemented as a join
> within a two-level loop. However, this is incorrect for an outer join when
> one side of the coGroup is empty.
> ```
> public void coGroup(Iterable<T1> first, Iterable<T2> second,
>         Collector<T> out) throws Exception {
>     for (T1 val1 : first) {
>         for (T2 val2 : second) {
>             wrappedFunction.join(val1, val2, out);
>         }
>     }
> }
> ```
> The above code is the current implementation. Suppose the first input is
> non-empty and the second input is an empty iterator: the join function
> (`wrappedFunction`) will then never be called, so no data is emitted for a
> left outer join.
> So I propose to take the join type into account here and handle this case,
> e.g. for a left outer join, we can emit a record with the right side set to
> null if the right side is empty or no match can be found on the right side.
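
The empty-side behaviour described in the issue can be reproduced without
Flink. The following is a minimal sketch, not the actual Flink code:
`nestedLoopJoin` is a hypothetical stand-in for the two-level loop in
`coGroup`, with a plain `List` in place of the `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class EmptySideDemo {

    // Same two-level loop shape as the quoted coGroup method.
    public static List<String> nestedLoopJoin(Iterable<String> first, Iterable<String> second) {
        List<String> out = new ArrayList<>();
        for (String val1 : first) {
            for (String val2 : second) {
                // Only reached when the second input has at least one element.
                out.add(val1 + "|" + val2);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a", "b");
        List<String> emptyRight = Collections.emptyList();
        // The inner loop body never runs, so nothing is emitted for "a" or "b".
        System.out.println(nestedLoopJoin(left, emptyRight).size()); // prints 0
    }
}
```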



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
