Hi Mansour,

Your use case sounds a bit complicated. On one hand you need parallel processing, on the other hand you need blocking/sequential processing. This may be a bad smell, but I don't know the details of your use case. I would say try to come up with a way for real parallel processing where ordering doesn't matter. In the end this will make it both simpler and faster.
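For example, something along these lines (just a sketch with made-up route and endpoint names; MyProcessor is the processor from your example):

import org.apache.camel.builder.RouteBuilder;

public class ParallelCommandsRoute extends RouteBuilder {
    @Override
    public void configure() {
        // hand every incoming command to an in-memory queue
        from("direct:commands")
            .to("seda:work");

        // a pool of 10 concurrent consumers processes the commands in
        // parallel, with no ordering guarantees at all
        from("seda:work?concurrentConsumers=10")
            .process(new MyProcessor()); // MyProcessor from your example
    }
}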
Based on your use case, here are some things you could look at:

1) A dynamic endpoint like:

toD("vm:${header.groupId}?queueSize=1&concurrentConsumers=1&defaultBlockWhenFull=true")

The question is, when this is really dynamic, who will listen to this endpoint? (There is a rough sketch of this option below the quoted message.)

https://camel.apache.org/components/3.18.x/vm-component.html

2) Some other constructs:

https://camel.apache.org/manual/try-catch-finally.html
https://camel.apache.org/components/3.18.x/eips/throttle-eip.html
https://camel.apache.org/components/3.18.x/eips/recipientList-eip.html (it has parallel processing)
https://camel.apache.org/components/3.18.x/eips/split-eip.html (it has parallel processing)

But as said, combinations of the above are not obvious. A throttle can be used, for example, when an external API has rate limiting, but for internal use in Camel I try to avoid it.

You may elaborate a bit on what you are trying to achieve functionally.

Raymond

On Thu, Sep 8, 2022 at 4:07 AM Mansour Al Akeel <mansour.alak...@gmail.com> wrote:

> I have a stream of commands that needs to be processed in parallel for
> performance reasons, and to be grouped on a key for sequential processing
> to avoid conflicts.
>
> I don't know the number of groups in advance. One way that came to my mind
> is dynamic channels/queues. So the stream of commands is sorted into
> queues, where each queue represents a group. Then handle those
> sequentially.
>
> I have been looking at toD, and dynamic router, but no luck.
> I am hoping to be able to achieve something like:
>
> from("direct:commands")
>     .to("direct:${header.groupId}")
>     .process(new MyProcessor())
>
> Therefore processing commands in parallel, across all queues, while
> blocking on the same queue.
> If an error occurs during the processing of a command, I would like to be
> able to flush/return the failed one, and all the pending ones. In other words,
> empty the queue.
>
> How to achieve this?
> I am open to advice about better ideas/setup.
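PS: a rough sketch of option 1. It assumes the group ids are known when the routes are built (the two ids below are made up), because something has to consume from every vm queue; MyProcessor is again the processor from your example.

import org.apache.camel.builder.RouteBuilder;

public class GroupedCommandsRoute extends RouteBuilder {

    // assumption: the set of group ids is known up front
    private static final String[] GROUP_IDS = { "groupA", "groupB" };

    @Override
    public void configure() {
        // producer side: send each command to the vm queue of its group
        from("direct:commands")
            .toD("vm:${header.groupId}");

        // consumer side: one route per group; the default single consumer
        // handles a group's commands one at a time, while different groups
        // are consumed in parallel
        for (String groupId : GROUP_IDS) {
            from("vm:" + groupId)
                .process(new MyProcessor());
        }
    }
}

If the group ids really only show up at runtime, the consumer routes would have to be added dynamically as well, which is exactly where this option gets awkward.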