Hi kant,

Thanks for reporting the issue. I'd like to share some thoughts after
digging into the source code of the blink planner [1]; the logic is the
same in the legacy planner [2].

The main logic of FULL OUTER JOIN is:

if input record is accumulate
|  if input side is outer
|  |  if there are no matched rows on the other side
|  |  |  send +[record+null], state.add(record, 0)
|  |  endif
|  |  if there are matched rows on the other side
|  |  |  if other side is outer
|  |  |  |  if the matched num in the matched rows == 0, send -[null+other]
|  |  |  |  if the matched num in the matched rows > 0, skip
|  |  |  |  otherState.update(other, old + 1)
|  |  |  endif
|  |  |  send +[record+other]s, state.add(record, other.size)
|  |  endif
|  endif
|  if input side is not outer
|  |  state.add(record)
|  |  if there are no matched rows on the other side, skip
|  |  if there are matched rows on the other side
|  |  |  if other side is outer
|  |  |  |  if the matched num in the matched rows == 0, send -[null+other]
|  |  |  |  if the matched num in the matched rows > 0, skip
|  |  |  |  otherState.update(other, old + 1)
|  |  |  endif
|  |  |  send +[record+other]s
|  |  endif
|  endif
endif

if input record is retract
|  state.retract(record)
|  if there are no matched rows on the other side
|  |  if input side is outer, send -[record+null]
|  endif
|  if there are matched rows on the other side
|  |  send -[record+other]s
|  |  if other side is outer
|  |  |  if the matched num in the matched rows == 0, this should never happen!
|  |  |  if the matched num in the matched rows == 1, send +[null+other]
|  |  |  if the matched num in the matched rows > 1, skip
|  |  |  otherState.update(other, old - 1)
|  |  endif
|  endif
endif
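
The steps above can be sketched as a small Python toy model of a single
FULL OUTER JOIN operator. This is not the Flink implementation: the class
name FullOuterJoin, its process method, and the list-based state are
assumptions made for illustration only. It also assumes an equi-join in
which both sides are outer and NULL keys never match.

```python
from collections import defaultdict


class FullOuterJoin:
    """Toy model of one FULL OUTER JOIN operator (both sides are outer)."""

    def __init__(self, left_key, right_key, left_width, right_width):
        self.key = {"L": left_key, "R": right_key}
        self.width = {"L": left_width, "R": right_width}
        # state[side][join_key] -> list of [record, matched_num]
        self.state = {"L": defaultdict(list), "R": defaultdict(list)}

    def _row(self, side, record, other):
        """Build an output row; `other` is None when padding with nulls."""
        other_side = "R" if side == "L" else "L"
        if other is None:
            other = (None,) * self.width[other_side]
        return record + other if side == "L" else other + record

    def process(self, side, accumulate, record):
        """Handle one input change and return the list of emitted changes."""
        other_side = "R" if side == "L" else "L"
        k = self.key[side](record)
        # SQL equality is never true for NULL, so NULL keys match nothing.
        matches = [] if k is None else self.state[other_side][k]
        out = []
        if accumulate:
            for entry in matches:
                if entry[1] == 0:
                    # The other row was emitted null-padded; retract that.
                    out.append(("-", self._row(other_side, entry[0], None)))
                entry[1] += 1
            for entry in matches:
                out.append(("+", self._row(side, record, entry[0])))
            if not matches:
                out.append(("+", self._row(side, record, None)))
            self.state[side][k].append([record, len(matches)])
        else:
            # Assumes the retracted record was accumulated earlier.
            entries = self.state[side][k]
            entries.pop(next(i for i, e in enumerate(entries) if e[0] == record))
            if not matches:
                out.append(("-", self._row(side, record, None)))
            for entry in matches:
                out.append(("-", self._row(side, record, entry[0])))
            for entry in matches:
                entry[1] -= 1
                if entry[1] == 0:
                    # The other row lost its last match; emit it null-padded.
                    out.append(("+", self._row(other_side, entry[0], None)))
        return out


join = FullOuterJoin(lambda r: r[0], lambda r: r[0], 1, 1)
first = join.process("L", True, ("flink",))    # left row arrives alone
second = join.process("R", True, ("flink",))   # matching right row arrives
third = join.process("L", False, ("flink",))   # left row is retracted
for step in (first, second, third):
    print(step)
```

With only two inputs, each change produces at most one retraction followed
by one new row, which fits the observation that no redundant rows show up
when just two topics are joined.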


For a single join operator, the logic above is correct and handles all
corner cases.
Your query, however, has three join operators:

         Join3
         /   \
      Join2   T4
      /   \
   Join1   T3
   /   \
  T1    T2


At t4, after sending "flink" to test-topic2:
Join1 will first retract -[flink, null, null, null], then send
    +[flink, flink, null, null]
Join2 receives -[flink, null, null, null], will send
    -[flink, null, null, null]
Join2 receives +[flink, flink, null, null], will send
    -[null, null, flink, null], and +[flink, flink, flink, null]
Join3 receives -[flink, null, null, null], will send
    -[flink, null, null, null]
Join3 receives -[null, null, flink, null], will send
    -[null, null, flink, flink], and +[null, null, null, flink]
Join3 receives +[flink, flink, flink, null], will send
    -[null, null, null, flink], and +[flink, flink, flink, flink]
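
As a quick check of the trace above, the following Python sketch replays
the five changes that Join3 emits at t4 against the rows the sink holds
after t3, as listed in the original report. The names view, changes, net,
final_view and cancelled are hypothetical, and the sink is modelled as a
plain multiset rather than a real database.

```python
from collections import Counter

# Rows in the sink before t4 (the result after t3 in the original report).
view = Counter({
    ("flink", None, None, None): 1,
    (None, None, "flink", "flink"): 1,
})

# Changes emitted by Join3 at t4, in the order given in the trace.
changes = [
    ("-", ("flink", None, None, None)),
    ("-", (None, None, "flink", "flink")),
    ("+", (None, None, None, "flink")),
    ("-", (None, None, None, "flink")),
    ("+", ("flink", "flink", "flink", "flink")),
]

net = Counter()  # net effect of the t4 changes per row
for op, row in changes:
    delta = 1 if op == "+" else -1
    view[row] += delta
    net[row] += delta

final_view = +view  # unary plus drops rows whose count is not positive
cancelled = [row for row, n in net.items() if n == 0]

print(final_view)
print(cancelled)
```

The only row whose changes net to zero is [null, null, null, flink], which
is the +/- pair flagged as redundant. The final result is the same with or
without that pair, but the sink still has to process both messages.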

In my opinion, this is expected behavior under the current design, and I
can't find an easy way to eliminate these redundant messages.
Let's wait for Jark's and Timo's opinions.

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala

kant kodali <kanth...@gmail.com> wrote on Sun, Feb 2, 2020 at 5:18 AM:

> Wondering if anyone had a chance to look through this or should I create
> the JIRA?
>
>
>
> On Wed, Jan 29, 2020 at 6:49 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Kant,
>>
>> I am not an expert on Flink's SQL implementation. Hence, I'm pulling in
>> Timo and Jark who might help you with your question.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 28, 2020 at 10:46 PM kant kodali <kanth...@gmail.com> wrote:
>>
>>> Sorry. fixed some typos.
>>>
>>> I am doing a streaming outer join from four topics in Kafka; let's call
>>> them sample1, sample2, sample3, sample4. Each of these test topics has just
>>> one column which is of tuple string. My query is this:
>>>
>>> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL 
>>> OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on 
>>> sample3.f0=sample4.f0
>>>
>>>
>>> And here is how I send messages to those Kafka topics at various times.
>>>
>>> At time t1 Send a message "flink" to test-topic1
>>>
>>> (true,flink,null,null,null) // Looks good
>>>
>>> At time t2 Send a message "flink" to test-topic4
>>>
>>> (true,null,null,null,flink) // Looks good
>>>
>>> At time t3 Send a message "flink" to test-topic3
>>>
>>> (false,null,null,null,flink) // Looks good
>>> (true,null,null,flink,flink) //Looks good
>>>
>>> At time t4 Send a message "flink" to test-topic2
>>>
>>> (false,flink,null,null,null) // Looks good
>>> (false,null,null,flink,flink) // Looks good
>>> *(true,null,null,null,flink) // Redundant?*
>>> *(false,null,null,null,flink) // Redundant?*
>>> (true,flink,flink,flink,flink) //Looks good
>>>
>>> Assume t1<t2<t3<t4
>>>
>>> Those two rows above seem redundant to me, although the end result
>>> is correct. I don't see the same behavior if I join two topics. These
>>> redundant messages can lead to a lot of database operations underneath, so
>>> is there any way to optimize this? I am using Flink 1.9, so I am not sure
>>> if this is already fixed in 1.10.
>>>
>>> Attached the code as well.
>>>
>>> Thanks!
>>> kant
>>>
>>>
>>> On Tue, Jan 28, 2020 at 1:43 PM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am doing a streaming outer join from four topics in Kafka lets call
>>>> them sample1, sample2, sample3, sample4. Each of these test topics has just
>>>> one column which is of tuple string. my query is this
>>>>
>>>> SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 
>>>> FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 
>>>> on sample3.f0=sample4.f0
>>>>
>>>>
>>>> And here is how I send messages to those Kafka topics at various times.
>>>>
>>>> At time t1 Send a message "flink" to test-topic1
>>>>
>>>> (true,flink,null,null,null) // Looks good
>>>>
>>>> At time t2 Send a message "flink" to test-topic4
>>>>
>>>> (true,null,null,null,flink) // Looks good
>>>>
>>>> At time t3 Send a message "flink" to test-topic3
>>>>
>>>> (false,null,null,null,flink) // Looks good
>>>> (true,null,null,flink,flink) //Looks good
>>>>
>>>> At time t3 Send a message "flink" to test-topic2
>>>>
>>>> (false,flink,null,null,null) // Looks good
>>>> (false,null,null,flink,flink) // Looks good
>>>> *(true,null,null,null,flink) // Redundant?*
>>>> *(false,null,null,null,flink) // Redundant?*
>>>> (true,flink,flink,flink,flink) //Looks good
>>>>
>>>> Those two rows above seem to be redundant to be although the end result
>>>> is correct. Doesn't see the same behavior if I join two topics. This
>>>> unwanted message will lead to a lot of database operations underneath so
>>>> any way to optimize this? I am using Flink 1.9 so not sure if this is
>>>> already fixed in 1.10.
>>>>
>>>> Attached the code as well.
>>>>
>>>> Thanks!
>>>> kant
>>>>
>>>>
>>>>
>>>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
