[
https://issues.apache.org/jira/browse/CXF-8022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andriy Redko resolved CXF-8022.
-------------------------------
Resolution: Fixed
> Thread hangs using Reactor Flux when Exeption is Thrown
> -------------------------------------------------------
>
> Key: CXF-8022
> URL: https://issues.apache.org/jira/browse/CXF-8022
> Project: CXF
> Issue Type: Bug
> Components: JAX-RS
> Affects Versions: 3.2.8, 3.3.1
> Environment: * JDK 11
> * CXF 3.3.1
> * Spring Boot (latest)
> * Undertow (latest)
> * Reactor (latest)
> Reporter: KimJohn Quinn
> Assignee: Andriy Redko
> Priority: Blocker
> Labels: cxf, reactor, spring-boot
> Fix For: 3.2.9, 3.3.2
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> We have run into a serious issue that was not expected or caught until late
> it appears with CXF and Reactor in Undertow (using Spring Boot)...
> The Flux throws an exception inside and hangs infinitely, requiring the
> process to be forcibly killed (kill -9).
> We are using Spring Boot (latest), CXF (latest) with Reactor extension,
> Undertow and Reactor. Has anyone run into this and are there or could there
> be any potential work around to handle this?
> We wanted to stick with using CXF for both our standard REST resources and
> Reactor resources because CXF has always worked well for us plus we are using
> the OpenAPI swagger integration. Not being able to resolve this or have an
> established workaround may cause us to have to completely revert to Spring
> controllers.
> There seems to be a problem with the onComplete handling of Fluxes when it
> throws a RuntimeException. Basically, if you try to the endpoint, it will
> hang. I've tracked it down to the writeTo function in
> StreamingAsyncSubscriber where the queue is empty but "completed" is still
> false so it goes into an infinite loop. You can repeat it by running the
> code below.
> {code:java}
> @Path("/errors")
> @Produces(APPLICATION_JSON)
> public Flux<String> errors()
> {
> Flux<String> response = Flux
> .range(1, 5)
> .flatMap(item->
> {
> if (item <=4)
> {
> return Mono.just("item: " + item);
> }
> else
> {
> System.out.println("---Hitting exception");
> return Mono.error(new RuntimeException("runtime
> error"));
> }
> });
> return response;
> } {code}
>
> I also posted this on the forums hoping someone can assist -
> [http://cxf.547215.n5.nabble.com/Thread-hangs-when-using-reactor-cxf-and-undertow-td5796369.html]
>
> The log output, up until manually killing the process looks like:
> [DEBUG] o.a.c.j.interceptor.JAXRSInInterceptor : Found operation: errors
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on
> interceptor org.apache.cxf.interceptor.OneWayProcessorIntercep tor@6e760b48
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on
> interceptor org.apache.cxf.interceptor.ServiceInvokerIntercept or@6d7702a0
> [DEBUG] o.a.cxf.service.invoker.AbstractInvoker : Invoking method public
> reactor.core.publisher.Flux io.logicdrop.drools.endpoints.DroolsR
> ulesetResource.errors() on object
> io.logicdrop.drools.endpoints.DroolsRulesetResource@23718909 with params [].
> [DEBUG] reactor.util.Loggers$LoggerFactory : Using Slf4j logging framework
> ---Hitting exception
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on
> interceptor org.apache.cxf.interceptor.OutgoingChainIntercepto r@308be65a
> [DEBUG] o.a.c.i.OutgoingChainInterceptor : Interceptors contributed by bus: []
> [DEBUG] o.a.c.i.OutgoingChainInterceptor : Interceptors contributed by
> service: []
> [DEBUG] o.a.c.i.OutgoingChainInterceptor : Interceptors contributed by
> endpoint: [org.apache.cxf.interceptor.MessageSenderIntercept or@3f0061c3]
> [DEBUG] o.a.c.i.OutgoingChainInterceptor : Interceptors contributed by
> binding: [org.apache.cxf.jaxrs.interceptor.JAXRSOutIntercept or@5419dbea]
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Adding interceptor
> org.apache.cxf.interceptor.MessageSenderInterceptor@3f0061c3 to phase
> prepare-send
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Adding interceptor
> org.apache.cxf.jaxrs.interceptor.JAXRSOutInterceptor@5419dbea to phas e
> marshal
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Chain
> org.apache.cxf.phase.PhaseInterceptorChain@63b5b52 was created. Current flow:
> prepare-send [MessageSenderInterceptor]
> marshal [JAXRSOutInterceptor]
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on
> interceptor org.apache.cxf.interceptor.MessageSenderIntercepto r@3f0061c3
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Adding interceptor
> org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndi
> ngInterceptor@30cf09f to phase prepare-send-ending
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Chain
> org.apache.cxf.phase.PhaseInterceptorChain@63b5b52 was modified. Current flow:
> prepare-send [MessageSenderInterceptor]
> marshal [JAXRSOutInterceptor]
> prepare-send-ending [MessageSenderEndingInterceptor]
> [DEBUG] o.apache.cxf.phase.PhaseInterceptorChain : Invoking handleMessage on
> interceptor org.apache.cxf.jaxrs.interceptor.JAXRSOutIntercept or@5419dbea
> [DEBUG] o.a.c.j.interceptor.JAXRSOutInterceptor : Response content type is:
> application/json
> [DEBUG] o.apache.cxf.ws.addressing.ContextUtils : retrieving MAPs from
> context property javax.xml.ws.addressing.context.inbound
> [DEBUG] o.apache.cxf.ws.addressing.ContextUtils : WS-Addressing - failed to
> retrieve Message Addressing Properties from context
> Same scenario with a Mono appears to show the stream "completing"...
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)