Yes, you are probably right. I was inspired by the KIP 150 blog post, so the entire statement would look like this:

    KTable<Integer, CustomerList> customerGrouped = kStreamBuilder
            .stream(stringSerde, customerMessageSerde, CUSTOMER_TOPIC)
            // re-key by the numeric customer id, e.g. "cust7" -> 7
            .groupBy((key, value) -> Integer.parseInt(value.customer.replaceFirst("cust", "")),
                     integerSerde, customerMessageSerde)
            // keep every message seen for that customer
            .aggregate(CustomerList::new,
                       (ckey, custMessage, customerList) -> {
                           customerList.lst.add(custMessage);
                           return customerList;
                       },
                       customerListSerde, CUSTOMER_STORE);

The second oddity is because I want to gather history, all the previous records. It's exactly the same logic as the shopping cart, purchases and wishList example described in KIP 150. The result of the joins will contain all the history for a particular key.

You mention repartitioning. I also have a feeling that GlobalKTables are more suitable for lookups and not for what I am trying to do here. How do I repartition the topics? Is it by using keyed messages, or are there other alternatives? I've noticed that when I run two instances of the application, each one has only half of the records posted to the 4-partition topic.

I'd be happy to join the Slack; do I need an invite?

/Artur

On Thu, Nov 30, 2017 at 9:30 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> There are some oddities in your topology that make me wonder if they
> are the true drivers of your question.
>
> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L300
> Feels like it should be a KTable to begin with, for example; otherwise it is
> not clear how big this is supposed to grow.
> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L325
> Same thing for policies. A GlobalKTable might be chipped in later as some
> sort of performance optimisation if you get fed up with too many
> repartitions, but my opinion of it is not too high.
>
> Hope that helps, just keep the questions coming. Also check whether you
> might want to join confluentcommunity on Slack.
>
> Could never imagine that something like insurance can really be modelled
> as 4 streams ;)
>
> Best Jan
>
> On 30.11.2017 21:07, Artur Mrozowski wrote:
>
>> What if I start two instances of that application? Does the state migrate
>> between the applications? Is that when I have to use a global table?
>>
>> BR
>> Artur
>>
>> On Thu, Nov 30, 2017 at 7:40 PM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Haven't checked your code. But from what you describe you should be fine.
>>> Upgrading the version might help here and there, but it should still work
>>> with 0.10, I guess.
>>>
>>> Best Jan
>>>
>>> On 30.11.2017 19:16, Artur Mrozowski wrote:
>>>
>>>> Thank you Damian, it was very helpful.
>>>> I have implemented my solution in version 0.11.0.2, but there is one
>>>> thing I still wonder about.
>>>> What I am trying to do is what is described in KIP 150. Since it didn't
>>>> make it into the 1.0 release, I do it the old-fashioned way.
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>> First
>>>> KTable<K, V1> table1 =
>>>>     builder.stream("topic1").groupByKey().aggregate(initializer1,
>>>>         aggregator1, aggValueSerde1, storeName1);
>>>>
>>>> for all four topics, and then I join the results.
>>>> And here is the thing: the topics are partitioned, and I don't use
>>>> global tables, nor keyed messages, and it seems to work fine.
>>>>
>>>> From Confluent's documentation one could get the impression that when
>>>> reading from partitioned topics you need to use global tables. But is it
>>>> really necessary in this case? And if not, then why?
>>>>
>>>> Thanks again
>>>> Artur
>>>>
>>>> Here is the link to my implementation:
>>>>
>>>> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java
>>>>
>>>> On Wed, Nov 22, 2017 at 12:10 PM, Damian Guy <damian....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Artur,
>>>>>
>>>>> Kafka Streams 0.10.0.0 is quite old, and a lot has changed and been
>>>>> fixed since then. If possible I'd recommend upgrading to at least
>>>>> 0.11.0.2 or 1.0.
>>>>> For joins you need to ensure that the topics have the same number of
>>>>> partitions (which they do) and that they are keyed the same.
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>> On Wed, 22 Nov 2017 at 11:02 Artur Mrozowski <art...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am joining 4 different topics with 4 partitions each, using version
>>>>>> 0.10.0.0 of Kafka Streams. The joins are KTable-to-KTable. Is there
>>>>>> anything I should be aware of regarding partitions or the version of
>>>>>> Kafka Streams? In other words, should I expect consistent results, or
>>>>>> do I need to, for example, use global tables?
>>>>>>
>>>>>> I'd like to run that application on Kubernetes later on. Should I
>>>>>> think of anything, or do different instances of the same Kafka Streams
>>>>>> application take care of managing the state?
>>>>>>
>>>>>> Grateful for any thoughts or a piece of advice
>>>>>>
>>>>>> Best Regards
>>>>>> /Artur
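
Damian's co-partitioning requirement, and Artur's questions about repartitioning and about each of two instances seeing only half of the records, can be illustrated with a small model. This is a hypothetical sketch in plain Java with no Kafka dependency: `partitionFor`, `instanceFor` and the sample records are illustrative names, and `Arrays.hashCode` is swapped in for Kafka's murmur2-based default partitioner, so the concrete partition numbers will not match a real cluster. What carries over is the property: a record's partition depends only on its serialized key and the partition count, so re-keying by customer id (which is what `groupBy` triggers through an internal repartition topic) gathers all records of one customer onto one partition.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, dependency-free model of co-partitioning. It does not use the
 * Kafka client; partitionFor is only a stand-in for the default partitioner.
 */
public class CoPartitioningSketch {

    static final int PARTITIONS = 4; // every input topic in the thread has 4 partitions

    // Stand-in for the default partitioner for keyed records. Kafka hashes the
    // serialized key with murmur2; Arrays.hashCode keeps this sketch self-contained,
    // so the concrete partition numbers will NOT match a real cluster.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    static byte[] stringKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // 4-byte big-endian encoding, used here for Integer keys.
    static byte[] intKey(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Same key extraction as the groupBy in the thread: "cust7" -> 7.
    static int customerId(String customerField) {
        return Integer.parseInt(customerField.replaceFirst("cust", ""));
    }

    // For each customer id, the set of partitions its records occupy once they
    // are re-keyed by customer id (the job of the internal repartition topic).
    static Map<Integer, Set<Integer>> partitionsAfterRekey(String[] customers, int numPartitions) {
        Map<Integer, Set<Integer>> result = new HashMap<>();
        for (String customer : customers) {
            int id = customerId(customer);
            result.computeIfAbsent(id, k -> new HashSet<>())
                  .add(partitionFor(intKey(id), numPartitions));
        }
        return result;
    }

    // Simplified round-robin stand-in for how tasks (one per partition) are
    // spread over instances; the real Streams assignor is more involved.
    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    public static void main(String[] args) {
        // Hypothetical records arriving keyed by message id, not by customer.
        String[] messageIds = {"m1", "m2", "m3", "m4"};
        String[] customers = {"cust7", "cust3", "cust7", "cust3"};

        System.out.println("Keyed by message id (same customer may be split):");
        for (int i = 0; i < messageIds.length; i++) {
            System.out.println("  " + customers[i] + " -> partition "
                    + partitionFor(stringKey(messageIds[i]), PARTITIONS));
        }

        // After re-keying, each customer id maps to exactly one partition.
        System.out.println("Re-keyed by customer id: " + partitionsAfterRekey(customers, PARTITIONS));

        // With two instances, each owns two of the four partitions, and so holds
        // local state for roughly half of the keys.
        for (int p = 0; p < PARTITIONS; p++) {
            System.out.println("partition " + p + " -> instance " + instanceFor(p, 2));
        }
    }
}
```

Under these assumptions, each instance owning only part of the partitions is expected rather than a fault: a co-partitioned KTable-KTable join only ever needs the local partition's state. A GlobalKTable, by contrast, replicates the whole topic to every instance.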
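
The "aggregate each topic, then join" approach that the thread describes as the pre-KIP-150 way to cogroup can also be modelled in memory, to show why the joined value ends up carrying the whole history for a key. This is a hypothetical sketch: `Event`, `aggregateHistory` and `innerJoin` are illustrative names, plain maps stand in for KTables, and the join is computed once over a snapshot, whereas Kafka Streams updates the aggregate and the join result incrementally on every record. An inner join is assumed here; a left or outer join would also keep keys present on only one side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of "aggregate each topic, then join".
 * No Kafka classes are used; maps stand in for KTables.
 */
public class CogroupHistorySketch {

    /** A record that has already been re-keyed by customer id (hypothetical type). */
    static final class Event {
        final int customerId;
        final String payload;

        Event(int customerId, String payload) {
            this.customerId = customerId;
            this.payload = payload;
        }
    }

    /**
     * Models groupBy(...).aggregate(...) with a list accumulator: every record
     * for a key is appended, so the table value is that key's full history.
     */
    static Map<Integer, List<String>> aggregateHistory(List<Event> events) {
        Map<Integer, List<String>> table = new HashMap<>();
        for (Event e : events) {
            table.computeIfAbsent(e.customerId, k -> new ArrayList<>()).add(e.payload);
        }
        return table;
    }

    /**
     * Snapshot model of a KTable-KTable inner join: a key appears only if both
     * tables contain it, and the joiner concatenates the two histories.
     */
    static Map<Integer, List<String>> innerJoin(Map<Integer, List<String>> left,
                                                Map<Integer, List<String>> right) {
        Map<Integer, List<String>> joined = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> entry : left.entrySet()) {
            List<String> other = right.get(entry.getKey());
            if (other != null) {
                List<String> combined = new ArrayList<>(entry.getValue());
                combined.addAll(other);
                joined.put(entry.getKey(), combined);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical input: two "topics" already keyed by customer id.
        List<Event> customerEvents = Arrays.asList(
                new Event(7, "customer:v1"), new Event(7, "customer:v2"), new Event(3, "customer:v1"));
        List<Event> policyEvents = Arrays.asList(
                new Event(7, "policy:A"), new Event(7, "policy:B"));

        Map<Integer, List<String>> customers = aggregateHistory(customerEvents);
        Map<Integer, List<String>> policies = aggregateHistory(policyEvents);

        // Customer 3 has no policy records, so the inner join drops it;
        // customer 7 carries both histories.
        System.out.println(innerJoin(customers, policies));
    }
}
```

Running `main` prints a joined table containing only customer 7. Note that the list accumulator keeps every record forever, which is the growth concern Jan raises about the linked topology.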