SGTM.

I also had the impression that those duplicates are an error rather than
a case of eventual consistency. Using hashing to avoid sending the
payload is a good idea IMHO.

@Adam: can you update the KIP accordingly?

 - add the optimization to not send a reply from the RHS to the LHS on
unsubscribe (if it is not a tombstone)
 - explain why using offsets to avoid duplicates does not work
 - add hashing to avoid duplicates (rough sketch below)
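
For the last point, here is a minimal sketch of the idea (a sketch of
Guozhang's suggestion only; class and method names are made up and are
not the actual KIP-213 code):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    // Illustrative sketch only; names are hypothetical.
    public class SubscriptionHashing {

        // LHS: instead of shipping the full serialized value to the RHS,
        // ship a fixed-length digest of it (MD5 is just an example; any
        // digest function would do).
        public static byte[] digest(final byte[] serializedValue) {
            try {
                return MessageDigest.getInstance("MD5").digest(serializedValue);
            } catch (final NoSuchAlgorithmException e) {
                throw new IllegalStateException(e);
            }
        }

        // LHS, on receiving the RHS response: emit the join result only if
        // the digest that travelled with the subscription still matches the
        // digest of the current LHS value; otherwise a newer LHS update is
        // in flight and the stale result can be discarded.
        public static boolean shouldEmit(final byte[] digestFromResponse,
                                         final byte[] currentSerializedLhsValue) {
            return Arrays.equals(digestFromResponse,
                                 digest(currentSerializedLhsValue));
        }
    }

The point is that the LHS can detect a stale RHS response by comparing
digests, without ever shipping the full payload to the RHS.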

Besides this, I don't have any further comments. Excited to finally get
this in!

Let us know when you have updated the KIP so we can move forward with
the VOTE. Thanks a lot for your patience! This has been a very loooong journey!


-Matthias

On 3/8/19 8:47 AM, John Roesler wrote:
> Hi all,
> 
> This proposal sounds good to me, especially since we observe that people
> are already confused when they see duplicate results coming out of 1:1 joins
> (which is a bug). I take this as "evidence" that we're better off
> eliminating those duplicates from the start. Guozhang's proposal seems like
> a lightweight solution to the problem, so FWIW, I'm in favor.
> 
> Thanks,
> -John
> 
> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
> 
>> Hi Guozhang
>>
>> That would certainly work for eliminating those duplicate values. As it
>> stands right now, this would be consistent with swallowing changes due to
>> out-of-order processing with multiple threads, and seems like a very
>> reasonable way forward. Thank you for the suggestion!
>>
>> I have been trying to think if there are any other scenarios where we can
>> end up with duplicates, though I have not been able to identify any others
>> at the moment. I will think on it a bit more, but if anyone else has any
>> ideas, please chime in.
>>
>> Thanks,
>> Adam
>>
>>
>>
>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> One more thought regarding *c-P2: Duplicates)*: first I want to
>>> separate this issue from the more general issue that today (not only
>>> foreign-key, but also co-partitioned primary-key) table-table joins
>>> still do not strictly respect timestamp ordering, since the two
>>> changelog streams may be fetched and hence processed out-of-order,
>>> and we do not yet allow a record to be joined with the other table at
>>> any given time snapshot. So ideally, when there are two changelog
>>> records (k1, (f-k1, v1)), (k1, (f-k1, v2)) coming at the left hand
>>> table and one record (f-k1, v3) at the right hand table, depending on
>>> the processing ordering we may get:
>>>
>>> (k1, (f-k1, v2-v3))
>>>
>>> or
>>>
>>> (k1, (f-k1, v1-v3))
>>> (k1, (f-k1, v2-v3))
>>>
>>> And this is not to be addressed by this KIP.
>>>
>>> What I would advocate is to fix only the issue that is introduced in
>>> this KIP, that is, we may have
>>>
>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
>>> (k1, (f-k1, v2-v3))
>>>
>>> I admit that it is not a correctness issue from the semantics alone,
>>> compared with "discarding the first result", but it may be confusing to
>>> users who do not expect to see the seeming duplicates.
>>> On the other hand, I think there's a lightweight solution to avoid
>>> it: we can still optimize away sending the full payload of "v1" from
>>> the left hand side to the right hand side, but instead of just
>>> trimming off the whole bytes, we can send, e.g., an MD5 hash of the
>>> bytes (I'm using MD5 here just as an example; we can definitely
>>> replace it with other functions). By doing so, we can discard the
>>> join result if the hash value sent back from the right hand side no
>>> longer matches the left hand side, i.e. we will only send:
>>>
>>> (k1, (f-k1, v2-v3))
>>>
>>> downstream once.
>>>
>>> WDYT?
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com>
>>> wrote:
>>>
>>>> Ah yes, I recall it all now. That answers the question of why I had
>>>> caching disabled. I can certainly re-enable it, since I believe the main
>>>> concern was simply about reconciling those two iterators. A lack of
>>>> knowledge there on my part.
>>>>
>>>>
>>>> Thank you, John, for weighing in - we certainly both appreciate it.
>>>> I think John hits it on the head with his comment: "If it turns out
>>>> we're wrong about this, then it should be possible to fix the
>>>> semantics in place, without messing with the API."
>>>>
>>>> If anyone else would like to weigh in, your thoughts would be greatly
>>>> appreciated.
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>>>> I don't know how to range scan over a caching store; probably
>>>>>>> one would have to open 2 iterators and merge them.
>>>>>
>>>>> That happens automatically. If you query a cached KTable, it ranges
>>>>> over the cache and the underlying RocksDB and performs the merging
>>>>> under the hood.
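
(For illustration, a minimal sketch of what such a range scan looks like
from inside a processor; the store name and key range below are made up.
The caller only ever sees a single iterator, and the merge of the cache
with the underlying RocksDB store happens inside the caching layer.)

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Sketch only: "some-range-store" and the key bounds are hypothetical.
    public class RangeScanProcessor extends AbstractProcessor<String, String> {

        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            super.init(context);
            // The store may be wrapped by the caching layer; the caller cannot tell.
            store = (KeyValueStore<String, String>) context.getStateStore("some-range-store");
        }

        @Override
        public void process(final String key, final String value) {
            // A single range() call; merging cached (not yet flushed) entries
            // with the persisted RocksDB entries happens under the hood.
            try (final KeyValueIterator<String, String> iter = store.range("a", "z")) {
                while (iter.hasNext()) {
                    final KeyValue<String, String> entry = iter.next();
                    context().forward(entry.key, entry.value);
                }
            }
        }
    }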
>>>>>
>>>>>>> Other than that, I still think even the regular join is broken
>>>>>>> with caching enabled, right?
>>>>>
>>>>> Why? To me, using the word "broken" implies something conceptually
>>>>> incorrect; I don't see that here.
>>>>>
>>>>>>> I once filed a ticket, because with caching enabled it would
>>>>>>> return values that haven't been published downstream yet.
>>>>>
>>>>> For the bug report: I found
>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to
>>>>> fix this, but it is a regular bug like any other, and we should not
>>>>> change a design because of a bug.
>>>>>
>>>>> That range() returns values that have not been published downstream
>>>>> if caching is enabled is how caching works and is intended
>>>>> behavior. Not sure why you say it's incorrect?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>>>>>>
>>>>>>
>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
>>>>>>> Thanks Adam,
>>>>>>>
>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is
>>>>>>> no functional/correctness requirement to disable caching. I
>>>>>>> cannot remember why Jan's proposal added this? It might be an
>>>>>>> implementation detail though (maybe just remove it from the KIP?
>>>>>>> -- might be misleading).
>>>>>>
>>>>>> I don't know how to range scan over a caching store; probably one
>>>>>> would have to open 2 iterators and merge them.
>>>>>>
>>>>>> Other than that, I still think even the regular join is broken
>>>>>> with caching enabled, right? I once filed a ticket, because with
>>>>>> caching enabled it would return values that haven't been published
>>>>>> downstream yet.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 
