There is no built-in support for this at the moment. Asynchronous processing, as proposed in KIP-408, might help in the future, but there is not much activity on this KIP right now. (https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams)

> If I add submit Threads in foreach to thread pool, then I am not
> understanding how can I block committing the message until it is successfully
> executed

Well. You can't block committing. What you could try instead is to use `transform()` instead of `foreach()` and attach a state store to the transformer. Each time you receive an input message, you first put it into the store. If the message is successfully processed, you remove it from the store. If you crash and restart, you check the store for pending messages and retry processing them.

-Matthias

On 6/13/19 1:52 PM, Divya Goel wrote:
> Hi,
>
> I have the requirement to dedup messages within the window and take bunch of
> actions on the filtered message. I understand that we can get parallelism
> with the number of Kstream thread and can get maximum parallelism as number
> of partitions. But the actions that I take on the filtered message are
> various IO operations. As my applications is IO bound, I want to be able to
> execute multiple message after the window parallelly, and not sequentially.
>
> I am using following code to start with. The forEach allows to execute
> messages sequentially. If I add submit Threads in foreach to thread pool,
> then I am not understanding how can I block committing the message until it
> is successfully executed. Some message executed in the window can fail as
> well. Please let me know, if you have any suggestions to process the windowed
> messages parallelly with Kstream.
>
> builder.<String, String>stream(topic)
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofMillis(windowedTime)).advanceBy(Duration.ofMillis(windowedTime)))
>     .reduce((value1, value2) -> value2, Materialized.as(reducerTopic))
>     .toStream()
>     .foreach((key, value) -> System.out.println(key + " => " + value));
>
> Thanks,
> Divya
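
For illustration, the store-then-remove approach described in the reply above could look roughly like the following. This is only a sketch of the bookkeeping, not Kafka Streams code: the class `PendingStoreSketch`, its methods, and the `ioAction` callback are hypothetical names, and a plain in-memory map stands in for the state store that would be attached to `transform()`. It assumes at-least-once semantics, so the action may run more than once for the same message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiPredicate;

// Hypothetical sketch: models "put into store, process, remove on success,
// retry leftovers after restart". The map is a stand-in for a state store.
class PendingStoreSketch {
    private final Map<String, String> pending;           // stand-in for the state store
    private final BiPredicate<String, String> ioAction;  // returns true on success

    PendingStoreSketch(Map<String, String> pending,
                       BiPredicate<String, String> ioAction) {
        this.pending = pending;
        this.ioAction = ioAction;
    }

    // Called once per input message: record it as pending BEFORE starting
    // the I/O, then hand the I/O to the thread pool.
    Future<Boolean> submit(ExecutorService pool, String key, String value) {
        pending.put(key, value);
        Callable<Boolean> task = () -> runAndClear(key, value);
        return pool.submit(task);
    }

    // Called on startup: anything still in the store was not confirmed as
    // processed before the crash, so it is retried.
    List<Future<Boolean>> retryPending(ExecutorService pool) {
        List<Future<Boolean>> retries = new ArrayList<>();
        for (Map.Entry<String, String> e : new ArrayList<>(pending.entrySet())) {
            String key = e.getKey();
            String value = e.getValue();
            Callable<Boolean> task = () -> runAndClear(key, value);
            retries.add(pool.submit(task));
        }
        return retries;
    }

    // Runs the action; the entry is removed only if the action succeeded.
    // A thrown exception is treated the same as a reported failure.
    private boolean runAndClear(String key, String value) {
        boolean ok;
        try {
            ok = ioAction.test(key, value);
        } catch (RuntimeException ex) {
            ok = false;
        }
        if (ok) {
            pending.remove(key, value);
        }
        return ok;
    }
}
```

In this sketch the map should be a thread-safe implementation such as `ConcurrentHashMap`, because worker threads remove entries. With a real Kafka Streams state store, the removal would probably have to be handed back to the stream thread (for example through a queue that is drained there) rather than done directly from a worker thread.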