Thank you for the idea; I'll keep that in mind if I run into limitations with my current approach.
> On Jun 6, 2017, at 5:50 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Thanks Steven, interesting use case.
>
> The current streams state store metadata discovery assumes that the
> `DefaultStreamPartitioner` is used, which is a limitation for such cases.
>
> Another workaround I can think of is that you can first partition on
> D in the first stage to let the workers do the "real" work, then you can
> pipe it to a second stage where you re-partition on K, and the second
> processor is only for materializing the store for querying. I'm not sure if
> it would be better since it may require doubling the store space (one on
> the first processor and one on the second), and since you can hold the
> whole K -> D map in a global state it seems this map is small enough, so
> it may not be worth the repartitioning.
>
> Guozhang
>
> On Tue, Jun 6, 2017 at 8:36 AM, Michael Noll <mich...@confluent.io> wrote:
>
>> Happy to hear you found a working solution, Steven!
>>
>> -Michael
>>
>> On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <sschlans...@opentable.com> wrote:
>>
>>>>
>>>> On Jun 2, 2017, at 3:32 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>> Thanks. That helps to understand the use case better.
>>>>
>>>> Rephrasing to make sure I understood it correctly:
>>>>
>>>> 1) you are providing a custom partitioner to Streams that is based on one
>>>> field in your value (that's fine with regard to fault-tolerance :))
>>>> 2) you want to use interactive queries to query the store
>>>> 3) because of your custom partitioning schema, you need to manually
>>>> figure out the right application instance that hosts a key
>>>> 4) thus, you use a GlobalKTable to maintain the information from K to D
>>>> and thus to the partition, i.e., the streams instance that hosts K
>>>>
>>>> If this is correct, then you cannot use the "by key" metadata interface.
>>>> It's designed to find the streams instance based on the key only -- but
>>>> your partitioner is based on the value. Internally, we call
>>>>
>>>>> final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
>>>>
>>>> Note that `value==null` -- at this point, we don't have any value
>>>> available and can't provide it to the partitioner.
>>>>
>>>> Thus, your approach to get all metadata is the only way you can go.
>>>
>>> Thanks for confirming this. The code is a little ugly but I've done worse
>>> :)
>>>
>>>>
>>>> Very interesting (and quite special) use case. :)
>>>>
>>>> -Matthias
>>>>
>>>> On 6/2/17 2:32 PM, Steven Schlansker wrote:
>>>>>
>>>>>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>> I am not sure if I understand the use case correctly. Could you give
>>>>>> some more context?
>>>>>
>>>>> Happily, thanks for thinking about this!
>>>>>
>>>>>>
>>>>>>> backing store whose partitioning is value dependent
>>>>>>
>>>>>> I infer that you are using a custom store and not the default RocksDB? If
>>>>>> yes, what do you use? What does "value dependent" mean in this context?
>>>>>
>>>>> We're currently using the base in-memory store. We tried to use RocksDB,
>>>>> but tuning it to run appropriately in a Linux container without
>>>>> tripping the cgroups OOM killer is nontrivial.
>>>>>
>>>>>> Right now, I am wondering why you don't just set a new key to get your
>>>>>> data grouped by the field you are interested in. Also, if you don't
>>>>>> partition your data by key, you might break your streams application
>>>>>> with regard to fault-tolerance -- or does your custom store not rely on
>>>>>> changelog backup for fault-tolerance?
>>>>>>
>>>>>
>>>>> That's an interesting point about making a transformed key. But I don't think
>>>>> it simplifies my problem much.
>>>>> Essentially, I have a list of messages that should get delivered to
>>>>> destinations. Each message has a primary key K and a destination D.
>>>>>
>>>>> We partition over D so that all messages to the same destination are handled
>>>>> by the same worker, to preserve ordering and implement local rate limits etc.
>>>>>
>>>>> I want to preserve the illusion to the client that they can look up a key with
>>>>> only K. So, as an intermediate step, we use the GlobalKTable to look up D. Once
>>>>> we have K,D we can then compute the partition and execute a lookup.
>>>>>
>>>>> Transforming the key to be a composite K,D isn't helpful because the end user still
>>>>> only knows K -- D's relevance is an implementation detail I wish to hide -- so you
>>>>> still need some sort of secondary lookup.
>>>>>
>>>>> We do use the changelog backup for fault tolerance -- how would having the partition
>>>>> based on the value break this? Is the changelog implicitly partitioned by a
>>>>> partitioner other than the one we give to the topology?
>>>>>
>>>>> Hopefully that explains my situation a bit more. Thanks!
>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
>>>>>>> I have a KTable and backing store whose partitioning is value dependent.
>>>>>>> I want certain groups of messages to be ordered, and that grouping is determined
>>>>>>> by one field (D) of the (possibly large) value.
>>>>>>>
>>>>>>> When I look up by only K, obviously you don't know the partition it should be on.
>>>>>>> So I will build a GlobalKTable of K -> D. This gives me enough information
>>>>>>> to determine the partition.
>>>>>>>
>>>>>>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
>>>>>>> It allows you to either get all metadata, or get it by key -- but if you look up
>>>>>>> by key it just substitutes a null value (causing a downstream NPE).
>>>>>>>
>>>>>>> I can iterate over all metadata and compute the mapping of K -> K,D -> P
>>>>>>> and then iterate over all metadata looking for P. It's not difficult, but it
>>>>>>> ends up being somewhat ugly code that feels like I shouldn't have to write it.
>>>>>>>
>>>>>>> Am I missing something here? Is there a better way that I've missed? Thanks!
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
> --
> -- Guozhang
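The lookup Steven describes (K -> D through the global table, D -> P through the value-based partitioner, then a scan of all instance metadata for the owner of P) can be sketched roughly as follows. This is a minimal, self-contained sketch under stated assumptions, not the code from the thread and not the Kafka Streams API: `InstanceInfo` is a hypothetical stand-in for the per-instance `StreamsMetadata` that `KafkaStreams#allMetadata()` returns, the plain `Map` stands in for the GlobalKTable of K -> D, and `partitionForDestination` assumes a simple hash-of-D-modulo-N scheme. The hypothetical `partition(key, value, numPartitions)` method mirrors the shape of the call Matthias quotes, which is why a null value makes a value-based partitioner fail.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Sketch of a K -> D -> P -> instance lookup for a value-partitioned store.
// All names here are hypothetical stand-ins, not Kafka Streams classes.
final class DestinationLookup {

    // Hypothetical stand-in for per-instance metadata (like StreamsMetadata):
    // where an instance runs and which source-topic partitions it owns.
    record InstanceInfo(String host, int port, Set<Integer> partitions) {}

    // Hypothetical message value carrying its primary key K and destination D.
    record Message(String key, String destination) {}

    // Assumed partitioning scheme: hash of D modulo the partition count, so
    // every message for one destination lands on the same partition.
    static int partitionForDestination(String destination, int numPartitions) {
        return Math.floorMod(destination.hashCode(), numPartitions);
    }

    // Value-dependent partitioner in the (key, value, numPartitions) shape.
    // The key is ignored; with value == null (as in the "by key" metadata
    // lookup) value.destination() throws NullPointerException.
    static int partition(String key, Message value, int numPartitions) {
        return partitionForDestination(value.destination(), numPartitions);
    }

    // K -> D via the global K -> D map, D -> P via the partitioner, then a
    // scan of all instance metadata for the instance that owns P.
    static Optional<InstanceInfo> findInstance(String key,
                                               Map<String, String> keyToDestination,
                                               List<InstanceInfo> allInstances,
                                               int numPartitions) {
        String destination = keyToDestination.get(key);
        if (destination == null) {
            return Optional.empty(); // K not in the global table: nothing to route to
        }
        int targetPartition = partitionForDestination(destination, numPartitions);
        return allInstances.stream()
                .filter(info -> info.partitions().contains(targetPartition))
                .findFirst();
    }

    public static void main(String[] args) {
        Map<String, String> keyToDestination = Map.of("order-1", "dest-a", "order-2", "dest-a");
        List<InstanceInfo> instances = List.of(
                new InstanceInfo("host1", 8080, Set.of(0, 1)),
                new InstanceInfo("host2", 8080, Set.of(2, 3)));
        // Both keys share destination "dest-a", so they resolve to the same instance.
        System.out.println(findInstance("order-1", keyToDestination, instances, 4));
        System.out.println(findInstance("order-2", keyToDestination, instances, 4));
    }
}
```

Because every message for one destination hashes to the same partition, two keys that share a D resolve to the same instance, and a key missing from the K -> D map yields an empty result rather than a guessed host. In a real application the instance list would come from the live metadata and would change on rebalance, so the scan has to be repeated per lookup or cached with care.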