Hi

This topic is also being discussed in the chat room:
https://camel.zulipchat.com/#narrow/stream/257302-camel-quarkus/topic/Fully.20Non-Blocking.20Reactive

On Tue, Jun 28, 2022 at 7:49 PM Keith Link <kl...@acm.org> wrote:

> I am trying to get a Camel Quarkus example that only uses the Vert.x IO
> threads on the front side and back side. I have from("platform-http:/xxxx")
> on the front side, sending the exchange to a multicast to 3 routes that all
> use async processors that use the Quarkus RESTEasy Reactive HTTP clients to
> make the outbound calls and return CompletionStages. The multicast uses a
> GroupedExchangeAggregationStrategy to collect the list of exchanges. The
> last step sends the list of exchanges to an async processor that uses the
> list to aggregate the back-side responses. The thread that everything runs
> on is vert.x-worker-thread-0, according to the log output and the debugger.
>
>     GroupedExchangeAggregationStrategy aggregationStrategy =
>             new GroupedExchangeAggregationStrategy();
>
>     from("platform-http:/movie?produces=application/json")
>             .routeId("platform-http")
>             .to("log:Before Multicast")
>             .multicast(aggregationStrategy)
>                 .parallelAggregate()
>             .to("direct:movies", "direct:episodes", "direct:people")
>             .end()
>             .process(aggregateProcessor) // Combine the responses
>             .to("log:platform-http Route");
>
>     from("direct:movies").routeId("movies")
>             .process(movieProcessor);
>     from("direct:episodes").routeId("episodes")
>             .process(episodesProcessor);
>     from("direct:people").routeId("people")
>             .process(peopleProcessor);
>
> ==========================================================================================
> @ApplicationScoped
> public class AggregateProcessor implements AsyncProcessor {
>     private static final Logger LOG =
>             LoggerFactory.getLogger(AggregateProcessor.class);
>
>     @NonBlocking
>     public boolean process(Exchange exchange, AsyncCallback callback) {
>
>         ResponseHolder aResponseHolder = new ResponseHolder();
>
>         processAllExchanges(exchange, aResponseHolder);
>         setExchangeBodyToResponseHolderContents(exchange, aResponseHolder);
>
>         LOG.info("AggregateProcessor Done");
>
>         callback.done(false);
>         return false;
>     }
>
>
> Keith Link
> kl...@acm.org
>
>
>
>
>
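
For reference, here is a minimal sketch of how an async step can hand its
completion over to a CompletionStage instead of finishing on the calling
thread. It is plain Java and does not use the Camel API: `Callback` is only a
stand-in for Camel's AsyncCallback, and `bridge` and the `body` holder are
hypothetical names made up for this illustration, so treat it as a sketch of
the contract rather than a drop-in processor.

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncBridgeSketch {

    /** Stand-in for Camel's AsyncCallback (hypothetical, illustration only). */
    interface Callback {
        void done(boolean doneSync);
    }

    /**
     * Registers a completion handler on the stage and returns immediately.
     * The handler stores the result (or the failure) in the body holder and
     * then signals the callback with doneSync = false.
     *
     * @return false, meaning "this step will be completed via the callback"
     */
    static <T> boolean bridge(CompletionStage<T> stage,
                              AtomicReference<Object> body,
                              Callback callback) {
        stage.whenComplete((result, error) -> {
            // Keep whichever outcome the stage produced.
            body.set(error != null ? error : result);
            // Signal completion from the thread that completed the stage.
            callback.done(false);
        });
        return false;
    }
}
```

In a real AsyncProcessor the same shape would sit inside
process(Exchange, AsyncCallback): set the result on the exchange, call
callback.done(false) from the stage's completion handler, and return false.
With whenComplete, the handler runs on whichever thread completes the stage
(or on the calling thread if the stage is already complete), so the thread you
see in the log depends on where the HTTP client completes its CompletionStage.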

-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
