Hi Pushkar,

I’m glad you’ve been able to work through the issues.

The GlobalKTable does store the data in memory (or on disk, depending how you 
configure it). I think the in-memory version uses a TreeMap, which is 
logarithmic time access. I think you’ll find it sufficiently fast regardless. 

Thanks,
John

On Mon, May 11, 2020, at 06:51, Pushkar Deole wrote:
> John,
> 
> I think I can get the cache structure modified to make use of GlobalKTable
> here so the data can be shared across. I could get information that the
> admin data will be uploaded well in advance before main events so the issue
> with 'missed joins' won't exists since by the time main events start
> flowing, the admin data has been synchronized to all the service instances.
> 
> By the way, I would like to ask you how GlobalKTable works internally, does
> it store all data in memory or it gets it from the backed topic everytime?
> Secondly, what kind of internal data structure does it use? Is it good for
> constant time performance?
> 
> On Thu, May 7, 2020 at 7:27 PM John Roesler <vvcep...@apache.org> wrote:
> 
> > Hi Pushkar,
> >
> > To answer your question about tuning the global store latency, I think the
> > biggest impact thing you can do is to configure the consumer that loads the
> > data for global stores. You can pass configs specifically to the global
> > consumer with the prefix: “ global.consumer.”
> >
> > Regarding the larger situation, it seems like the global table and a
> > distributed cache would display the same basic behavior in terms of the
> > potential for missed joins. Then, it probably makes sense to go for the
> > option with fewer components to implement and maintain, which to me points
> > to the global KTable.
> >
> > Since you can anticipate that missed joins can be a problem, you can build
> > in some metrics and reporting for how many misses you actually observe, and
> > potentially redesign the app if it’s actually a problem.
> >
> > I hope this helps!
> > -John
> >
> > On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> > > Thanks John... appreciate your inputs and suggestions. I have been
> > assigned
> > > recently to this task (of persisting the cache) and haven't been involved
> > > in original design and architecture and agree with all the issues you
> > have
> > > highlighted.
> > > However, at this point, i don't think the application can be converted to
> > > streams since the design is not flexible and it would require lot of
> > > rewrite of code plus subsequent testing.
> > >
> > > My first thought was to use external database only,  preferably the
> > > distributed caching systems like Apache Ignite since it will have least
> > > impact on performance. Going to database for every event would impact the
> > > throughput a lot. Probably having distributed caching (key/value pairs)
> > > would have comparatively lesser impact.
> > > Second choice is to go for GlobalKTable however this needs to be done
> > very
> > > carefully.
> > >
> > > Thanks again!
> > >
> > > On Mon, May 4, 2020 at 11:18 PM Pushkar Deole <pdeole2...@gmail.com>
> > wrote:
> > >
> > > > Thanks John... what parameters would affect the latency in case
> > > > GlobalKTable will be used and is there any configurations that could be
> > > > tuned to minimize the latency of sync with input topic?
> > > >
> > > > On Mon, May 4, 2020 at 10:20 PM John Roesler <vvcep...@apache.org>
> > wrote:
> > > >
> > > >> Hello Pushkar,
> > > >>
> > > >> Yes, that’s correct. The operation you describe is currently not
> > > >> supported. If you want to keep the structure you described in place,
> > I’d
> > > >> suggest using an external database for the admin objects. I’ll give
> > another
> > > >> idea below.
> > > >>
> > > >> With your current architecture, I’m a little concerned about data
> > races.
> > > >> From what I saw, nothing would prevent processing stream records with
> > agent
> > > >> 10 before you process the admin record with agent 10. This problem
> > will
> > > >> persist no matter where you locate the cache.
> > > >>
> > > >> GlobalKTable would no doubt make it worse, since it increases the
> > latency
> > > >> before admin record 10 is queriable everywhere.
> > > >>
> > > >> I think you’ll want to make a call between architecture simplicity
> > > >> (remote cache or global KTable) vs the probability of missed joins.
> > > >>
> > > >> I think the “best” way to solve this problem (that comes to mind
> > anyway)
> > > >> might be to
> > > >> 1. Repartition the stream to be co-partitioned with the admin records.
> > > >> 2. Do a local (not global) stream-table join
> > > >> 3. Enable task idling
> > > >>
> > > >> You can do the repartition today with a ‘map’ or ‘selectKey’ to make
> > the
> > > >> agent Id the new key of the stream, and then use ‘through’, (where the
> > > >> intermediate topic has the same number of partitions as the admin
> > topic) to
> > > >> do the repartitioning. In 2.6, there is a “repartition” operator that
> > will
> > > >> make this easier.
> > > >>
> > > >> The repartition ensures that all stream records with agent id 10 will
> > be
> > > >> processed by the same thread that processes the admin records with
> > agent id
> > > >> 10, hence it will be able to find agent 10 in the local KTable store.
> > > >>
> > > >> Task idling will minimize your chances of missing any enrichments.
> > When a
> > > >> task has two inputs (E.g., your repartitioned stream joining with the
> > admin
> > > >> table), it makes Streams wait until both inputs are buffered before
> > > >> processing, so it can do a better job of processing in timestamp
> > order.
> > > >>
> > > >> I hope this helps!
> > > >> -John
> > > >>
> > > >> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> > > >> > If i understand correctly, Kafka is not designed to provide
> > replicated
> > > >> > caching mechanism wherein the updates to cache will be synchronous
> > > >> across
> > > >> > multiple cache instances.
> > > >> >
> > > >> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pdeole2...@gmail.com
> > >
> > > >> wrote:
> > > >> >
> > > >> > > Thanks John.
> > > >> > >
> > > >> > > Actually, this is a normal consumer-producer application wherein
> > > >> there are
> > > >> > > 2 consumers (admin consumer and main consumer) consuming messages
> > > >> from 2
> > > >> > > different topics.
> > > >> > > One of the consumers consumes messages from a admin topic and
> > > >> populates
> > > >> > > data in a cache e.g. lets say agent with agent id 10 for which the
> > > >> first
> > > >> > > name and last name is received is populated in cache. When the
> > other
> > > >> > > consumer consumes message and it has agent id 10 then it reads the
> > > >> cache,
> > > >> > > appends the first name and last name and then sends enriched
> > event to
> > > >> > > producer.
> > > >> > > In this case, each application instance consumes all the events
> > from
> > > >> admin
> > > >> > > topic (unique consumer id) and keeps them in the cache in memory.
> > > >> > > Now the requirement is to persist the cache and make is shared
> > > >> between the
> > > >> > > application instances, so each instance would consume partitions
> > of
> > > >> admin
> > > >> > > topic and write to admin cache.
> > > >> > >
> > > >> > > If we want to use kafka streams, the application is so much
> > evolved
> > > >> that
> > > >> > > it is difficult to migrate to streams at this stage. Secondly,
> > from
> > > >> past
> > > >> > > mail chains, streams also won't serve the requirement since local
> > > >> state
> > > >> > > stores would just hold the local state of admin data and the cache
> > > >> written
> > > >> > > by each instance won't be shared with other instances.
> > > >> > >
> > > >> > > Global state stores may help but again it requires writing to the
> > > >> topic
> > > >> > > which is then synced with the state stores in the instances and
> > the
> > > >> > > instances may not be in sync with each.
> > > >> > > I am not sure if this would cause any inconsistencies since i
> > don't
> > > >> know
> > > >> > > how the events would flow from source e.g. if admin data is
> > consumed
> > > >> by one
> > > >> > > instance which then modified the topic but it is not yet synced to
> > > >> all the
> > > >> > > global state stores and the next event arrived on the main
> > consumer
> > > >> on a
> > > >> > > different instance and it tried to read from store cache then it
> > > >> doesn't
> > > >> > > get the data, so the event passed on without enriched data.
> > > >> > > That's pretty much about the use case.
> > > >> > >
> > > >> > >
> > > >> > > On Sun, May 3, 2020 at 9:42 PM John Roesler <vvcep...@apache.org>
> > > >> wrote:
> > > >> > >
> > > >> > >> Hi Pushkar,
> > > >> > >>
> > > >> > >> I’ve been wondering if we should add writable tables to the
> > Streams
> > > >> api.
> > > >> > >> Can you explain more about your use case and how it would
> > integrate
> > > >> with
> > > >> > >> your application?
> > > >> > >>
> > > >> > >> Incidentally, this would also help us provide more concrete
> > advice.
> > > >> > >>
> > > >> > >> Thanks!
> > > >> > >> John
> > > >> > >>
> > > >> > >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> > > >> > >> > Both stores sever a different purpose.
> > > >> > >> >
> > > >> > >> > Regular stores allow you to store state the application
> > computes.
> > > >> > >> > Writing into the changelog is a fault-tolerance mechanism.
> > > >> > >> >
> > > >> > >> > Global store hold "axially" data that is provided from
> > "outside"
> > > >> of the
> > > >> > >> > app. There is no changelog topic, but only the input topic
> > (that
> > > >> is used
> > > >> > >> > to re-create the global state).
> > > >> > >> >
> > > >> > >> > Local stores are sharded and updates are "sync" as they don't
> > need
> > > >> to be
> > > >> > >> > shared with anybody else.
> > > >> > >> >
> > > >> > >> > For global stores, as all instances need to be updated,
> > updates are
> > > >> > >> > async (we don't know when which instance will update it's own
> > > >> global
> > > >> > >> > store replica).
> > > >> > >> >
> > > >> > >> > >> Say one stream thread updates the topic for global store and
> > > >> starts
> > > >> > >> > >> processing next event wherein the processor tries to read
> > the
> > > >> global
> > > >> > >> store
> > > >> > >> > >> which may not have been synced with the topic?
> > > >> > >> >
> > > >> > >> > Correct. There is no guarantee when the update to the global
> > store
> > > >> will
> > > >> > >> > be applied. As said, global stores are not designed to hold
> > data
> > > >> the
> > > >> > >> > application computes.
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > -Matthias
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > > >> > >> > > thanks... will try with GlobalKTable.
> > > >> > >> > > As a side question, I didn't really understand the
> > significance
> > > >> of
> > > >> > >> global
> > > >> > >> > > state store which kind of works in a reverse way to local
> > state
> > > >> store
> > > >> > >> i.e.
> > > >> > >> > > local state store is updated and then saved to changelog
> > topic
> > > >> > >> whereas in
> > > >> > >> > > case of global state store the topic is updated first and
> > then
> > > >> synced
> > > >> > >> to
> > > >> > >> > > global state store. Do these two work in sync i.e. the
> > update to
> > > >> > >> topic and
> > > >> > >> > > global state store ?
> > > >> > >> > >
> > > >> > >> > > Say one stream thread updates the topic for global store and
> > > >> starts
> > > >> > >> > > processing next event wherein the processor tries to read the
> > > >> global
> > > >> > >> store
> > > >> > >> > > which may not have been synced with the topic?
> > > >> > >> > >
> > > >> > >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <
> > mj...@apache.org
> > > >> >
> > > >> > >> wrote:
> > > >> > >> > >
> > > >> > >> > >> Yes.
> > > >> > >> > >>
> > > >> > >> > >> A `GlobalKTable` uses a global store internally.
> > > >> > >> > >>
> > > >> > >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> > > >> > >> > >> `Topology.addGlobalStore()` to add a global store
> > "manually".
> > > >> > >> > >>
> > > >> > >> > >>
> > > >> > >> > >> -Matthias
> > > >> > >> > >>
> > > >> > >> > >>
> > > >> > >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> > > >> > >> > >>> Thanks Matthias.
> > > >> > >> > >>> Can you elaborate on the replicated caching layer part?
> > > >> > >> > >>> When you say global stores, do you mean GlobalKTable
> > created
> > > >> from a
> > > >> > >> topic
> > > >> > >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method
> > ?
> > > >> > >> > >>>
> > > >> > >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
> > > >> mj...@apache.org>
> > > >> > >> > >> wrote:
> > > >> > >> > >>>
> > > >> > >> > >>>> It's not possible to modify state store from "outside".
> > > >> > >> > >>>>
> > > >> > >> > >>>> If you want to build a "replicated caching layer", you
> > could
> > > >> use
> > > >> > >> global
> > > >> > >> > >>>> stores and write into the corresponding topics to update
> > all
> > > >> > >> stores. Of
> > > >> > >> > >>>> course, those updates would be async.
> > > >> > >> > >>>>
> > > >> > >> > >>>>
> > > >> > >> > >>>> -Matthias
> > > >> > >> > >>>>
> > > >> > >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > > >> > >> > >>>>> Hi All,
> > > >> > >> > >>>>>
> > > >> > >> > >>>>> I am wondering if this is possible: i have been asked to
> > use
> > > >> state
> > > >> > >> > >> stores
> > > >> > >> > >>>>> as a general replicated cache among multiple instances of
> > > >> service
> > > >> > >> > >>>> instances
> > > >> > >> > >>>>> however the state store is created through streambuilder
> > but
> > > >> is
> > > >> > >> not
> > > >> > >> > >>>>> actually modified through stream processor topology
> > however
> > > >> it is
> > > >> > >> to be
> > > >> > >> > >>>>> modified from outside the stream topology. So,
> > essentially,
> > > >> the
> > > >> > >> state
> > > >> > >> > >>>> store
> > > >> > >> > >>>>> is just to be created from streambuilder and then to be
> > used
> > > >> as an
> > > >> > >> > >>>>> application level cache that will get replicated between
> > > >> > >> application
> > > >> > >> > >>>>> instances. Is this possible using state stores?
> > > >> > >> > >>>>>
> > > >> > >> > >>>>> Secondly, if possible, is this a good design approach?
> > > >> > >> > >>>>>
> > > >> > >> > >>>>> Appreciate your response since I don't know the
> > internals of
> > > >> state
> > > >> > >> > >>>> stores.
> > > >> > >> > >>>>>
> > > >> > >> > >>>>
> > > >> > >> > >>>>
> > > >> > >> > >>>
> > > >> > >> > >>
> > > >> > >> > >>
> > > >> > >> > >
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > Attachments:
> > > >> > >> > * signature.asc
> > > >> > >>
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>

Reply via email to