Hi John,

Thank you for your reply. Let me clarify.

I used the word "aggregate", but we are not using aggregate functions. Our case is a whole-part relationship between messageA and message1, 2, ..., n, like an order and its order items. Translated to our case, messageA is the order and message1 and message2 are items.

When I said we aggregate, I meant that we add an item to the order. So we have an order in the KTable. When the first item arrives, Kafka Streams joins the item to the order. Then we add the item to the order and do some calculations. Then a separate Kafka producer pushes the order back to the KTable.

After the first item we expected this:

Order(item1)

Then the second item arrives and Kafka Streams joins item2 to the order in the stream, but the order in the KTable has not been updated yet. So we add item2 to the order, and instead of having:

Order(item1, item2)

we have:

Order(item2)

I hope this makes our scenario clearer.

Regards,

Renato de Melo

On Wednesday, February 19, 2020 18:12:15 BRT, John Roesler <vvcep...@apache.org> wrote:

Hi Renato,

Can you describe a little more about the nature of the join+aggregation logic? It sounds a little like the KTable represents the result of aggregating messages from the KStream?

If that's the case, the operation you probably wanted was like:

> KStream.groupBy().aggregate()

which produces a KTable view of the aggregation result, and also guarantees that when processing the second message, you'll see the result of having processed the first.

Let me know if I've misunderstood.

Thanks,
-John

On Wed, Feb 19, 2020, at 14:03, Renato Melo wrote:
> Hi Kafka Community,
>
> Please take a look at my use case:
>
> First, message1:
> 1. We have a KStream joined to a KTable (compacted topic).
> 2. We receive message1 from the KStream and aggregate message1 into
> the joined messageA from the KTable.
> 3. We push messageA, with message1 aggregated, back into the KTable.
>
> Second, message2:
> 4. Message2 arrives on the KStream and joins to what we expect to be
> the updated messageA from the KTable. To our surprise, messageA was
> not yet updated.
> 5. We aggregate message2 into messageA.
> 6. We push messageA to the KTable (compacted topic) and the first
> aggregated message is overwritten.
>
> Is there a way to speed up the update in the KTable (compacted topic)?
>
> Is something wrong with my use case?
>
> I do appreciate any help. Thank you in advance.
>
> Renato de Melo
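
For readers following the thread, here is a minimal plain-Java sketch of the two flows discussed above. It is a toy model, not Kafka Streams code: the class name, the method names, the key "order-1", and the in-memory maps are all hypothetical stand-ins. The first method models the join followed by a separate produce back to the table, assuming the write for item1 is still in flight when item2 is joined. The second models what John describes for `KStream.groupBy().aggregate()`, assuming each record updates the state before the next record is processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderUpdateSketch {

    // Models "join, modify, produce back": every item is joined against the
    // table's current view, and the modified order only reaches the table
    // later. Assumption: both items are joined before any write lands.
    static List<String> joinThenProduceBack(List<String> items) {
        Map<String, List<String>> table = new HashMap<>();
        table.put("order-1", new ArrayList<>());

        List<List<String>> inFlightWrites = new ArrayList<>();
        for (String item : items) {
            // Copy of the (possibly stale) order currently visible in the table.
            List<String> joinedOrder = new ArrayList<>(table.get("order-1"));
            joinedOrder.add(item);
            inFlightWrites.add(joinedOrder);
        }

        // The pending writes arrive in order; the last one overwrites the rest.
        for (List<String> write : inFlightWrites) {
            table.put("order-1", write);
        }
        return table.get("order-1");
    }

    // Models an aggregation whose state is updated as part of processing each
    // record, so the second item sees the result of the first.
    static List<String> aggregateInStore(List<String> items) {
        Map<String, List<String>> store = new HashMap<>();
        for (String item : items) {
            store.computeIfAbsent("order-1", key -> new ArrayList<>()).add(item);
        }
        return store.getOrDefault("order-1", new ArrayList<>());
    }

    public static void main(String[] args) {
        List<String> items = List.of("item1", "item2");
        System.out.println(joinThenProduceBack(items)); // prints [item2]
        System.out.println(aggregateInStore(items));    // prints [item1, item2]
    }
}
```

The only difference between the two methods is where the order lives while it is being modified: in a table that is refreshed after a round trip, or in state that the processing step itself owns.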