Hello Adam,

Please see my comments inline.

On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi Guozhang
>
> *Re: Questions*
> *1)* I do not yet have a solution to this, but I also did not look that
> closely at it when I began this KIP. I admit that I was unaware of exactly
> how the GlobalKTable worked alongside the KTable/KStream topologies. You
> mention "It means the two topologies will be merged, and that merged
> topology can only be executed as a single task, by a single thread. " - is
> the problem here that the merged topology would be parallelized to other
> threads/instances? While I am becoming familiar with how the topologies are
> created under the hood, I am not yet fully clear on the implications of
> your statement. I will look into this further.
>
>
Yes. The issue is that today each task is executed by only a single thread
at any given time, and hence each state store is accessed by only a single
thread (except for interactive queries, and for global tables, where the
global update thread writes to the global store and the local thread reads
from it). If we let the global store update thread also trigger joins and
send the results to the downstream operators, then the global store update
thread could access any state store in the subsequent part of the topology,
breaking our current threading model.
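
To make the concern concrete, here is a minimal sketch in plain Java. It is a
toy model, not Kafka Streams code: `OwnedStore`, `ThreadingModelSketch` and the
explicit owner check are hypothetical names and an assumption made purely for
illustration (real state stores do not perform such a check). It only shows
what it means for a second thread, standing in for the global update thread,
to touch a store that is supposed to belong to a single task thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Toy stand-in for a task-local state store (hypothetical, not a Kafka class).
// It remembers its owning thread and rejects access from any other thread.
final class OwnedStore {
    private final Thread owner;
    private final Map<String, String> data = new HashMap<>();

    OwnedStore(Thread owner) {
        this.owner = owner;
    }

    void put(String key, String value) {
        checkOwner();
        data.put(key, value);
    }

    String get(String key) {
        checkOwner();
        return data.get(key);
    }

    private void checkOwner() {
        Thread current = Thread.currentThread();
        if (current != owner) {
            throw new IllegalStateException(
                "store owned by " + owner.getName() + ", accessed from " + current.getName());
        }
    }
}

final class ThreadingModelSketch {
    // Lets a second thread write to the store, as a join triggered from the
    // global update thread would. Returns the error it hit, or null if none.
    static Throwable accessFromOtherThread(OwnedStore store) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread globalUpdater = new Thread(() -> {
            try {
                store.put("k", "join-result");
            } catch (IllegalStateException e) {
                failure.set(e);
            }
        }, "global-update-thread");
        globalUpdater.start();
        try {
            globalUpdater.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        OwnedStore downstream = new OwnedStore(Thread.currentThread());
        downstream.put("k", "v"); // fine: the owning (task) thread writes
        Throwable violation = accessFromOtherThread(downstream);
        System.out.println(violation == null ? "no violation" : violation.getMessage());
    }
}
```

In the real runtime nothing would throw; the access would simply be
unsynchronized, which is why the single-thread-per-task assumption matters.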


> *2)* " do you mean that although we have a duplicated state store:
> ModifiedEvents in addition to the original Events with only the enhanced
> key, this is not avoidable anyways even if we do re-keying?" Yes, that is
> correct, that is what I meant. I need to improve my knowledge around this
> component too. I have been browsing the KIP-213 discussion thread and
> looking at Jan's code.
>
> *Re: Comments*
> *1)* Makes sense. I will update the diagram accordingly. Thanks!
>
> *2)* Wouldn't outer join require that we emit records from the right
> GlobalKTable that have no match in the left KTable? This seems undefined to
> me with the current proposal (above issues aside), since multiple threads
> would be producing the same output event for a single GlobalKTable update.
>
>
I was mainly considering the semantics of table-table joins, that is,
whether we should add this operator to our API. Implementation-wise, we
will only have one global store update thread per instance, so there will
not be multiple threads producing the same output; still, there are other
issues that we should indeed consider, as mentioned above. Again, this
comment is not about the implementation, but about whether it is desirable
to add it API-wise.


>
> Questions for you both:
> Q1) Is a KTable always materialized? I am looking at the code under the
> hood, and it seems to me that it's either materialized with an explicit
> Materialized object, or it's given an anonymous name and the default serdes
> are used. Am I correct in this observation?
>
>
A KTable is not always materialized. For example, a KTable generated from
`KTable#filter` or `KTable#mapValues` does not create a new materialized
state store; instead, we use the caller `KTable`'s state store for anyone
who wants to query it in joins.
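
As a rough illustration of that idea, here is a minimal sketch in plain Java.
It is a toy model and not the actual Kafka Streams internals: the names
`FilteredViewSketch`, `ValueGetter`, `StoreBackedTable` and `filter` are
hypothetical and chosen only for this example. It shows a filtered "table"
that keeps no store of its own and answers lookups by reading through the
parent's store and applying the predicate on the fly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

final class FilteredViewSketch {
    // Read-only lookup, standing in for what a join uses to query a table.
    interface ValueGetter<K, V> {
        V get(K key);
    }

    // A "materialized" table: it owns a store.
    static final class StoreBackedTable<K, V> implements ValueGetter<K, V> {
        private final Map<K, V> store = new HashMap<>();

        void put(K key, V value) {
            store.put(key, value);
        }

        @Override
        public V get(K key) {
            return store.get(key);
        }
    }

    // A filtered table with no store of its own: every lookup goes to the
    // parent and the predicate is evaluated at read time.
    static <K, V> ValueGetter<K, V> filter(ValueGetter<K, V> parent,
                                           BiPredicate<K, V> predicate) {
        return key -> {
            V value = parent.get(key);
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }

    public static void main(String[] args) {
        StoreBackedTable<String, Integer> events = new StoreBackedTable<>();
        events.put("a", 5);
        events.put("b", 50);

        ValueGetter<String, Integer> large = filter(events, (k, v) -> v > 10);
        System.out.println(large.get("a")); // null: filtered out
        System.out.println(large.get("b")); // 50: passes the predicate
    }
}
```

The trade-off in this model is that the predicate is re-evaluated on every
lookup in exchange for not keeping a second copy of the data.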

Moving forward, we are also trying to optimize the topology to only
"logically" materialize a KTable when necessary; this is summarized in
https://issues.apache.org/jira/browse/KAFKA-6761


>
> Thanks,
> Adam
>
>

-- 
-- Guozhang
