As for 2), just to clarify: co-partitioning is still needed to make sure that a record from topic-2 can get the expected data from the local materialized store built from topic-1. This is required for both the DSL and the Processor API.
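To illustrate the point, here is a minimal sketch in plain Java that does not use the Kafka Streams API. Everything in it is an assumption made for illustration: the class `CoPartitionSketch`, the `Topic2Record` type, the key names, and the use of `String.hashCode()` as a stand-in for the producer's real hash function. It only shows that partitioning topic-2 by its own unique id scatters records across partitions, while partitioning by the topic-1 key carried in the payload sends every record to the partition that holds the matching topic-1 entry.

```java
// Stand-alone illustration of co-partitioning; it does not use the Kafka Streams API.
class CoPartitionSketch {

    // Stand-in for the producer's partitioning function. The real default
    // partitioner hashes the serialized key bytes; String.hashCode() is used
    // here only to keep the sketch dependency-free (assumption).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical topic-2 record: its own key is a unique id, and its payload
    // carries the key of the topic-1 record it was derived from.
    static final class Topic2Record {
        final String uniqueId;
        final String topic1Key;

        Topic2Record(String uniqueId, String topic1Key) {
            this.uniqueId = uniqueId;
            this.topic1Key = topic1Key;
        }
    }

    // Default behaviour: topic-2 is partitioned by its own message key.
    static int defaultPartition(Topic2Record record, int numPartitions) {
        return partitionFor(record.uniqueId, numPartitions);
    }

    // Overridden behaviour: topic-2 is partitioned by the topic-1 key found in
    // the payload, so it lands where the topic-1 entry is stored.
    static int coPartition(Topic2Record record, int numPartitions) {
        return partitionFor(record.topic1Key, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // both topics are assumed to have the same partition count
        String topic1Key = "customer-42";
        System.out.println("topic-1 entry lives in partition "
                + partitionFor(topic1Key, numPartitions));

        // One topic-1 record fans out to several topic-2 records (1:n).
        for (int i = 0; i < 5; i++) {
            Topic2Record record = new Topic2Record("event-" + i, topic1Key);
            System.out.println(record.uniqueId
                    + " default=" + defaultPartition(record, numPartitions)
                    + " co-partitioned=" + coPartition(record, numPartitions));
        }
    }
}
```

In a real topology the same idea would presumably be expressed through a `StreamPartitioner` that derives the partition from the topic-1 key inside the topic-2 value. It also assumes that both topics have the same number of partitions and that topic-1 was written with the same partitioning function.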
What I was suggesting is that, when sending to topic-2, records are by default partitioned on the message key, and hence topic-2 would likely not be co-partitioned with topic-1. However, you can supply a `StreamPartitioner`, either via stream.to(Produced.withStreamPartitioner) for DSL users or directly via topology.addSink(... StreamPartitioner) for Processor API users, to override the default partitioning on the message key with a scheme that is consistent with the key of topic-1. Then we can guarantee they are co-partitioned.

Guozhang

On Tue, Apr 9, 2019 at 7:59 PM Raman Gupta <rocketra...@gmail.com> wrote:

> Hmm, not sure I understand what you are suggesting. Let me address
> each step in turn:
>
> > 1) materialize the topic-1 into a state store
>
> Ok, I think that's basically what we have with the global k-table I
> showed in the topology, or did you mean something else, like using the
> Processor API to populate our own state store?
>
> > 2) use the same key as the partitioning key when writing to topic-2 to
> > make sure it is co-partitioned (unless the resulted
> > stream from topic-2 does not need to rely on it being partitioned by
> > the key to perform other operations, it is okay)
>
> Our situation is the latter. The key in topic-2 is essentially a
> unique id, and partitioning should therefore be pretty random. Also
> remember that the cardinality of topic-1 to topic-2 is 1:n, so there
> are multiple different keys on topic-2 for each key in topic-1.
>
> > 3) we can just issue `get` as part of the lower-level processor API
> > rather than performing a join to query the materialized table from topic-1.
>
> Right now, we're already doing a `get` on the global k-table as part
> of the DSL API. Does it make a difference whether we use the Processor
> API? How does that resolve the race condition?
>
> Regards,
> Raman
>
> On Fri, Apr 5, 2019 at 12:00 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I see.
> >
> > So back to your original question, yes there will be a race condition since
> > the global ktable is updated with a separate thread other than the one
> > which is reading from topic-2 and process the record and query the global
> > ktable.
> >
> > What I can think of on top of my head, is to 1) materialize the topic-1
> > into a state store, and 2) use the same key as the partitioning key when
> > writing to topic-2 to make sure it is co-partitioned (unless the resulted
> > stream from topic-2 does not need to rely on it being partitioned by the
> > key to perform other operations, it is okay), and then 3) we can just issue
> > `get` as part of the lower-level processor API rather than performing a
> > join to query the materialized table from topic-1. Does that make sense?
> >
> > Guozhang
> >
> > On Thu, Apr 4, 2019 at 7:38 AM Raman Gupta <rocketra...@gmail.com> wrote:
> >
> > > Yes, the stream transformation of `topic-1` to `topic-2` is a
> > > heavyweight operation producing completely different information on
> > > topic-2 than is contained on topic-1 (the cardinality is 1-n as well,
> > > not 1-1). The schema evolution I am attempting to perform should have
> > > captured the data at time of write of topic-2 but didn't. It is easily
> > > available in topic-1 though, using some other information in the
> > > payload of topic-2.
> > >
> > > Regards,
> > > Raman
> > >
> > > On Thu, Apr 4, 2019 at 12:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hi Raman,
> > > >
> > > > What I'm not clear is that since topic-2 is a transformed topic of topic-1
> > > > via "other stream", then why do you still need to join it with topic-1? Or
> > > > in other words, are topic-1 and topic-2 containing different data, or
> > > > topic-2 is just storing similar data of topic-1 but just in different
> > > > format (since it was a transformation result of topic-1 via "other stream")?
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Apr 2, 2019 at 4:07 PM Raman Gupta <rocketra...@gmail.com> wrote:
> > > >
> > > > > Yes, I forgot to show an item on the topology:
> > > > >
> > > > >         +-----------> global-ktable +---------+
> > > > >         |                                     |
> > > > >         +                                     v
> > > > > topic-1                                    stream +----> topic-3
> > > > >         +                                     ^
> > > > >         |                                     |
> > > > >         +----> other stream +--> topic-2 +----+
> > > > >
> > > > > My use case is a "schema evolution" of the data in topic-2, to produce
> > > > > topic-3 via "stream". In order to perform this schema evolution, I
> > > > > need to pull some attributes from the payloads in topic-1. I can't
> > > > > simply join topic-1 and topic-2 because they do not share keys. The
> > > > > global-ktable allows me to easily look up the values I need from
> > > > > topic-1 using an attribute from the payload of topic-2, and combine
> > > > > those to write to topic-3.
> > > > >
> > > > > Regards,
> > > > > Raman
> > > > >
> > > > > On Tue, Apr 2, 2019 at 6:56 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello Raman,
> > > > > >
> > > > > > It seems from your case that `topic-1` is used for both the global ktable
> > > > > > as well as another stream, which then be transformed to a new stream that
> > > > > > will be "joined" somehow with the global ktable. Could you elaborate your
> > > > > > case a bit more on why do you want to use the same source topic for two
> > > > > > entities in your topology?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Apr 2, 2019 at 3:41 PM Raman Gupta <rocketra...@gmail.com> wrote:
> > > > > >
> > > > > > > I have a topology like this:
> > > > > > >
> > > > > > >         +-----------> global-ktable +---------+
> > > > > > >         |                                     |
> > > > > > >         +                                     v
> > > > > > > topic-1                                    stream
> > > > > > >         +                                     ^
> > > > > > >         |                                     |
> > > > > > >         +----> other stream +--> topic-2 +----+
> > > > > > >
> > > > > > > IOW, a global ktable is built from topic-1. Meanwhile, "other stream"
> > > > > > > transforms topic-1 to topic-2. Finally, "stream" operators on topic-2,
> > > > > > > and as part of its logic, reads data from "global-ktable".
> > > > > > >
> > > > > > > I am worried about the race condition present in "stream" between the
> > > > > > > message showing up on topic-2, and the "get" from "global-ktable". Is
> > > > > > > there a way, other than retrying the `get`, to avoid this race?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Raman
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang