For 3), my reasoning is this: since topic-2 is derived from topic-1, as long as the two are co-partitioned (meaning that a record from topic-1 and the record(s) it produces in topic-2 are guaranteed to be sent to the same client), there is no race condition, because the record to be queried is guaranteed to have been materialized already.
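To make the co-partitioning condition concrete, here is a minimal stdlib-only Java sketch. It assumes a simplified hash-mod partitioner (`String.hashCode()` masked to non-negative, modulo the partition count) as a stand-in for Kafka's real default partitioner, which hashes the serialized key with murmur2; the class and method names are hypothetical. The point it illustrates: a key lands on the same partition number in two topics only if both use the same partitioning function and the same partition count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: a simplified partitioner, NOT Kafka's murmur2-based default.
final class CoPartitionCheck {

    // Maps a key to a partition number in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so negative hash codes still give a valid index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Returns the keys that would land on different partition numbers in two topics
    // with the given partition counts, i.e. the keys that are NOT co-partitioned.
    static List<String> notCoPartitioned(List<String> keys, int partitionsA, int partitionsB) {
        List<String> mismatched = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, partitionsA) != partitionFor(key, partitionsB)) {
                mismatched.add(key);
            }
        }
        return mismatched;
    }
}
```

With equal partition counts, `notCoPartitioned(keys, 6, 6)` is always empty; with `notCoPartitioned(keys, 6, 4)` many keys move to a different partition number, which is why a topic-1 with a different partition count than topic-2 would first have to be repartitioned (Raman's topic-1a idea).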
As for the multiple-partitions issue, yes, that is indeed a problem. I can't come up with a very elegant solution off the top of my head. Maybe you can use interactive queries from your stream (the one resulting from topic-2) to query the materialized state of topic-1 as part of your processing logic, but that also means inter-client communication, so there may be a performance hit to consider.

Guozhang

On Fri, Apr 12, 2019 at 2:22 PM Raman Gupta <rocketra...@gmail.com> wrote:

> Currently topic-1 and topic-2 have a different number of partitions
> (due to vastly different concurrency/processing time requirements). So
> in order to accomplish this, I'd also need to open a can of worms and
> repartition topic-1 to create topic-1a, so that it can be
> co-partitioned with topic-2a, which would have to be rebuilt from
> topic-2.
>
> This is a heck of a lot of hoops to jump through, and in any case,
> even if I did this, I'm still unclear on how this actually solves the
> race condition (question #3 in my previous message).
>
> Regards,
> Raman
>
> On Fri, Apr 12, 2019 at 1:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > As for 2), just to clarify: co-partitioning is still needed to make
> > sure that a record from topic-2 can get the expected data from the local
> > materialized store built from topic-1. This is required for both the
> > DSL and the Processor API.
> >
> > What I was suggesting is that, when sending to topic-2, records will by
> > default be partitioned on the message key, and hence topic-2 would likely
> > not be co-partitioned with topic-1. However, you can use a
> > `StreamPartitioner`, either via
> > `stream.to(topic, Produced.with(...).withStreamPartitioner(...))` for DSL
> > users, or directly via `topology.addSink(..., StreamPartitioner, ...)` for
> > Processor API users, to override the default partitioning from the message
> > key to a scheme that is consistent with the key of topic-1. Then we can
> > guarantee they are co-partitioned.
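The partitioner override suggested here can be sketched without a broker. The interface below is a local stand-in whose `partition(String topic, K key, V value, int numPartitions)` method mirrors the shape of Kafka Streams' `StreamPartitioner`; in a real application the class would implement `org.apache.kafka.streams.processor.StreamPartitioner` and be passed via `Produced#withStreamPartitioner` (DSL) or `Topology#addSink` (Processor API). The `Topic2Value` type, its `topic1Key` field, and the simplified hash are assumptions for illustration. A real implementation must reproduce whatever partitioner wrote topic-1 (Kafka's default hashes the serialized key with murmur2).

```java
// Local stand-in with the same method shape as Kafka Streams' StreamPartitioner.
interface RecordPartitioner<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);
}

// Hypothetical topic-2 value: carries the key of the topic-1 record it was derived from.
final class Topic2Value {
    final String topic1Key;
    final String payload;

    Topic2Value(String topic1Key, String payload) {
        this.topic1Key = topic1Key;
        this.payload = payload;
    }
}

// Routes topic-2 records by their originating topic-1 key instead of their own unique key.
final class ByTopic1KeyPartitioner implements RecordPartitioner<String, Topic2Value> {

    // Must match the function that partitioned topic-1 (a simplified hash here;
    // Kafka's default producer partitioner uses murmur2 on the serialized key).
    static int topic1Partition(String topic1Key, int numPartitions) {
        return (topic1Key.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public Integer partition(String topic, String key, Topic2Value value, int numPartitions) {
        // Ignore the record's own key (a unique id) and use the topic-1 key from the payload.
        return topic1Partition(value.topic1Key, numPartitions);
    }
}
```

All topic-2 records derived from one topic-1 record then share that record's partition number. The trade-off is that topic-2 load is spread by topic-1 key rather than by unique id, so a topic-1 key with many derived records concentrates them on one partition.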
> >
> > Guozhang
> >
> > On Tue, Apr 9, 2019 at 7:59 PM Raman Gupta <rocketra...@gmail.com> wrote:
> >
> > > Hmm, not sure I understand what you are suggesting. Let me address
> > > each step in turn:
> > >
> > > > 1) materialize the topic-1 into a state store
> > >
> > > OK, I think that's basically what we have with the global k-table I
> > > showed in the topology, or did you mean something else, like using the
> > > Processor API to populate our own state store?
> > >
> > > > 2) use the same key as the partitioning key when writing to topic-2 to
> > > > make sure it is co-partitioned (unless the resulting stream from
> > > > topic-2 does not need to rely on being partitioned by the key to
> > > > perform other operations, in which case it is okay)
> > >
> > > Our situation is the latter: the key in topic-2 is essentially a
> > > unique id, and partitioning should therefore be pretty random. Also
> > > remember that the cardinality of topic-1 to topic-2 is 1:n, so there
> > > are multiple different keys on topic-2 for each key in topic-1.
> > >
> > > > 3) we can just issue `get` as part of the lower-level processor API
> > > > rather than performing a join to query the materialized table from
> > > > topic-1.
> > >
> > > Right now, we're already doing a `get` on the global k-table as part
> > > of the DSL API. Does it make a difference whether we use the Processor
> > > API? How does that resolve the race condition?
> > >
> > > Regards,
> > > Raman
> > >
> > > On Fri, Apr 5, 2019 at 12:00 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > I see.
> > > >
> > > > So back to your original question: yes, there will be a race condition,
> > > > since the global ktable is updated by a separate thread from the one
> > > > that reads from topic-2, processes the record, and queries the global
> > > > ktable.
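The race confirmed here comes from two threads: a dedicated global thread applies topic-1 updates to the global store, while a stream thread reads topic-2 and queries that store, and nothing orders the two. The stdlib-only sketch below stages the unlucky interleaving deterministically with a latch; the `ConcurrentHashMap` is a hypothetical stand-in for the global KTable's store, and the key and value are made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class GlobalStoreRace {

    // Returns {value seen before the global thread caught up, value seen after}.
    static String[] lookupBeforeAndAfterUpdate() {
        // Hypothetical stand-in for the global KTable's local store.
        Map<String, String> globalStore = new ConcurrentHashMap<>();
        CountDownLatch globalThreadMayProceed = new CountDownLatch(1);
        ExecutorService globalThread = Executors.newSingleThreadExecutor();
        try {
            // "Global thread": applies the topic-1 update only once released,
            // modelling a global consumer that lags behind the stream thread.
            Future<?> update = globalThread.submit(() -> {
                try {
                    globalThreadMayProceed.await();
                    globalStore.put("order-7", "customer=acme");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // "Stream thread": the derived topic-2 record is already here, so it queries now.
            String early = globalStore.get("order-7");
            globalThreadMayProceed.countDown();
            update.get(); // wait until the global thread has applied the update
            String late = globalStore.get("order-7");
            return new String[] {early, late};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            globalThread.shutdownNow();
        }
    }
}
```

In a real topology the interleaving is not staged, so the miss happens only sometimes, which is what makes it a race rather than a deterministic bug.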
> > > >
> > > > What I can think of off the top of my head is to 1) materialize topic-1
> > > > into a state store, 2) use the same key as the partitioning key when
> > > > writing to topic-2 to make sure it is co-partitioned (unless the
> > > > resulting stream from topic-2 does not need to rely on being
> > > > partitioned by the key to perform other operations, in which case it is
> > > > okay), and then 3) just issue a `get` via the lower-level Processor API,
> > > > rather than performing a join, to query the materialized table from
> > > > topic-1. Does that make sense?
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Apr 4, 2019 at 7:38 AM Raman Gupta <rocketra...@gmail.com> wrote:
> > > >
> > > > > Yes, the stream transformation of `topic-1` to `topic-2` is a
> > > > > heavyweight operation producing completely different information on
> > > > > topic-2 than is contained on topic-1 (the cardinality is 1:n as well,
> > > > > not 1:1). The schema evolution I am attempting to perform should have
> > > > > captured the data at the time topic-2 was written, but didn't. It is
> > > > > easily available in topic-1, though, using some other information in
> > > > > the payload of topic-2.
> > > > >
> > > > > Regards,
> > > > > Raman
> > > > >
> > > > > On Thu, Apr 4, 2019 at 12:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Raman,
> > > > > >
> > > > > > What I'm not clear on is this: since topic-2 is a transformed topic of
> > > > > > topic-1 via "other stream", why do you still need to join it with
> > > > > > topic-1? In other words, do topic-1 and topic-2 contain different
> > > > > > data, or is topic-2 just storing similar data to topic-1 in a
> > > > > > different format (since it was a transformation result of topic-1 via
> > > > > > "other stream")?
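The three steps suggested on Apr 5 (materialize topic-1, co-partition topic-2, then `get` locally) can be modelled in a few lines: each task owns the store built from one topic-1 partition, and a topic-2 record is processed by the task for whichever partition it was written to. This is a hypothetical stdlib-only sketch: a `Map` per partition stands in for a local state store, and the simplified hash stands in for the real partitioner. It shows that a plain `get` on the task-local store finds the topic-1 value only when topic-2 was written with the same partitioning as topic-1.

```java
import java.util.HashMap;
import java.util.Map;

final class TaskLocalLookup {

    private final int numPartitions;
    // Step 1: topic-1 materialized into one local store per partition (i.e. per task).
    private final Map<Integer, Map<String, String>> storesByPartition = new HashMap<>();

    TaskLocalLookup(int numPartitions) {
        this.numPartitions = numPartitions;
        for (int p = 0; p < numPartitions; p++) {
            storesByPartition.put(p, new HashMap<>());
        }
    }

    // Simplified stand-in for the partitioner used for topic-1.
    int partitionOf(String topic1Key) {
        return (topic1Key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // A topic-1 record is materialized only in the store of the task owning its partition.
    void materializeTopic1(String topic1Key, String value) {
        storesByPartition.get(partitionOf(topic1Key)).put(topic1Key, value);
    }

    // Step 3: the task owning topic-2 partition `partition` does a plain get on its own
    // store (no join). It only finds the value if step 2 routed the record correctly.
    String lookupFromTask(int partition, String topic1Key) {
        return storesByPartition.get(partition).get(topic1Key);
    }
}
```

Step 2 corresponds to writing the topic-2 record to `partitionOf(topic1Key)` rather than to a partition derived from topic-2's own unique key; any other partition belongs to a task whose store does not hold that topic-1 key.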
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Apr 2, 2019 at 4:07 PM Raman Gupta <rocketra...@gmail.com> wrote:
> > > > > >
> > > > > > > Yes, I forgot to show an item on the topology:
> > > > > > >
> > > > > > > +-----------> global-ktable +---------+
> > > > > > > |                                     |
> > > > > > > +                                     v
> > > > > > > topic-1                            stream +----> topic-3
> > > > > > > +                                     ^
> > > > > > > |                                     |
> > > > > > > +----> other stream +--> topic-2 +----+
> > > > > > >
> > > > > > > My use case is a "schema evolution" of the data in topic-2, to produce
> > > > > > > topic-3 via "stream". In order to perform this schema evolution, I
> > > > > > > need to pull some attributes from the payloads in topic-1. I can't
> > > > > > > simply join topic-1 and topic-2 because they do not share keys. The
> > > > > > > global-ktable allows me to easily look up the values I need from
> > > > > > > topic-1 using an attribute from the payload of topic-2, and combine
> > > > > > > those to write to topic-3.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Raman
> > > > > > >
> > > > > > > On Tue, Apr 2, 2019 at 6:56 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hello Raman,
> > > > > > > >
> > > > > > > > It seems from your case that `topic-1` is used both for the global
> > > > > > > > ktable and for another stream, which is then transformed into a new
> > > > > > > > stream that will be "joined" somehow with the global ktable. Could
> > > > > > > > you elaborate a bit more on why you want to use the same source
> > > > > > > > topic for two entities in your topology?
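The enrichment Raman describes (take an attribute from the topic-2 payload, look it up in the table built from topic-1, and write the combination to topic-3) corresponds to the DSL's `KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)`. Below is a stdlib-only sketch of the same lookup under assumed names: `Topic2Event`, its `topic1Key` attribute, and the string formats are hypothetical. A missing match yields `Optional.empty()`, mirroring an inner join dropping the record, which is exactly how the race surfaces if topic-1's update has not been applied yet.

```java
import java.util.Map;
import java.util.Optional;

final class SchemaEvolution {

    // Hypothetical topic-2 value: its own id plus an attribute pointing at a topic-1 key.
    static final class Topic2Event {
        final String id;
        final String topic1Key;
        final String payload;

        Topic2Event(String id, String topic1Key, String payload) {
            this.id = id;
            this.topic1Key = topic1Key;
            this.payload = payload;
        }
    }

    // Builds the topic-3 value from a topic-2 event and the topic-1 table.
    // Empty result = no match, mirroring an inner join that drops the record.
    static Optional<String> enrich(Topic2Event event, Map<String, String> topic1Table) {
        String attributes = topic1Table.get(event.topic1Key); // key selector + lookup
        if (attributes == null) {
            return Optional.empty();
        }
        return Optional.of(event.payload + ";" + attributes); // value joiner
    }
}
```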
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, Apr 2, 2019 at 3:41 PM Raman Gupta <rocketra...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > I have a topology like this:
> > > > > > > > >
> > > > > > > > > +-----------> global-ktable +---------+
> > > > > > > > > |                                     |
> > > > > > > > > +                                     v
> > > > > > > > > topic-1                            stream
> > > > > > > > > +                                     ^
> > > > > > > > > |                                     |
> > > > > > > > > +----> other stream +--> topic-2 +----+
> > > > > > > > >
> > > > > > > > > IOW, a global ktable is built from topic-1. Meanwhile, "other stream"
> > > > > > > > > transforms topic-1 to topic-2. Finally, "stream" operates on topic-2,
> > > > > > > > > and as part of its logic, reads data from "global-ktable".
> > > > > > > > >
> > > > > > > > > I am worried about the race condition present in "stream" between the
> > > > > > > > > message showing up on topic-2 and the "get" from "global-ktable". Is
> > > > > > > > > there a way, other than retrying the `get`, to avoid this race?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Raman

--
-- Guozhang
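For completeness, the retry workaround mentioned in the original question could look like the sketch below: a bounded number of attempts with a fixed pause between misses. The helper name, attempt count, and backoff are assumptions. Note the cost: the stream thread blocks while it waits, stalling every task assigned to that thread, and the lookup can still come back empty if the global thread lags longer than the total wait.

```java
import java.util.function.Supplier;

final class RetryingLookup {

    // Calls `lookup` up to maxAttempts times, sleeping backoffMillis between misses.
    // Returns null if every attempt missed (or the thread was interrupted).
    static <V> V getWithRetry(Supplier<V> lookup, int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            V value = lookup.get();
            if (value != null) {
                return value;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis); // blocks the calling (stream) thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
        return null;
    }
}
```

A usage example would pass something like `() -> globalStore.get(topic1Key)` as the lookup, where `globalStore` is whatever read-only store handle the processor holds.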