Hi,

what I don't understand is what you mean by

>> But when I am reading the KTable, I have no guarantee to see all messages
>> with the same key (because of the commit.interval.ms configuration).

Can you elaborate? I actually think an aggregation is the correct
operator to use. However, you would need to return a pair with the old
and new value and then apply a subsequent mapValues() that computes the
diff between the two -- i.e., it's a two-step approach instead of a
single aggregation.
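
To make the two steps concrete, here is a minimal sketch in plain Java,
without the Kafka Streams dependency, so it only shows the shape of the
functions you would pass to aggregate() and mapValues(). The names
DiffSketch, OldNew, init, aggregate and diff are hypothetical, and I am
assuming your value can be treated as a Map<String, String> of fields;
your real value type and serdes will differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: none of these names come from Kafka Streams.
class DiffSketch {

    /** Aggregate value: the previous and the latest object seen for a key. */
    public static final class OldNew {
        public final Map<String, String> oldValue; // null until a second record arrives
        public final Map<String, String> newValue; // null only in the initial aggregate

        public OldNew(Map<String, String> oldValue, Map<String, String> newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    /** Initializer: plays the role of the first argument to aggregate(). */
    public static OldNew init() {
        return new OldNew(null, null);
    }

    /** Step 1, the aggregator: the previous "new" value becomes "old". */
    public static OldNew aggregate(String key, Map<String, String> value, OldNew agg) {
        return new OldNew(agg.newValue, value);
    }

    /** Step 2, the mapValues function: changed fields as "old -> new". */
    public static Map<String, String> diff(OldNew pair) {
        Map<String, String> before =
            pair.oldValue == null ? new HashMap<String, String>() : pair.oldValue;
        Map<String, String> after =
            pair.newValue == null ? new HashMap<String, String>() : pair.newValue;

        Set<String> fields = new TreeSet<>(before.keySet());
        fields.addAll(after.keySet());

        Map<String, String> changes = new TreeMap<>();
        for (String field : fields) {
            String o = before.get(field);
            String n = after.get(field);
            if (!Objects.equals(o, n)) {
                changes.put(field, o + " -> " + n);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> v1 = new HashMap<>();
        v1.put("name", "Ann");
        v1.put("city", "Paris");

        Map<String, String> v2 = new HashMap<>();
        v2.put("name", "Ann");
        v2.put("city", "Lyon");

        // Simulate two records arriving for the same key.
        OldNew agg = init();
        agg = aggregate("k", v1, agg);
        System.out.println(diff(agg)); // {city=null -> Paris, name=null -> Ann}
        agg = aggregate("k", v2, agg);
        System.out.println(diff(agg)); // {city=Paris -> Lyon}
    }
}
```

In the topology, DiffSketch::init and DiffSketch::aggregate would go into
aggregate(...) and DiffSketch::diff into the following mapValues(...), so
the diff is written with .to(topic) and no external producer is needed.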

About the Producer solution: this is indeed not recommended. As for the
exception itself, it's a known bug and a fix was proposed via KIP-91.


-Matthias

On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
> Hello,
> 
> I have been using Kafka Streams for some months and I have not managed to
> implement this use case:
> 
> I need to compare two objects: a new one and the old one.
> At the end, I need to send a message with the diff between the two objects.
> 
> My first implementation was to use an aggregate and to return the diff in
> the resulting KTable.
> But when I am reading the KTable, I have no guarantee to see all messages
> with the same key (because of the commit.interval.ms configuration).
> 
> I have tried another implementation (see the code below).
> I compute the diff in the aggregate method and send the message with a
> KafkaProducer that is created outside Kafka Streams.
> With this approach my code works, but under heavy load I get the following
> error message (many, many times):
> 
> org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s) for
> vision360-events-client-00-12: 38949 ms has passed since batch creation
> plus linger time
> 
> Creating the Producer outside the DAG of Kafka Streams seems to be a bad
> implementation.
> 
> 
> final Producer<K, V> producer = ....
> 
> stream
>     .groupByKey()
>     .aggregate(
>         () -> ...,
>         (key, value, aggregate) -> {
>             ...
>             producer.send(new ProducerRecord<>(topic, key, myValue),
>                 (metadata, exception) -> {
>                     if (exception != null) {
>                         LOGGER.error(MessageFormatter.format(
>                             "Failed to send event : {}", key).getMessage(), exception);
>                     }
>                 });
>             return myAggregateValue;
>         },
>         ...,
>         ....
>     )
>     .mapValues( ...)
>     .to(topic);
> 
> Is there a good way to do this in Kafka: an aggregation that sends a
> message with a diff for every message with the same key?
> 
> 
> Thanks
> 
> Cedric
> 
