Currently topic-1 and topic-2 have a different number of partitions (due to vastly different concurrency/processing time requirements). So in order to accomplish this, I'd also need to open a can of worms and repartition topic-1 to create topic-1a, so that it can be co-partitioned with topic-2a, which would have to be rebuilt from topic-2.
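For illustration, here is a minimal sketch of why co-partitioning needs both topics to have the same number of partitions and the same partitioning key. It does not use the Kafka client: `String.hashCode()` stands in for the hash used by the real partitioner (an assumption made to keep the example dependency-free), and the names `CoPartitionSketch`, `partitionFor`, and `partitionForTopic2a` are hypothetical.

```java
// Dependency-free sketch; String.hashCode() is a stand-in for the real partitioner hash.
final class CoPartitionSketch {

    // Map a partitioning key to a partition number in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical rule for topic-2a: ignore the record's own unique id and
    // partition on the topic-1 key carried in the payload instead.
    static int partitionForTopic2a(String uniqueId, String topic1Key, int numPartitions) {
        return partitionFor(topic1Key, numPartitions);
    }

    public static void main(String[] args) {
        int shared = 8; // hypothetical partition count shared by topic-1a and topic-2a
        String topic1Key = "A";
        for (String uniqueId : new String[] {"id-1", "id-2", "id-3"}) {
            System.out.printf("%s -> topic-1a partition %d, topic-2a partition %d%n",
                uniqueId,
                partitionFor(topic1Key, shared),
                partitionForTopic2a(uniqueId, topic1Key, shared));
        }
        // With different partition counts the same key generally maps to
        // different partition numbers, so the topics are not co-partitioned.
        System.out.printf("4 partitions: %d, 6 partitions: %d%n",
            partitionFor(topic1Key, 4), partitionFor(topic1Key, 6));
    }
}
```

With a shared partition count, every topic-2a record derived from a given topic-1 key maps to the same partition number as that key, whatever its unique id. In Kafka Streams the equivalent rule would live in a `StreamPartitioner`, as suggested in the quoted reply below.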
This is a heck of a lot of hoops to jump through, and in any case, even if I did this, I'm still unclear on how this actually solves the race condition (question #3 in my previous message)?

Regards,
Raman

On Fri, Apr 12, 2019 at 1:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> As for 2), just to clarify that co-partitioning is still needed to make
> sure that a record from topic-2 can get the expected data from the local
> materialized store from topic-1 --- this would be required for either DSL
> or for processor API.
>
> What I was suggesting is that, when sending to topic-2, by default it will
> be partitioned on the key of the message, and hence it would not likely be
> co-partitioned. However, you can use `StreamPartitioner` on either
> stream.to(Produced.withStreamPartitioner) for DSL users, or directly
> topology.addSinkNode(... StreamPartitioner) for Processor users, to
> override the default partitioning key from the message key to scheme that
> is consistent with the key of topic-1. Then we can guarantee they are
> co-partitioned.
>
> Guozhang
>
> On Tue, Apr 9, 2019 at 7:59 PM Raman Gupta <rocketra...@gmail.com> wrote:
>
> > Hmm, not sure I understand what you are suggesting. Let me address
> > each step in turn:
> >
> > > 1) materialize the topic-1 into a state store
> >
> > Ok, I think that's basically what we have with the global k-table I
> > showed in the topology, or did you mean something else, like using the
> > Processor API to populate our own state store?
> >
> > > 2) use the same key as the partitioning key when writing to topic-2
> > > to make sure it is co-partitioned (unless the resulted stream from
> > > topic-2 does not need to rely on it being partitioned by the key to
> > > perform other operations, it is okay)
> >
> > Our situation is the latter. The key in topic-2 is essentially a
> > unique id, and partitioning should therefore be pretty random. Also
> > remember that the cardinality of topic-1 to topic-2 is 1:n, so there
> > are multiple different keys on topic-2 for each key in topic-1.
> >
> > > 3) we can just issue `get` as part of the lower-level processor API
> > > rather than performing a join to query the materialized table from
> > > topic-1.
> >
> > Right now, we're already doing a `get` on the global k-table as part
> > of the DSL API. Does it make a difference whether we use the Processor
> > API? How does that resolve the race condition?
> >
> > Regards,
> > Raman
> >
> > On Fri, Apr 5, 2019 at 12:00 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > I see.
> > >
> > > So back to your original question, yes there will be a race condition
> > > since the global ktable is updated with a separate thread other than
> > > the one which is reading from topic-2 and process the record and query
> > > the global ktable.
> > >
> > > What I can think of on top of my head, is to 1) materialize the
> > > topic-1 into a state store, and 2) use the same key as the
> > > partitioning key when writing to topic-2 to make sure it is
> > > co-partitioned (unless the resulted stream from topic-2 does not need
> > > to rely on it being partitioned by the key to perform other
> > > operations, it is okay), and then 3) we can just issue `get` as part
> > > of the lower-level processor API rather than performing a join to
> > > query the materialized table from topic-1. Does that make sense?
> > >
> > > Guozhang
> > >
> > > On Thu, Apr 4, 2019 at 7:38 AM Raman Gupta <rocketra...@gmail.com> wrote:
> > >
> > > > Yes, the stream transformation of `topic-1` to `topic-2` is a
> > > > heavyweight operation producing completely different information on
> > > > topic-2 than is contained on topic-1 (the cardinality is 1-n as well,
> > > > not 1-1). The schema evolution I am attempting to perform should have
> > > > captured the data at time of write of topic-2 but didn't. It is easily
> > > > available in topic-1 though, using some other information in the
> > > > payload of topic-2.
> > > >
> > > > Regards,
> > > > Raman
> > > >
> > > > On Thu, Apr 4, 2019 at 12:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > Hi Raman,
> > > > >
> > > > > What I'm not clear is that since topic-2 is a transformed topic of
> > > > > topic-1 via "other stream", then why do you still need to join it
> > > > > with topic-1? Or in other words, are topic-1 and topic-2 containing
> > > > > different data, or topic-2 is just storing similar data of topic-1
> > > > > but just in different format (since it was a transformation result
> > > > > of topic-1 via "other stream")?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Apr 2, 2019 at 4:07 PM Raman Gupta <rocketra...@gmail.com> wrote:
> > > > >
> > > > > > Yes, I forgot to show an item on the topology:
> > > > > >
> > > > > >   +-----------> global-ktable +---------+
> > > > > >   |                                     |
> > > > > >   +                                     v
> > > > > > topic-1                              stream +----> topic-3
> > > > > >   +                                     ^
> > > > > >   |                                     |
> > > > > >   +----> other stream +--> topic-2 +----+
> > > > > >
> > > > > > My use case is a "schema evolution" of the data in topic-2, to
> > > > > > produce topic-3 via "stream". In order to perform this schema
> > > > > > evolution, I need to pull some attributes from the payloads in
> > > > > > topic-1. I can't simply join topic-1 and topic-2 because they do
> > > > > > not share keys. The global-ktable allows me to easily look up the
> > > > > > values I need from topic-1 using an attribute from the payload of
> > > > > > topic-2, and combine those to write to topic-3.
> > > > > >
> > > > > > Regards,
> > > > > > Raman
> > > > > >
> > > > > > On Tue, Apr 2, 2019 at 6:56 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hello Raman,
> > > > > > >
> > > > > > > It seems from your case that `topic-1` is used for both the
> > > > > > > global ktable as well as another stream, which then be
> > > > > > > transformed to a new stream that will be "joined" somehow with
> > > > > > > the global ktable. Could you elaborate your case a bit more on
> > > > > > > why do you want to use the same source topic for two entities
> > > > > > > in your topology?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Tue, Apr 2, 2019 at 3:41 PM Raman Gupta <rocketra...@gmail.com> wrote:
> > > > > > >
> > > > > > > > I have a topology like this:
> > > > > > > >
> > > > > > > >   +-----------> global-ktable +---------+
> > > > > > > >   |                                     |
> > > > > > > >   +                                     v
> > > > > > > > topic-1                              stream
> > > > > > > >   +                                     ^
> > > > > > > >   |                                     |
> > > > > > > >   +----> other stream +--> topic-2 +----+
> > > > > > > >
> > > > > > > > IOW, a global ktable is built from topic-1. Meanwhile, "other
> > > > > > > > stream" transforms topic-1 to topic-2. Finally, "stream"
> > > > > > > > operators on topic-2, and as part of its logic, reads data
> > > > > > > > from "global-ktable".
> > > > > > > >
> > > > > > > > I am worried about the race condition present in "stream"
> > > > > > > > between the message showing up on topic-2, and the "get" from
> > > > > > > > "global-ktable". Is there a way, other than retrying the
> > > > > > > > `get`, to avoid this race?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Raman
> > > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> > > --
> > > -- Guozhang
> >
>
> --
> -- Guozhang