Hi Kant,

Thanks for reporting the issue. I dug into the source code of the Blink planner [1] (the logic is the same in the legacy planner [2]), and I'd like to share some thoughts here.
The main logic of FULL OUTER JOIN is:

if input record is accumulate
| if input side is outer
| | if there are no matched rows on the other side, send +[record+null], state.add(record, 0)
| | if there are matched rows on the other side
| | | if other side is outer
| | | | if the matched num in the matched rows == 0, send -[null+other]
| | | | if the matched num in the matched rows > 0, skip
| | | | otherState.update(other, old + 1)
| | | endif
| | | send +[record+other]s, state.add(record, other.size)
| | endif
| endif
| if input side not outer
| | state.add(record)
| | if there are no matched rows on the other side, skip
| | if there are matched rows on the other side
| | | if other side is outer
| | | | if the matched num in the matched rows == 0, send -[null+other]
| | | | if the matched num in the matched rows > 0, skip
| | | | otherState.update(other, old + 1)
| | | endif
| | | send +[record+other]s
| | endif
| endif
endif
if input record is retract
| state.retract(record)
| if there are no matched rows on the other side
| | if input side is outer, send -[record+null]
| endif
| if there are matched rows on the other side, send -[record+other]s
| | if other side is outer
| | | if the matched num in the matched rows == 0, this should never happen!
| | | if the matched num in the matched rows == 1, send +[null+other]
| | | if the matched num in the matched rows > 1, skip
| | | otherState.update(other, old - 1)
| | endif
| endif
endif

For a single join operator, the logic above is correct and handles all corner cases.
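To make the bookkeeping above concrete, here is a minimal Python sketch of one reading of it. It is a simplified in-memory model, not Flink's StreamingJoinOperator: it assumes both inputs are outer (FULL OUTER JOIN), rows are plain tuples, a None join key never matches (SQL NULL semantics), and each change is propagated synchronously and depth-first to the next operator. The names FullOuterJoinSketch, make_query, push and send, and the table labels T1 to T4, are hypothetical. make_query chains three of these operators in the shape of the query in the quoted report.

```python
from collections import defaultdict

NULL = None  # SQL NULL; a NULL join key never matches anything


class FullOuterJoinSketch:
    """Toy model of one streaming FULL OUTER JOIN operator (both sides outer).

    Every stored row keeps a "matched num": how many rows on the other side
    currently join with it. Changes are ("+", row) or ("-", row).
    """

    def __init__(self, left_key, right_key, left_width, right_width):
        self.key = {"L": left_key, "R": right_key}
        self.width = {"L": left_width, "R": right_width}
        # state[side][join_key] -> list of [row, matched_num]
        self.state = {"L": defaultdict(list), "R": defaultdict(list)}

    def _row(self, side, mine, other):
        # Output rows are always the left columns followed by the right columns.
        return mine + other if side == "L" else other + mine

    def _nulls(self, side):
        return (NULL,) * self.width[side]

    def process(self, side, op, row):
        """Apply one change arriving on `side`; return the changes it emits."""
        other_side = "R" if side == "L" else "L"
        k = self.key[side](row)
        matches = self.state[other_side].get(k, []) if k is not NULL else []
        out = []
        if op == "+":
            if not matches:
                out.append(("+", self._row(side, row, self._nulls(other_side))))
            for entry in matches:
                if entry[1] == 0:  # partner was emitted padded with nulls: retract it
                    out.append(("-", self._row(side, self._nulls(side), entry[0])))
                entry[1] += 1
            out.extend(("+", self._row(side, row, other)) for other, _ in matches)
            self.state[side][k].append([row, len(matches)])
        else:
            bucket = self.state[side][k]
            del bucket[next(i for i, e in enumerate(bucket) if e[0] == row)]
            if not matches:
                out.append(("-", self._row(side, row, self._nulls(other_side))))
            for entry in matches:
                out.append(("-", self._row(side, row, entry[0])))
                if entry[1] == 1:  # partner lost its last match: pad it with nulls again
                    out.append(("+", self._row(side, self._nulls(side), entry[0])))
                entry[1] -= 1
        return out


def make_query():
    """Join1(T1, T2 on f0), Join2(Join1, T3 on sample2.f0), Join3(Join2, T4 on sample3.f0)."""
    return [
        FullOuterJoinSketch(lambda r: r[0], lambda r: r[0], 1, 1),  # Join1
        FullOuterJoinSketch(lambda r: r[1], lambda r: r[0], 2, 1),  # Join2
        FullOuterJoinSketch(lambda r: r[2], lambda r: r[0], 3, 1),  # Join3
    ]


# Which operator input each source table feeds: (join index, side).
SOURCES = {"T1": (0, "L"), "T2": (0, "R"), "T3": (1, "R"), "T4": (2, "R")}


def push(joins, level, side, op, row):
    """Feed a change into joins[level] and cascade its output into the next join."""
    emitted = joins[level].process(side, op, row)
    if level == len(joins) - 1:
        return emitted
    final = []
    for next_op, next_row in emitted:
        final.extend(push(joins, level + 1, "L", next_op, next_row))
    return final


def send(joins, table, value):
    """Insert a one-column row into a source table; return the final changes."""
    level, side = SOURCES[table]
    return push(joins, level, side, "+", (value,))


if __name__ == "__main__":
    joins = make_query()
    for t, table in enumerate(["T1", "T4", "T3", "T2"], start=1):
        print(f"t{t}:", send(joins, table, "flink"))
```

In this model, replaying the four sends from the quoted report (T1, T4, T3, T2, each with "flink") yields, at the fourth send, the same five changes Kant listed for t4 in the same order, including the +/- pair on (null, null, null, flink). A single FullOuterJoinSketch fed T1 and then T2 emits only -[flink, null] and +[flink, flink].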
However, your query has three join operators:

            Join3
           /     \
        Join2     T4
       /     \
    Join1     T3
   /     \
  T1      T2

At t4, after "flink" is sent to test-topic2:

Join1 first retracts -[flink, null, null, null], then sends +[flink, flink, null, null].
Join2 receives -[flink, null, null, null] and sends -[flink, null, null, null].
Join2 receives +[flink, flink, null, null] and sends -[null, null, flink, null], then +[flink, flink, flink, null].
Join3 receives -[flink, null, null, null] and sends -[flink, null, null, null].
Join3 receives -[null, null, flink, null] and sends -[null, null, flink, flink], then +[null, null, null, flink].
Join3 receives +[flink, flink, flink, null] and sends -[null, null, null, flink], then +[flink, flink, flink, flink].

So the +/-[null, null, null, flink] pair comes from Join3 seeing the update of the T3 row as two separate changes: the retraction of [null, null, flink, null] leaves the T4 row with no match, so it is re-emitted padded with nulls, and only afterwards does +[flink, flink, flink, null] arrive and match it again. In your two-topic test, the single join is fed plain inserts directly from the sources, so this intermediate retract-then-insert step does not occur.

In my personal opinion, this is normal behavior for the current design, and I can't find an easy way to eliminate this duplication. Let's wait for Jark's and Timo's opinions.

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala

kant kodali <kanth...@gmail.com> wrote on Sun, Feb 2, 2020 at 5:18 AM:

> Wondering if anyone had a chance to look through this, or should I create
> the JIRA?
>
> On Wed, Jan 29, 2020 at 6:49 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Kant,
>>
>> I am not an expert on Flink's SQL implementation. Hence, I'm pulling in
>> Timo and Jark who might help you with your question.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 28, 2020 at 10:46 PM kant kodali <kanth...@gmail.com> wrote:
>>
>>> Sorry, fixed some typos.
>>>
>>> I am doing a streaming outer join on four Kafka topics; let's call
>>> them sample1, sample2, sample3, sample4. Each of these test topics has just
>>> one column, which is a string tuple.
>>> My query is this:
>>>
>>> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL
>>> OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on
>>> sample3.f0=sample4.f0
>>>
>>> And here is how I send messages to those Kafka topics at various times.
>>>
>>> At time t1, send a message "flink" to test-topic1
>>>
>>> (true,flink,null,null,null) // Looks good
>>>
>>> At time t2, send a message "flink" to test-topic4
>>>
>>> (true,null,null,null,flink) // Looks good
>>>
>>> At time t3, send a message "flink" to test-topic3
>>>
>>> (false,null,null,null,flink) // Looks good
>>> (true,null,null,flink,flink) // Looks good
>>>
>>> At time t4, send a message "flink" to test-topic2
>>>
>>> (false,flink,null,null,null) // Looks good
>>> (false,null,null,flink,flink) // Looks good
>>> *(true,null,null,null,flink) // Redundant?*
>>> *(false,null,null,null,flink) // Redundant?*
>>> (true,flink,flink,flink,flink) // Looks good
>>>
>>> Assume t1 < t2 < t3 < t4.
>>>
>>> Those two rows above seem redundant to me, although the end result
>>> is correct. I don't see the same behavior if I join only two topics. These
>>> redundant messages can lead to a lot of database operations downstream, so
>>> is there any way to optimize this? I am using Flink 1.9, so I'm not sure if
>>> this is already fixed in 1.10.
>>>
>>> I've attached the code as well.
>>>
>>> Thanks!
>>> kant
>>>
>>> On Tue, Jan 28, 2020 at 1:43 PM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am doing a streaming outer join on four Kafka topics; let's call
>>>> them sample1, sample2, sample3, sample4. Each of these test topics has just
>>>> one column, which is a string tuple. My query is this:
>>>>
>>>> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0
>>>> FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4
>>>> on sample3.f0=sample4.f0
>>>>
>>>> And here is how I send messages to those Kafka topics at various times.
>>>>
>>>> At time t1, send a message "flink" to test-topic1
>>>>
>>>> (true,flink,null,null,null) // Looks good
>>>>
>>>> At time t2, send a message "flink" to test-topic4
>>>>
>>>> (true,null,null,null,flink) // Looks good
>>>>
>>>> At time t3, send a message "flink" to test-topic3
>>>>
>>>> (false,null,null,null,flink) // Looks good
>>>> (true,null,null,flink,flink) // Looks good
>>>>
>>>> At time t4, send a message "flink" to test-topic2
>>>>
>>>> (false,flink,null,null,null) // Looks good
>>>> (false,null,null,flink,flink) // Looks good
>>>> *(true,null,null,null,flink) // Redundant?*
>>>> *(false,null,null,null,flink) // Redundant?*
>>>> (true,flink,flink,flink,flink) // Looks good
>>>>
>>>> Those two rows above seem redundant to me, although the end result
>>>> is correct. I don't see the same behavior if I join only two topics. These
>>>> unwanted messages will lead to a lot of database operations downstream, so
>>>> is there any way to optimize this? I am using Flink 1.9, so I'm not sure if
>>>> this is already fixed in 1.10.
>>>>
>>>> I've attached the code as well.
>>>>
>>>> Thanks!
>>>> kant

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn