Hi Pushkar, I’m glad you’ve been able to work through the issues.
The GlobalKTable does store the data in memory (or on disk, depending on how you configure it). I think the in-memory version uses a TreeMap, which gives logarithmic-time access. I think you'll find it sufficiently fast regardless.

Thanks,
John

On Mon, May 11, 2020, at 06:51, Pushkar Deole wrote:
> John,
>
> I think I can modify the cache structure to use a GlobalKTable here so the data can be shared across instances. I've been told that the admin data will be uploaded well in advance of the main events, so the issue with 'missed joins' won't exist: by the time main events start flowing, the admin data will have been synchronized to all the service instances.
>
> By the way, I would like to ask how GlobalKTable works internally. Does it store all the data in memory, or does it fetch it from the backing topic every time? Secondly, what kind of internal data structure does it use? Does it give constant-time performance?

On Thu, May 7, 2020 at 7:27 PM John Roesler <vvcep...@apache.org> wrote:
> Hi Pushkar,
>
> To answer your question about tuning the global store latency, I think the most impactful thing you can do is to configure the consumer that loads the data for global stores. You can pass configs specifically to the global consumer by prefixing the consumer config names with "global.consumer" followed by a dot.
>
> Regarding the larger situation, it seems like the global table and a distributed cache would display the same basic behavior in terms of the potential for missed joins. Then it probably makes sense to go for the option with fewer components to implement and maintain, which to me points to the global KTable.
>
> Since you can anticipate that missed joins can be a problem, you can build in some metrics and reporting for how many misses you actually observe, and potentially redesign the app if it's actually a problem.
>
> I hope this helps!
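John's tip about the global consumer prefix can be sketched as plain configuration. This is a minimal sketch, not a tuned setup: it builds a `java.util.Properties` with literal string keys so it runs without Kafka on the classpath (a real app would typically use `StreamsConfig.globalConsumerPrefix(...)` and hand the properties to `KafkaStreams`). The application ID, broker address, and every numeric value are hypothetical placeholders. It also sets `max.task.idle.ms`, the Streams setting behind the task idling John suggests in his May 4 reply further down.

```java
import java.util.Properties;

public class GlobalConsumerConfigSketch {
    // Same value as StreamsConfig.GLOBAL_CONSUMER_PREFIX; a literal keeps the sketch
    // runnable without kafka-streams on the classpath.
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    // Mirrors what StreamsConfig.globalConsumerPrefix(String) returns.
    static String globalConsumerPrefix(String consumerProp) {
        return GLOBAL_CONSUMER_PREFIX + consumerProp;
    }

    static Properties buildConfig() {
        Properties props = new Properties();
        // Hypothetical application ID and broker address.
        props.put("application.id", "agent-enricher");
        props.put("bootstrap.servers", "localhost:9092");
        // Prefixed settings apply only to the consumer that loads global stores.
        // Values are illustrative placeholders, not tuning advice.
        props.put(globalConsumerPrefix("fetch.max.wait.ms"), "100");
        props.put(globalConsumerPrefix("max.poll.records"), "1000");
        // Task idling: how long (ms) a task may wait for data on all of its inputs
        // before processing what it has (illustrative value).
        props.put("max.task.idle.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        buildConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Unprefixed keys such as `fetch.max.wait.ms` stay unset here, so the main consumers keep their defaults while only the global consumer is adjusted.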
>
> -John

On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> Thanks John... I appreciate your inputs and suggestions. I was recently assigned this task (persisting the cache), wasn't involved in the original design and architecture, and agree with all the issues you have highlighted.
> However, at this point, I don't think the application can be converted to Streams, since the design is not flexible and it would require a lot of code rewriting plus subsequent testing.
>
> My first thought was to use an external database, preferably a distributed caching system like Apache Ignite, since it will have the least impact on performance. Going to a database for every event would hurt throughput a lot; a distributed key/value cache would probably have comparatively less impact.
> My second choice is to go for GlobalKTable; however, this needs to be done very carefully.
>
> Thanks again!

On Mon, May 4, 2020 at 11:18 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> Thanks John... what parameters would affect the latency if a GlobalKTable is used, and are there any configurations that could be tuned to minimize the latency of syncing with the input topic?

On Mon, May 4, 2020 at 10:20 PM John Roesler <vvcep...@apache.org> wrote:
> Hello Pushkar,
>
> Yes, that's correct. The operation you describe is currently not supported. If you want to keep the structure you described in place, I'd suggest using an external database for the admin objects. I'll give another idea below.
>
> With your current architecture, I'm a little concerned about data races. From what I saw, nothing would prevent processing stream records with agent 10 before you process the admin record with agent 10.
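The data race John describes, together with the miss metrics he suggests in his later reply, can be illustrated with a small in-memory model of Pushkar's two-consumer design. This is a sketch under simplifying assumptions: events are handled strictly in arrival order, a `HashMap` stands in for the cache, and the class, event shape, and counter names are hypothetical. A main event that arrives before its admin record goes out unenriched and is counted as a miss.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MissedJoinSketch {
    // Hypothetical event shape: admin events carry an agent's name, main events a payload.
    static final class Event {
        final boolean admin;
        final int agentId;
        final String value;

        Event(boolean admin, int agentId, String value) {
            this.admin = admin;
            this.agentId = agentId;
            this.value = value;
        }
    }

    // Per-instance cache populated by the admin consumer.
    final Map<Integer, String> agentNames = new HashMap<>();
    // Counters in the spirit of "metrics for how many misses you actually observe".
    long hits = 0;
    long misses = 0;

    // Handles one event; returns the output for main events, null for admin events.
    String process(Event e) {
        if (e.admin) {
            agentNames.put(e.agentId, e.value);
            return null;
        }
        String name = agentNames.get(e.agentId);
        if (name == null) {
            misses++;
            return e.value + " (not enriched)";
        }
        hits++;
        return e.value + " by " + name;
    }

    public static void main(String[] args) {
        MissedJoinSketch app = new MissedJoinSketch();
        // Arrival order: the main event for agent 10 beats the admin record.
        List<Event> arrivals = List.of(
                new Event(false, 10, "call-1"),
                new Event(true, 10, "Jane Doe"),
                new Event(false, 10, "call-2"));
        for (Event e : arrivals) {
            String out = app.process(e);
            if (out != null) {
                System.out.println(out);
            }
        }
        System.out.println("hits=" + app.hits + " misses=" + app.misses);
    }
}
```

Running `main` prints the unenriched `call-1`, the enriched `call-2`, and `hits=1 misses=1`. Moving the cache elsewhere does not change this ordering problem, which is John's point.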
> This problem will persist no matter where you locate the cache.
>
> A GlobalKTable would no doubt make it worse, since it increases the latency before admin record 10 is queryable everywhere.
>
> I think you'll want to make a call between architectural simplicity (remote cache or global KTable) and the probability of missed joins.
>
> I think the "best" way to solve this problem (that comes to mind, anyway) might be to:
> 1. Repartition the stream to be co-partitioned with the admin records.
> 2. Do a local (not global) stream-table join.
> 3. Enable task idling.
>
> You can do the repartition today with a `map` or `selectKey` to make the agent ID the new key of the stream, and then use `through` (where the intermediate topic has the same number of partitions as the admin topic) to do the repartitioning. In 2.6, there is a `repartition` operator that will make this easier.
>
> The repartition ensures that all stream records with agent ID 10 will be processed by the same thread that processes the admin records with agent ID 10; hence it will be able to find agent 10 in the local KTable store.
>
> Task idling will minimize your chances of missing any enrichments. When a task has two inputs (e.g., your repartitioned stream joining with the admin table), it makes Streams wait until both inputs are buffered before processing, so it can do a better job of processing in timestamp order.
>
> I hope this helps!
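Why co-partitioning lets a local join find agent 10 can be shown with a toy router. In Streams itself this would be `selectKey` (or `map`) followed by `through` before 2.6 or `repartition` from 2.6, then a `KStream`-`KTable` join; the sketch below does not use Kafka at all. It assumes a stand-in partitioner (`Math.floorMod` of the key's hash), whereas Kafka's default partitioner hashes the serialized key bytes with murmur2. The only property relied on is that equal keys and equal partition counts give equal partition numbers. Each partition gets its own local store, as each task would; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoPartitionSketch {
    // Stand-in partitioner (Kafka's default uses murmur2 over the serialized key).
    static int partitionFor(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    final int adminPartitions;
    // One local store per admin partition, i.e. per task.
    final List<Map<Integer, String>> localStores = new ArrayList<>();

    CoPartitionSketch(int adminPartitions) {
        this.adminPartitions = adminPartitions;
        for (int i = 0; i < adminPartitions; i++) {
            localStores.add(new HashMap<>());
        }
    }

    // Admin records are keyed by agent ID, so each lands in exactly one local store.
    void loadAdmin(int agentId, String name) {
        localStores.get(partitionFor(agentId, adminPartitions)).put(agentId, name);
    }

    // Joins a stream record re-keyed by agent ID and written to a topic with
    // streamPartitions partitions; only the matching task's local store is consulted.
    String join(int agentId, String payload, int streamPartitions) {
        int p = partitionFor(agentId, streamPartitions);
        if (p >= adminPartitions) {
            return payload + " (no matching task)";
        }
        String name = localStores.get(p).get(agentId);
        return name == null ? payload + " (not enriched)" : payload + " by " + name;
    }

    public static void main(String[] args) {
        CoPartitionSketch app = new CoPartitionSketch(4);
        app.loadAdmin(10, "Jane Doe");
        // Co-partitioned: same key, same partition count -> same task -> hit.
        System.out.println(app.join(10, "call-1", 4));
        // Not co-partitioned: 3 partitions route agent 10 to a different task -> miss.
        System.out.println(app.join(10, "call-2", 3));
    }
}
```

This is why John stresses that the intermediate topic must have the same number of partitions as the admin topic.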
>
> -John

On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> If I understand correctly, Kafka is not designed to provide a replicated caching mechanism wherein updates to the cache are synchronous across multiple cache instances.

On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> Thanks John.
>
> Actually, this is a normal consumer-producer application with two consumers (an admin consumer and a main consumer) consuming messages from two different topics.
> One consumer consumes messages from an admin topic and populates a cache; e.g., for the agent with agent ID 10, the first name and last name received are stored in the cache. When the other consumer consumes a message with agent ID 10, it reads the cache, appends the first and last name, and then sends the enriched event to the producer.
> In this case, each application instance consumes all the events from the admin topic (unique consumer ID) and keeps them in an in-memory cache.
> Now the requirement is to persist the cache and make it shared between the application instances, so each instance would consume a subset of the admin topic's partitions and write to the admin cache.
>
> As for using Kafka Streams, the application has evolved so much that migrating to Streams at this stage would be difficult.
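Pushkar's new requirement, where each instance consumes only some of the admin partitions, changes what each in-memory cache can answer. A minimal sketch, assuming the same kind of stand-in partitioner (a plain hash modulo) and hypothetical agent names: an instance that owns only partition 0 can enrich agent 10 but not agent 11, while an instance that reads every partition (today's design) holds the full set. This is the gap that a shared store, a global table, or co-partitioning of the main stream has to close.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SplitAdminCacheSketch {
    // Stand-in partitioner: same key + same partition count -> same partition.
    static int partitionFor(int agentId, int numPartitions) {
        return Math.floorMod(Integer.hashCode(agentId), numPartitions);
    }

    // Builds one instance's cache from the admin records in the partitions it owns.
    static Map<Integer, String> buildCache(Map<Integer, String> adminRecords,
                                           Set<Integer> ownedPartitions,
                                           int numPartitions) {
        Map<Integer, String> cache = new HashMap<>();
        adminRecords.forEach((agentId, name) -> {
            if (ownedPartitions.contains(partitionFor(agentId, numPartitions))) {
                cache.put(agentId, name);
            }
        });
        return cache;
    }

    public static void main(String[] args) {
        // Hypothetical admin topic contents: agent 10 -> partition 0, agent 11 -> partition 1.
        Map<Integer, String> admin = Map.of(10, "Jane Doe", 11, "John Roe");

        Map<Integer, String> instanceA = buildCache(admin, Set.of(0), 2);     // owns partition 0
        Map<Integer, String> everything = buildCache(admin, Set.of(0, 1), 2); // reads all partitions

        System.out.println("instance A knows agent 11? " + instanceA.containsKey(11));
        System.out.println("full copy size: " + everything.size());
    }
}
```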
> Secondly, judging from past mail threads, Streams also won't serve the requirement, since local state stores would hold only each instance's own portion of the admin data, and the cache written by each instance wouldn't be shared with other instances.
>
> Global state stores may help, but again they require writing to the topic, which is then synced to the state stores in the instances, and the instances may not be in sync with each other.
> I am not sure whether this would cause inconsistencies, since I don't know how the events would flow from the source. For example: admin data is consumed by one instance, which then writes it to the topic, but it is not yet synced to all the global state stores; the next event arrives on the main consumer of a different instance, which tries to read from the store, doesn't get the data, and passes the event on without enrichment.
> That's pretty much the use case.

On Sun, May 3, 2020 at 9:42 PM John Roesler <vvcep...@apache.org> wrote:
> Hi Pushkar,
>
> I've been wondering if we should add writable tables to the Streams API. Can you explain more about your use case and how it would integrate with your application?
>
> Incidentally, this would also help us provide more concrete advice.
>
> Thanks!
> John

On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> The two kinds of store serve different purposes.
>
> Regular stores allow you to store state the application computes.
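The scenario Pushkar describes (and Matthias confirms further down) is that a write to a global store's topic becomes visible on each instance only after that instance has read it from the topic. The sketch below models this without Kafka: a list stands in for the topic, each `Replica` stands in for one instance's global store, and `poll` stands in for that instance's global consumer catching up. All names are hypothetical. Note that even the instance that wrote the record cannot read it until its own replica catches up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AsyncGlobalStoreSketch {
    // Stand-in for the global store's input topic: an append-only list of key/value pairs.
    static final class Topic {
        final List<Map.Entry<Integer, String>> records = new ArrayList<>();

        void append(int key, String value) {
            records.add(Map.entry(key, value));
        }
    }

    // One instance's replica; it applies topic records only when it polls.
    static final class Replica {
        final Map<Integer, String> store = new HashMap<>();
        int position = 0; // next topic offset to apply

        void poll(Topic topic) {
            while (position < topic.records.size()) {
                Map.Entry<Integer, String> r = topic.records.get(position++);
                store.put(r.getKey(), r.getValue());
            }
        }

        String get(int key) {
            return store.get(key);
        }
    }

    public static void main(String[] args) {
        Topic adminTopic = new Topic();
        Replica instanceA = new Replica();
        Replica instanceB = new Replica();

        // Instance A writes the admin record to the topic, not directly to a store.
        adminTopic.append(10, "Jane Doe");

        System.out.println("B before poll: " + instanceB.get(10));
        instanceB.poll(adminTopic);
        System.out.println("B after poll: " + instanceB.get(10));
        System.out.println("A, not yet polled: " + instanceA.get(10));
    }
}
```

How long the gap between `append` and `poll` lasts in a real deployment depends on the global consumer, which is why John points at its configuration for latency tuning.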
> Writing into the changelog is a fault-tolerance mechanism.
>
> Global stores hold "auxiliary" data that is provided from "outside" the app. There is no changelog topic, only the input topic (which is used to re-create the global state).
>
> Local stores are sharded, and updates are "sync" as they don't need to be shared with anybody else.
>
> For global stores, as all instances need to be updated, updates are async (we don't know when each instance will update its own global store replica).
>
> > Say one stream thread updates the topic for the global store and starts processing the next event, wherein the processor tries to read the global store, which may not have been synced with the topic?
>
> Correct. There is no guarantee when the update to the global store will be applied. As said, global stores are not designed to hold data the application computes.
>
> -Matthias

On 4/30/20 11:11 PM, Pushkar Deole wrote:
> Thanks... I will try with GlobalKTable.
> As a side question, I didn't really understand the significance of the global state store, which seems to work in the reverse way from a local state store: a local state store is updated and then saved to the changelog topic, whereas for a global state store the topic is updated first and then synced to the global state store.
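Matthias's point that a global store has no changelog, and is re-created from its input topic, can be sketched as a replay from the beginning of that topic. This is a minimal model, assuming KTable-style semantics in which a later record for a key overwrites an earlier one and a null value (a tombstone) deletes the key; the `Rec` type and the sample records are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GlobalStoreRestoreSketch {
    // One record of the (hypothetical) admin input topic; a null value is a tombstone.
    static final class Rec {
        final int key;
        final String value;

        Rec(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Re-creates the store by replaying the input topic in order:
    // later records overwrite earlier ones and tombstones delete the key.
    static Map<Integer, String> restore(List<Rec> inputTopic) {
        Map<Integer, String> store = new HashMap<>();
        for (Rec r : inputTopic) {
            if (r.value == null) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Rec> adminTopic = List.of(
                new Rec(10, "Jane"),
                new Rec(11, "John Roe"),
                new Rec(10, "Jane Doe"),  // overwrites the earlier value for agent 10
                new Rec(11, null));       // tombstone: agent 11 is deleted
        System.out.println(restore(adminTopic));
    }
}
```

Running `main` prints a store containing only agent 10 with its latest name, which is the state every instance converges to once it has read the whole topic.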
> Do these two work in sync, i.e., the update to the topic and to the global state store?
>
> Say one stream thread updates the topic for the global store and starts processing the next event, wherein the processor tries to read the global store, which may not have been synced with the topic?

On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
> Yes.
>
> A `GlobalKTable` uses a global store internally.
>
> You can also use `StreamsBuilder.addGlobalStore()` or `Topology.addGlobalStore()` to add a global store "manually".
>
> -Matthias

On 4/30/20 7:42 AM, Pushkar Deole wrote:
> Thanks Matthias.
> Can you elaborate on the replicated caching layer part?
> When you say global stores, do you mean a GlobalKTable created from a topic, e.g., using the `StreamsBuilder.globalTable(String topic)` method?

On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org> wrote:
> It's not possible to modify a state store from "outside".
>
> If you want to build a "replicated caching layer", you could use global stores and write into the corresponding topics to update all stores. Of course, those updates would be async.
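Matthias notes that a `GlobalKTable` is backed by a global store, and John's latest reply (at the top of this thread) says he thinks the in-memory variant uses a `TreeMap`. The sketch below shows what that implies, assuming nothing about Kafka's actual store classes: a `TreeMap` gives O(log n) `get`/`put`/`remove` rather than constant time, and keeps keys sorted so ordered range scans are cheap. The class name, the `agent-NN` key format, and the null-means-delete rule are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class TreeMapStoreSketch {
    // Sorted map: get/put/remove are O(log n) and keys can be scanned in order.
    private final NavigableMap<String, String> map = new TreeMap<>();

    // A null value deletes the key (hypothetical rule for this sketch).
    void put(String key, String value) {
        if (value == null) {
            map.remove(key);
        } else {
            map.put(key, value);
        }
    }

    String get(String key) {
        return map.get(key);
    }

    // Keys in [from, to] inclusive, in sorted order; requires from <= to.
    List<String> rangeKeys(String from, String to) {
        return new ArrayList<>(map.subMap(from, true, to, true).keySet());
    }

    public static void main(String[] args) {
        TreeMapStoreSketch store = new TreeMapStoreSketch();
        store.put("agent-10", "Jane Doe");
        store.put("agent-11", "John Roe");
        store.put("agent-20", "Ann Poe");
        System.out.println(store.get("agent-10"));
        System.out.println(store.rangeKeys("agent-10", "agent-19"));
    }
}
```

For a lookup-by-agent-ID workload, the log n factor is small in practice, which matches John's expectation that it will be fast enough.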
>
> -Matthias

On 4/29/20 10:52 PM, Pushkar Deole wrote:
> Hi All,
>
> I am wondering if this is possible: I have been asked to use state stores as a general replicated cache among multiple service instances. The state store is created through `StreamsBuilder`, but it is not modified by the stream processor topology; instead, it is to be modified from outside the topology. So, essentially, the state store is just created from `StreamsBuilder` and then used as an application-level cache that gets replicated between application instances. Is this possible using state stores?
>
> Secondly, if it is possible, is this a good design approach?
>
> I'd appreciate your response, since I don't know the internals of state stores.