Hi, what I don't understand is what you mean by
>> But when I am reading the KTable, I have no guarantee to see all messages
>> with the same key (because of the commit.interval.ms configuration).

Can you elaborate?

I actually think an aggregation should be the correct operator to use.
However, you would need to return a pair with the old and new value and
apply a subsequent mapValues() that computes the diff over both -- i.e.,
it's a two-step approach instead of a single aggregation.

About the Producer solution: this is indeed not recommended. As for the
exception itself, it's a known bug and a fix was proposed via KIP-91.


-Matthias

On 12/22/17 5:26 AM, Cedric BERTRAND wrote:
> Hello,
>
> I have been using Kafka Streams for some months and I have not managed to
> implement this use case:
>
> I need to compare two objects: a new one and the old one.
> At the end, I need to send a message with the diff between the 2 objects.
>
> My first implementation was to use an aggregate and to return the diff in
> the resulting KTable.
> But when I am reading the KTable, I have no guarantee to see all messages
> with the same key (because of the commit.interval.ms configuration).
>
> I have tried another implementation (see the code below).
> I compute the diff in the aggregate method and I send the message with a
> KafkaProducer that is created outside Kafka Streams.
> By doing this, my code works, but under heavy load I get the following
> error message (many, many times):
>
> org.apache.kafka.common.errors.TimeoutException: Expiring 16 record(s) for
> vision360-events-client-00-12: 38949 ms has passed since batch creation
> plus linger time
>
> The fact that the Producer is created outside the DAG of Kafka Streams seems
> to be a bad implementation.
>
>
> final Producer<K, V> producer = ....
>
> stream
>     .groupByKey()
>     .aggregate(
>         () -> ...,
>         (key, value, aggregate) -> {
>             ...
>             producer.send(new ProducerRecord<>(topic, key, myValue),
>                 (metadata, exception) -> {
>                     if (exception != null) {
>                         LOGGER.error(MessageFormatter.format("Failed to send event : {}", key).getMessage(), exception);
>                     }
>                 });
>             return myAggregateValue;
>         },
>         ...,
>         ....
>     )
>     .mapValues( ...)
>     .to(topic);
>
> Is there a good way to do this in Kafka:
> to do an aggregation that sends a message with a diff for all messages with
> the same key?
>
>
> Thanks
>
> Cedric
>
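
The two-step approach suggested in the reply (an aggregate that keeps the old and new value as a pair, followed by a mapValues that computes the diff) can be sketched in plain Java without any Kafka dependency. Everything named here is hypothetical and only for illustration: the `Change` pair type, the `initial`/`aggregate`/`diff` helpers, and the choice of `Map<String, String>` as the value type. In a real topology the three method bodies would presumably become the initializer and aggregator passed to `aggregate(...)` and the mapper passed to `mapValues(...)`, and the pair type would also need a Serde, which is not shown.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class OldNewDiffSketch {

    /** Hypothetical aggregate type: the previous and the latest value seen for a key. */
    public static final class Change<V> {
        public final V oldValue; // null until a second record arrives for the key
        public final V newValue;

        public Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    /** Initializer body: an empty pair before any record has been seen. */
    public static <V> Change<V> initial() {
        return new Change<>(null, null);
    }

    /** Step 1, aggregator body: shift the current value to "old", store the incoming one as "new". */
    public static <V> Change<V> aggregate(V value, Change<V> aggregate) {
        return new Change<>(aggregate.newValue, value);
    }

    /**
     * Step 2, mapValues body: fields whose value is new or changed.
     * Assumption: the value is a flat map of fields; removed fields are not reported.
     */
    public static Map<String, String> diff(Change<Map<String, String>> change) {
        Map<String, String> old = change.oldValue == null
                ? Collections.<String, String>emptyMap()
                : change.oldValue;
        Map<String, String> changed = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : change.newValue.entrySet()) {
            if (!Objects.equals(old.get(field.getKey()), field.getValue())) {
                changed.put(field.getKey(), field.getValue());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> v1 = new LinkedHashMap<>();
        v1.put("name", "Ann");
        v1.put("city", "Paris");
        Map<String, String> v2 = new LinkedHashMap<>();
        v2.put("name", "Ann");
        v2.put("city", "Lyon");

        Change<Map<String, String>> agg = initial();
        agg = aggregate(v1, agg);
        System.out.println(diff(agg)); // {name=Ann, city=Paris}
        agg = aggregate(v2, agg);
        System.out.println(diff(agg)); // {city=Lyon}
    }
}
```

Because the diff is derived from the aggregate itself, it is emitted through the topology's own output path and no external KafkaProducer is needed inside the aggregator.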