Happy to hear you found a working solution, Steven! -Michael
On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <sschlans...@opentable.com> wrote:

> > On Jun 2, 2017, at 3:32 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > Thanks. That helps to understand the use case better.
> >
> > Rephrase to make sure I understood it correctly:
> >
> > 1) you are providing a custom partitioner to Streams that is based on one
> > field in your value (that's fine with regard to fault-tolerance :))
> > 2) you want to use interactive queries to query the store
> > 3) because of your custom partitioning schema, you need to manually
> > figure out the right application instance that hosts a key
> > 4) thus, you use a GlobalKTable to maintain the information from K to D
> > and thus to the partition, i.e., the Streams instance that hosts K
> >
> > If this is correct, then you cannot use the "by key" metadata interface.
> > It's designed to find the Streams instance based on the key only -- but
> > your partitioner is based on the value. Internally, we call
> >
> >> final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
> >
> > Note that `value==null` -- at this point, we don't have any value
> > available and can't provide it to the partitioner.
> >
> > Thus, your approach to get all metadata is the only way you can go.
>
> Thanks for confirming this. The code is a little ugly but I've done worse :)
>
> > Very interesting (and quite special) use case. :)
> >
> > -Matthias
> >
> > On 6/2/17 2:32 PM, Steven Schlansker wrote:
> >>
> >>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>> I am not sure if I understand the use case correctly. Could you give
> >>> some more context?
> >>
> >> Happily, thanks for thinking about this!
> >>
> >>>> backing store whose partitioning is value dependent
> >>>
> >>> I infer that you are using a custom store and not default RocksDB? If
> >>> yes, what do you use? What does "value dependent" mean in this context?
> >>
> >> We're currently using the base in-memory store. We tried to use RocksDB
> >> but the tuning to get it running appropriately in a Linux container without
> >> tripping the cgroups OOM killer is nontrivial.
> >>
> >>> Right now, I am wondering why you do not just set a new key to get your
> >>> data grouped by the field you are interested in? Also, if you don't
> >>> partition your data by key, you might break your streams application
> >>> with regard to fault-tolerance -- or does your custom store not rely on
> >>> changelog backup for fault-tolerance?
> >>
> >> That's an interesting point about making a transformed key. But I don't think
> >> it simplifies my problem too much. Essentially, I have a list of messages
> >> that should get delivered to destinations. Each message has a primary key K
> >> and a destination D.
> >>
> >> We partition over D so that all messages to the same destination are handled by
> >> the same worker, to preserve ordering and implement local rate limits etc.
> >>
> >> I want to preserve the illusion to the client that they can look up a key with
> >> only K. So, as an intermediate step, we use the GlobalKTable to look up D. Once
> >> we have K,D we can then compute the partition and execute a lookup.
> >>
> >> Transforming the key to be a composite K,D isn't helpful because the end user still
> >> only knows K -- D's relevance is an implementation detail I wish to hide -- so you still
> >> need some sort of secondary lookup.
> >>
> >> We do use the changelog backup for fault tolerance -- how would having the partition
> >> based on the value break this? Is the changelog implicitly partitioned by a partitioner
> >> other than the one we give to the topology?
> >>
> >> Hopefully that explains my situation a bit more? Thanks!
> >>
> >>> -Matthias
> >>>
> >>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
> >>>> I have a KTable and backing store whose partitioning is value dependent.
> >>>> I want certain groups of messages to be ordered and that grouping is determined
> >>>> by one field (D) of the (possibly large) value.
> >>>>
> >>>> When I look up by only K, obviously you don't know the partition it should be on.
> >>>> So I will build a GlobalKTable of K -> D. This gives me enough information
> >>>> to determine the partition.
> >>>>
> >>>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
> >>>> It allows you to either get all metadata, or by key -- but if you look up by key
> >>>> it just substitutes a null value (causing a downstream NPE).
> >>>>
> >>>> I can iterate over all metadata and compute the mapping of K -> K,D -> P
> >>>> and then iterate over all metadata looking for P. It's not difficult but ends
> >>>> up being a bit of somewhat ugly code that feels like I shouldn't have to write it.
> >>>>
> >>>> Am I missing something here? Is there a better way that I've missed? Thanks!
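
For anyone reading this thread later: the workaround discussed above (look up D from K, compute the partition from D, then scan all instance metadata for the one hosting that partition) might look roughly like the sketch below. It is plain Java with no Kafka dependency, so every name in it is a hypothetical stand-in and not Steven's actual code: `Instance` models the host and partition information a real application would read from the Streams metadata, `keyToDestination` models the GlobalKTable of K -> D, and `partitionFor` is an assumed hash-of-D partitioner that would have to match whatever custom partitioner the topology really uses.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class ValuePartitionedLookup {

    // Hypothetical stand-in for one Streams instance and the partitions it hosts.
    static final class Instance {
        final String host;
        final int port;
        final Set<Integer> partitions;

        Instance(String host, int port, Set<Integer> partitions) {
            this.host = host;
            this.port = port;
            this.partitions = partitions;
        }
    }

    // Assumed partitioner: depends only on the destination D (a field of the value),
    // never on the key K. A real setup must reuse the topology's own partitioner.
    static int partitionFor(String destination, int numPartitions) {
        return Math.floorMod(destination.hashCode(), numPartitions);
    }

    // K -> D (secondary lookup), D -> P (partitioner), P -> instance (scan all metadata).
    static Optional<Instance> findInstance(String key,
                                           Map<String, String> keyToDestination,
                                           List<Instance> allInstances,
                                           int numPartitions) {
        String destination = keyToDestination.get(key);
        if (destination == null) {
            return Optional.empty(); // unknown key: no destination, so no partition
        }
        int partition = partitionFor(destination, numPartitions);
        return allInstances.stream()
                .filter(instance -> instance.partitions.contains(partition))
                .findFirst();
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        Map<String, String> keyToDestination = Map.of("msg-1", "dest-a");

        // Build two instances so that host-0 owns the partition of "dest-a".
        int p = partitionFor("dest-a", numPartitions);
        List<Instance> instances = List.of(
                new Instance("host-0", 8080, Set.of(p)),
                new Instance("host-1", 8080, Set.of((p + 1) % numPartitions)));

        findInstance("msg-1", keyToDestination, instances, numPartitions)
                .ifPresent(i -> System.out.println(i.host + ":" + i.port));
    }
}
```

The point of the sketch is only that the partition is derived from D and not from K, which is why the "by key" metadata lookup (which passes a null value to the partitioner) cannot be used here and the scan over all metadata is needed.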