Hi Renato,

Can you describe a little more about the nature of the join+aggregation
logic? It sounds a little like the KTable represents the result of aggregating
messages from the KStream?

If that's the case, the operation you probably wanted was like:

> KStream.groupBy().aggregate()

which produces a KTable view of the aggregation result, and also guarantees
that when processing the second message, you'll see the result of having 
processed the first.

Let me know if I've misunderstood.

Thanks,
-John

On Wed, Feb 19, 2020, at 14:03, Renato Melo wrote:
> Hi Kafka Community,
> 
> Please take a look into my use case:
> 
> Fist message1
> 1. We have a KStream joined to a KTable(Compact Topic).
> 2. We received a message1 from the KStream, aggregates the message1 to 
> the joined messageA from KTable. 
> 3. We pushes back the messageA with aggregated message1 into KTable.
> 
> Second message 2
> 4. Message2 arrives on KStream and joins to the expected update 
> MessageA from the KTable. For our surprise messageA was not yet updated.
> 5. We aggregate message2 into messageA.
> 6. We pushes messageA to the KTable(Compact topic) and the first 
> aggregated message is overwritten.
> 
> Is there a way to speed up the update in the Ktable (Compact Topic)?
> 
> Is something wrong with my use case?
> 
> I do appreciate any help. Thank you in advanced.
> 
> Renato de Melo
> 
> 
>

Reply via email to