Thanks Steven, interesting use case.

The current Streams state store metadata discovery assumes the
`DefaultStreamPartitioner` is used, which is a limitation for cases like this.
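
For reference, here is a rough sketch of the lookup path Steven describes further
down the thread: resolve K -> D through the GlobalKTable's store, compute the
partition for D with the same logic the custom partitioner uses, and then scan all
metadata for the instance that hosts that partition. The store names, types, and
hashing below are illustrative assumptions, not taken from the thread.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;

public class ValuePartitionedLookup {

    private final KafkaStreams streams;

    public ValuePartitionedLookup(final KafkaStreams streams) {
        this.streams = streams;
    }

    // Which instance hosts `key`, when the store is partitioned by the destination D?
    public StreamsMetadata hostFor(final String key, final int numPartitions) {
        // 1) K -> D via the GlobalKTable's store (replicated to every instance).
        final ReadOnlyKeyValueStore<String, String> kToD =
            streams.store("k-to-d-global-store", QueryableStoreTypes.<String, String>keyValueStore());
        final String destination = kToD.get(key);
        if (destination == null) {
            return null; // unknown key
        }

        // 2) D -> partition, assuming the custom StreamPartitioner hashes D like this.
        final int partition =
            Utils.toPositive(Utils.murmur2(destination.getBytes(StandardCharsets.UTF_8))) % numPartitions;

        // 3) partition -> instance, by scanning all metadata for the queried store.
        for (final StreamsMetadata md : streams.allMetadataForStore("messages-store")) {
            for (final TopicPartition tp : md.topicPartitions()) {
                if (tp.partition() == partition) {
                    return md;
                }
            }
        }
        return null;
    }
}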

Another workaround I can think of: partition on D in the first stage to let the
workers do the "real" work, then pipe it to a second stage that re-partitions on
K, where the second processor only materializes the store for querying. I'm not
sure it would be better, since it may require doubling the store space (one store
on the first processor and one on the second); and since you can hold the whole
K -> D map in a global state, that map seems small enough that the repartitioning
may not be worth it.
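
For what it's worth, here is a rough sketch of that two-stage idea against the
StreamsBuilder DSL (topic names, the Message type, and the destination-based
partitioner are made up for illustration): stage one is partitioned on D and does
the real per-destination work; stage two writes back keyed by K with the default
partitioner and materializes the store that interactive queries hit.

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;

public class TwoStageTopology {

    // Hypothetical value type: a payload plus the destination field D.
    public static class Message {
        public final String destination;
        public final String payload;
        public Message(final String destination, final String payload) {
            this.destination = destination;
            this.payload = payload;
        }
    }

    public static StreamsBuilder build(final Serde<Message> messageSerde) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Stage-1 partitioner: co-locate all messages for the same destination D.
        final StreamPartitioner<String, Message> byDestination =
            (key, value, numPartitions) -> (value.destination.hashCode() & 0x7fffffff) % numPartitions;

        final KStream<String, Message> input =
            builder.stream("messages", Consumed.with(Serdes.String(), messageSerde));

        // Stage 1: repartition on D so a single worker handles each destination
        // (ordering, local rate limits, etc.); newer Streams versions would use repartition().
        final KStream<String, Message> processed = input.through(
            "messages-by-destination",
            Produced.with(Serdes.String(), messageSerde, byDestination));
        // ... the "real" per-destination processing on `processed` would go here ...

        // Stage 2: write back partitioned by K (default partitioner) and materialize a
        // store keyed by K, so the standard key-based metadata lookup works for queries.
        processed.to("messages-by-key", Produced.with(Serdes.String(), messageSerde));
        builder.table("messages-by-key",
                      Consumed.with(Serdes.String(), messageSerde),
                      Materialized.<String, Message, KeyValueStore<Bytes, byte[]>>as("messages-store"));

        return builder;
    }
}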


Guozhang






On Tue, Jun 6, 2017 at 8:36 AM, Michael Noll <mich...@confluent.io> wrote:

> Happy to hear you found a working solution, Steven!
>
> -Michael
>
>
>
> On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <sschlans...@opentable.com> wrote:
>
> > >
> > > On Jun 2, 2017, at 3:32 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > Thanks. That helps to understand the use case better.
> > >
> > > Rephrase to make sure I understood it correctly:
> > >
> > > 1) you are providing a custom partitioner to Streams that is based on one
> > > field in your value (that's fine with regard to fault-tolerance :))
> > > 2) you want to use interactive queries to query the store
> > > 3) because of your custom partitioning schema, you need to manually
> > > figure out the right application instance that hosts a key
> > > 4) thus, you use a GlobalKTable to maintain the mapping from K to D,
> > > and thus to the partition, i.e., the Streams instance that hosts K
> > >
> > > If this is correct, then you cannot use the "by key" metadata interface.
> > > It's designed to find the Streams instance based on the key only -- but
> > > your partitioner is based on the value. Internally, we call
> > >
> > >> final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
> > >
> > > Note that `value==null` -- at this point, we don't have any value
> > > available and can't provide it to the partitioner.
> > >
> > > Thus, your approach to get all metadata is the only way you can go.
> >
> > Thanks for confirming this.  The code is a little ugly but I've done worse :)
> >
> > >
> > >
> > > Very interesting (and quite special) use case. :)
> > >
> > >
> > > -Matthias
> > >
> > > On 6/2/17 2:32 PM, Steven Schlansker wrote:
> > >>
> > >>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>
> > >>> I am not sure if I understand the use case correctly. Could you give
> > >>> some more context?
> > >>
> > >> Happily, thanks for thinking about this!
> > >>
> > >>>
> > >>>> backing store whose partitioning is value dependent
> > >>>
> > >>> I infer that you are using a custom store and not the default RocksDB? If
> > >>> yes, what do you use? What does "value dependent" mean in this context?
> > >>
> > >> We're currently using the base in-memory store.  We tried to use RocksDB
> > >> but the tuning to get it running appropriately in a Linux container without
> > >> tripping the cgroups OOM killer is nontrivial.
> > >>
> > >>
> > >>> Right now, I am wondering why you don't just set a new key to get your
> > >>> data grouped by the field you are interested in? Also, if you don't
> > >>> partition your data by key, you might break your Streams application
> > >>> with regard to fault-tolerance -- or does your custom store not rely on
> > >>> changelog backup for fault-tolerance?
> > >>>
> > >>
> > >> That's an interesting point about making a transformed key.  But I don't think
> > >> it simplifies my problem too much.  Essentially, I have a list of messages
> > >> that should get delivered to destinations.  Each message has a primary key K
> > >> and a destination D.
> > >>
> > >> We partition over D so that all messages to the same destination are handled by
> > >> the same worker, to preserve ordering and implement local rate limits etc.
> > >>
> > >> I want to preserve the illusion for the client that they can look up a key with
> > >> only K.  So, as an intermediate step, we use the GlobalKTable to look up D.  Once
> > >> we have K,D we can then compute the partition and execute a lookup.
> > >>
> > >> Transforming the key to be a composite K,D isn't helpful because the end user still
> > >> only knows K -- D's relevance is an implementation detail I wish to hide -- so you
> > >> still need some sort of secondary lookup.
> > >>
> > >> We do use the changelog backup for fault tolerance -- how would having the partition
> > >> based on the value break this?  Is the changelog implicitly partitioned by a
> > >> partitioner other than the one we give to the topology?
> > >>
> > >> Hopefully that explains my situation a bit more?  Thanks!
> > >>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>>
> > >>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
> > >>>> I have a KTable and backing store whose partitioning is value dependent.
> > >>>> I want certain groups of messages to be ordered and that grouping is determined
> > >>>> by one field (D) of the (possibly large) value.
> > >>>>
> > >>>> When I look up by only K, obviously you don't know the partition it should be on.
> > >>>> So I will build a GlobalKTable of K -> D.  This gives me enough information
> > >>>> to determine the partition.
> > >>>>
> > >>>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
> > >>>> It allows you to either get all metadata or look up by key -- but if you look
> > >>>> up by key, it just substitutes a null value (causing a downstream NPE).
> > >>>>
> > >>>> I can iterate over all metadata and compute the mapping of K -> K,D -> P,
> > >>>> and then iterate over all metadata looking for P.  It's not difficult, but it
> > >>>> ends up being a bit of somewhat ugly code that feels like I shouldn't have to write.
> > >>>>
> > >>>> Am I missing something here?  Is there a better way that I've missed?  Thanks!
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>



-- 
-- Guozhang
