Hello John, Matthias

Sorry for bothering you, but this is getting more complicated. Initially I
was under the impression that the cache held by the application is in
key/value form, where the key is an agentId (e.g. 10) and the value holds
the other attributes (and their respective values) for that agent,
e.g. firstName='Dave', lastName='Richards' etc.
What I now understand is that the cache holds data structures keyed by
field name: the key agentFirstName maps to a value that contains an
ArrayList of all first names, another key agentLastName maps to an
ArrayList of all last names, and so on. To tie the related fields
together, there is another structure that holds the agentIds, again in the
form of an ArrayList, and the index into this ArrayList ties all the
fields together. E.g. if agentId 10 is at index 2 in that ArrayList, the
application goes into each of the cached ArrayLists mentioned above,
fetches the field at index 2, and returns the data accordingly.
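
To make the layout concrete, here is a minimal sketch of how I understand
the cache. The class and method names (ColumnarAgentCache, put, get) are
made up for illustration and are not the actual application code; only
the field-name keys and the index-based lookup come from the description
above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the column-oriented cache: one list per field,
// tied together by the position of the agentId in a separate list.
public class ColumnarAgentCache {
    // The position of an agentId in this list is the shared row index.
    private final List<Integer> agentIds = new ArrayList<>();
    // Key: field name (e.g. "agentFirstName"); value: that field for every agent.
    private final Map<String, List<String>> columns = new HashMap<>();

    public void put(int agentId, Map<String, String> fields) {
        int row = agentIds.indexOf(agentId);
        if (row < 0) {
            agentIds.add(agentId);
            row = agentIds.size() - 1;
        }
        // Create any column seen for the first time.
        for (String field : fields.keySet()) {
            columns.computeIfAbsent(field, k -> new ArrayList<>());
        }
        // Pad every column so that index 'row' exists in all of them.
        for (List<String> column : columns.values()) {
            while (column.size() < agentIds.size()) {
                column.add(null);
            }
        }
        for (Map.Entry<String, String> e : fields.entrySet()) {
            columns.get(e.getKey()).set(row, e.getValue());
        }
    }

    // Looks up the row index of the agent, then reads that index in every column.
    public Map<String, String> get(int agentId) {
        Map<String, String> result = new HashMap<>();
        int row = agentIds.indexOf(agentId);
        if (row < 0) {
            return result;
        }
        for (Map.Entry<String, List<String>> e : columns.entrySet()) {
            String value = e.getValue().get(row);
            if (value != null) {
                result.put(e.getKey(), value);
            }
        }
        return result;
    }
}
```

The point to notice is that a single agent's data is spread over every
column, so no single key/value pair in this cache corresponds to one agent.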

If the key were the agentId and the value the corresponding fields, it
would have been easier to write the entire structure into a topic. Since
the key is a field name as described above, storing the structure in a
topic is impossible: when multiple instances operate on that topic with
the same key, their updates are going to collide. Since the GlobalKTable
in each instance would not be in sync with the others, this could result
in lost updates and other sorts of issues.
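
The collision I am worried about can be sketched without Kafka at all.
Below, a plain map stands in for a compacted topic (latest value per key
wins), and two hypothetical instances each read the same snapshot of the
"agentFirstName" list, append a name, and publish the whole list back.
All names here are illustrative assumptions, not application code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a lost update when whole per-field lists
// are written back under the same key by two instances.
public class LostUpdateDemo {
    public static List<String> run() {
        // Stand-in for a compacted topic: only the latest value per key is kept.
        Map<String, List<String>> topic = new HashMap<>();
        topic.put("agentFirstName", List.of("Ann"));

        // Both instances start from the same (soon stale) local copy.
        List<String> instanceA = new ArrayList<>(topic.get("agentFirstName"));
        List<String> instanceB = new ArrayList<>(topic.get("agentFirstName"));

        // Each instance applies a different admin event to its local copy.
        instanceA.add("Dave");
        instanceB.add("Maria");

        // Each publishes the full list under the same key; the last write wins.
        topic.put("agentFirstName", List.copyOf(instanceA));
        topic.put("agentFirstName", List.copyOf(instanceB));

        return topic.get("agentFirstName");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the surviving value is [Ann, Maria]; the "Dave" added by the
first instance is gone, and the other columns would drift out of step with
the agentId list in the same way.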

Currently this design works because each instance reads all the records
from the input topic, i.e. each instance reads all events for all agents
one by one and populates its own in-memory cache, so there are no
synchronization issues. But the moment we want to persist this cache
structure to a topic, I am sure it will fail.

One option is a complete rewrite of the cache to have the agentId as key
and the corresponding field names and their values as the value, but that
would involve significant effort and testing.
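
For reference, the target shape of such a rewrite would be roughly the
following: one entry per agentId, which could then be written as one
record per agent. This is only a sketch under my own assumptions; the
class AgentRowTransposer and the method toRows are hypothetical, and the
inputs mirror the agentId list and per-field lists described earlier.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that turns the per-field lists into per-agent entries.
public class AgentRowTransposer {
    public static Map<Integer, Map<String, String>> toRows(
            List<Integer> agentIds, Map<String, List<String>> columns) {
        Map<Integer, Map<String, String>> rows = new LinkedHashMap<>();
        for (int i = 0; i < agentIds.size(); i++) {
            Map<String, String> fields = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> column : columns.entrySet()) {
                List<String> values = column.getValue();
                // Skip fields that were never set for this agent.
                if (i < values.size() && values.get(i) != null) {
                    fields.put(column.getKey(), values.get(i));
                }
            }
            // One entry per agent: the key is the agentId, the value is all its fields.
            rows.put(agentIds.get(i), fields);
        }
        return rows;
    }
}
```

With this shape, an update for agent 10 touches only the entry with key 10
instead of every per-field list.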
I don't know if you see any workaround that would let us use these
structures as they are and persist them to a topic without any sync issues
when the application is scaled to multiple instances.

Thanks in advance for all the help.


On Thu, May 7, 2020 at 7:27 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Pushkar,
>
> To answer your question about tuning the global store latency, I think the
> biggest impact thing you can do is to configure the consumer that loads the
> data for global stores. You can pass configs specifically to the global
> consumer with the prefix: “global.consumer.”
>
> Regarding the larger situation, it seems like the global table and a
> distributed cache would display the same basic behavior in terms of the
> potential for missed joins. Then, it probably makes sense to go for the
> option with fewer components to implement and maintain, which to me points
> to the global KTable.
>
> Since you can anticipate that missed joins can be a problem, you can build
> in some metrics and reporting for how many misses you actually observe, and
> potentially redesign the app if it’s actually a problem.
>
> I hope this helps!
> -John
>
> On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> > Thanks John... appreciate your inputs and suggestions. I have been
> assigned
> > recently to this task (of persisting the cache) and haven't been involved
> > in original design and architecture and agree with all the issues you
> have
> > highlighted.
> > However, at this point, i don't think the application can be converted to
> > streams since the design is not flexible and it would require lot of
> > rewrite of code plus subsequent testing.
> >
> > My first thought was to use external database only,  preferably the
> > distributed caching systems like Apache Ignite since it will have least
> > impact on performance. Going to database for every event would impact the
> > throughput a lot. Probably having distributed caching (key/value pairs)
> > would have comparatively lesser impact.
> > Second choice is to go for GlobalKTable however this needs to be done
> very
> > carefully.
> >
> > Thanks again!
> >
> > On Mon, May 4, 2020 at 11:18 PM Pushkar Deole <pdeole2...@gmail.com>
> wrote:
> >
> > > Thanks John... what parameters would affect the latency in case
> > > GlobalKTable will be used and is there any configurations that could be
> > > tuned to minimize the latency of sync with input topic?
> > >
> > > On Mon, May 4, 2020 at 10:20 PM John Roesler <vvcep...@apache.org>
> wrote:
> > >
> > >> Hello Pushkar,
> > >>
> > >> Yes, that’s correct. The operation you describe is currently not
> > >> supported. If you want to keep the structure you described in place,
> I’d
> > >> suggest using an external database for the admin objects. I’ll give
> another
> > >> idea below.
> > >>
> > >> With your current architecture, I’m a little concerned about data
> races.
> > >> From what I saw, nothing would prevent processing stream records with
> agent
> > >> 10 before you process the admin record with agent 10. This problem
> will
> > >> persist no matter where you locate the cache.
> > >>
> > >> GlobalKTable would no doubt make it worse, since it increases the
> latency
> > >> before admin record 10 is queryable everywhere.
> > >>
> > >> I think you’ll want to make a call between architecture simplicity
> > >> (remote cache or global KTable) vs the probability of missed joins.
> > >>
> > >> I think the “best” way to solve this problem (that comes to mind
> anyway)
> > >> might be to
> > >> 1. Repartition the stream to be co-partitioned with the admin records.
> > >> 2. Do a local (not global) stream-table join
> > >> 3. Enable task idling
> > >>
> > >> You can do the repartition today with a ‘map’ or ‘selectKey’ to make
> the
> > >> agent Id the new key of the stream, and then use ‘through’, (where the
> > >> intermediate topic has the same number of partitions as the admin
> topic) to
> > >> do the repartitioning. In 2.6, there is a “repartition” operator that
> will
> > >> make this easier.
> > >>
> > >> The repartition ensures that all stream records with agent id 10 will
> be
> > >> processed by the same thread that processes the admin records with
> agent id
> > >> 10, hence it will be able to find agent 10 in the local KTable store.
> > >>
> > >> Task idling will minimize your chances of missing any enrichments.
> When a
> > >> task has two inputs (E.g., your repartitioned stream joining with the
> admin
> > >> table), it makes Streams wait until both inputs are buffered before
> > >> processing, so it can do a better job of processing in timestamp
> order.
> > >>
> > >> I hope this helps!
> > >> -John
> > >>
> > >> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> > >> > If i understand correctly, Kafka is not designed to provide
> replicated
> > >> > caching mechanism wherein the updates to cache will be synchronous
> > >> across
> > >> > multiple cache instances.
> > >> >
> > >> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole <pdeole2...@gmail.com
> >
> > >> wrote:
> > >> >
> > >> > > Thanks John.
> > >> > >
> > >> > > Actually, this is a normal consumer-producer application wherein
> > >> there are
> > >> > > 2 consumers (admin consumer and main consumer) consuming messages
> > >> from 2
> > >> > > different topics.
> > >> > > One of the consumers consumes messages from an admin topic and
> > >> populates
> > >> > > data in a cache e.g. lets say agent with agent id 10 for which the
> > >> first
> > >> > > name and last name is received is populated in cache. When the
> other
> > >> > > consumer consumes message and it has agent id 10 then it reads the
> > >> cache,
> > >> > > appends the first name and last name and then sends enriched
> event to
> > >> > > producer.
> > >> > > In this case, each application instance consumes all the events
> from
> > >> admin
> > >> > > topic (unique consumer id) and keeps them in the cache in memory.
> > >> > > Now the requirement is to persist the cache and make it shared
> > >> between the
> > >> > > application instances, so each instance would consume partitions
> of
> > >> admin
> > >> > > topic and write to admin cache.
> > >> > >
> > >> > > If we want to use kafka streams, the application is so much
> evolved
> > >> that
> > >> > > it is difficult to migrate to streams at this stage. Secondly,
> from
> > >> past
> > >> > > mail chains, streams also won't serve the requirement since local
> > >> state
> > >> > > stores would just hold the local state of admin data and the cache
> > >> written
> > >> > > by each instance won't be shared with other instances.
> > >> > >
> > >> > > Global state stores may help but again it requires writing to the
> > >> topic
> > >> > > which is then synced with the state stores in the instances and
> the
> > >> > > instances may not be in sync with each.
> > >> > > I am not sure if this would cause any inconsistencies since i
> don't
> > >> know
> > >> > > how the events would flow from source e.g. if admin data is
> consumed
> > >> by one
> > >> > > instance which then modified the topic but it is not yet synced to
> > >> all the
> > >> > > global state stores and the next event arrived on the main
> consumer
> > >> on a
> > >> > > different instance and it tried to read from store cache then it
> > >> doesn't
> > >> > > get the data, so the event passed on without enriched data.
> > >> > > That's pretty much about the use case.
> > >> > >
> > >> > >
> > >> > > On Sun, May 3, 2020 at 9:42 PM John Roesler <vvcep...@apache.org>
> > >> wrote:
> > >> > >
> > >> > >> Hi Pushkar,
> > >> > >>
> > >> > >> I’ve been wondering if we should add writable tables to the
> Streams
> > >> api.
> > >> > >> Can you explain more about your use case and how it would
> integrate
> > >> with
> > >> > >> your application?
> > >> > >>
> > >> > >> Incidentally, this would also help us provide more concrete
> advice.
> > >> > >>
> > >> > >> Thanks!
> > >> > >> John
> > >> > >>
> > >> > >> On Fri, May 1, 2020, at 15:28, Matthias J. Sax wrote:
> > >> > >> > Both stores serve a different purpose.
> > >> > >> >
> > >> > >> > Regular stores allow you to store state the application
> computes.
> > >> > >> > Writing into the changelog is a fault-tolerance mechanism.
> > >> > >> >
> > >> > >> > Global stores hold "auxiliary" data that is provided from
> "outside"
> > >> of the
> > >> > >> > app. There is no changelog topic, but only the input topic
> (that
> > >> is used
> > >> > >> > to re-create the global state).
> > >> > >> >
> > >> > >> > Local stores are sharded and updates are "sync" as they don't
> need
> > >> to be
> > >> > >> > shared with anybody else.
> > >> > >> >
> > >> > >> > For global stores, as all instances need to be updated,
> updates are
> > >> > >> > async (we don't know when which instance will update its own
> > >> global
> > >> > >> > store replica).
> > >> > >> >
> > >> > >> > >> Say one stream thread updates the topic for global store and
> > >> starts
> > >> > >> > >> processing next event wherein the processor tries to read
> the
> > >> global
> > >> > >> store
> > >> > >> > >> which may not have been synced with the topic?
> > >> > >> >
> > >> > >> > Correct. There is no guarantee when the update to the global
> store
> > >> will
> > >> > >> > be applied. As said, global stores are not designed to hold
> data
> > >> the
> > >> > >> > application computes.
> > >> > >> >
> > >> > >> >
> > >> > >> > -Matthias
> > >> > >> >
> > >> > >> >
> > >> > >> > On 4/30/20 11:11 PM, Pushkar Deole wrote:
> > >> > >> > > thanks... will try with GlobalKTable.
> > >> > >> > > As a side question, I didn't really understand the
> significance
> > >> of
> > >> > >> global
> > >> > >> > > state store which kind of works in a reverse way to local
> state
> > >> store
> > >> > >> i.e.
> > >> > >> > > local state store is updated and then saved to changelog
> topic
> > >> > >> whereas in
> > >> > >> > > case of global state store the topic is updated first and
> then
> > >> synced
> > >> > >> to
> > >> > >> > > global state store. Do these two work in sync i.e. the
> update to
> > >> > >> topic and
> > >> > >> > > global state store ?
> > >> > >> > >
> > >> > >> > > Say one stream thread updates the topic for global store and
> > >> starts
> > >> > >> > > processing next event wherein the processor tries to read the
> > >> global
> > >> > >> store
> > >> > >> > > which may not have been synced with the topic?
> > >> > >> > >
> > >> > >> > > On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <
> mj...@apache.org
> > >> >
> > >> > >> wrote:
> > >> > >> > >
> > >> > >> > >> Yes.
> > >> > >> > >>
> > >> > >> > >> A `GlobalKTable` uses a global store internally.
> > >> > >> > >>
> > >> > >> > >> You can also use `StreamsBuilder.addGlobalStore()` or
> > >> > >> > >> `Topology.addGlobalStore()` to add a global store
> "manually".
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >> -Matthias
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> > >> > >> > >>> Thanks Matthias.
> > >> > >> > >>> Can you elaborate on the replicated caching layer part?
> > >> > >> > >>> When you say global stores, do you mean GlobalKTable
> created
> > >> from a
> > >> > >> topic
> > >> > >> > >>> e.g. using StreamsBuilder.globalTable(String topic) method
> ?
> > >> > >> > >>>
> > >> > >> > >>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
> > >> mj...@apache.org>
> > >> > >> > >> wrote:
> > >> > >> > >>>
> > >> > >> > >>>> It's not possible to modify state store from "outside".
> > >> > >> > >>>>
> > >> > >> > >>>> If you want to build a "replicated caching layer", you
> could
> > >> use
> > >> > >> global
> > >> > >> > >>>> stores and write into the corresponding topics to update
> all
> > >> > >> stores. Of
> > >> > >> > >>>> course, those updates would be async.
> > >> > >> > >>>>
> > >> > >> > >>>>
> > >> > >> > >>>> -Matthias
> > >> > >> > >>>>
> > >> > >> > >>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > >> > >> > >>>>> Hi All,
> > >> > >> > >>>>>
> > >> > >> > >>>>> I am wondering if this is possible: i have been asked to
> use
> > >> state
> > >> > >> > >> stores
> > >> > >> > >>>>> as a general replicated cache among multiple instances of
> > >> service
> > >> > >> > >>>> instances
> > >> > >> > >>>>> however the state store is created through streambuilder
> but
> > >> is
> > >> > >> not
> > >> > >> > >>>>> actually modified through stream processor topology
> however
> > >> it is
> > >> > >> to be
> > >> > >> > >>>>> modified from outside the stream topology. So,
> essentially,
> > >> the
> > >> > >> state
> > >> > >> > >>>> store
> > >> > >> > >>>>> is just to be created from streambuilder and then to be
> used
> > >> as an
> > >> > >> > >>>>> application level cache that will get replicated between
> > >> > >> application
> > >> > >> > >>>>> instances. Is this possible using state stores?
> > >> > >> > >>>>>
> > >> > >> > >>>>> Secondly, if possible, is this a good design approach?
> > >> > >> > >>>>>
> > >> > >> > >>>>> Appreciate your response since I don't know the
> internals of
> > >> state
> > >> > >> > >>>> stores.
> > >> > >> > >>>>>
> > >> > >> > >>>>
> > >> > >> > >>>>
> > >> > >> > >>>
> > >> > >> > >>
> > >> > >> > >>
> > >> > >> > >
> > >> > >> >
> > >> > >> >
> > >> > >> > Attachments:
> > >> > >> > * signature.asc
> > >> > >>
> > >> > >
> > >> >
> > >>
> > >
> >
>
