[ https://issues.apache.org/jira/browse/CXF-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365424#comment-16365424 ]
ASF GitHub Bot commented on CXF-7642: ------------------------------------- johnament closed pull request #381: [CXF-7642] Introduce separate modules for RxJava & RxJava2 URL: https://github.com/apache/cxf/pull/381 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java new file mode 100644 index 00000000000..e7dfc8a46d6 --- /dev/null +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/AbstractStreamingResponseExtension.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.ext; + +import java.util.Collections; + +import javax.ws.rs.core.MediaType; + +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; +import org.apache.cxf.service.invoker.Invoker; + +public abstract class AbstractStreamingResponseExtension implements JAXRSServerFactoryCustomizationExtension { + protected abstract Invoker createInvoker(JAXRSServerFactoryBean bean); + + @Override + public final void customize(JAXRSServerFactoryBean bean) { + bean.setInvoker(createInvoker(bean)); + StreamingResponseProvider<Object> streamProvider = new StreamingResponseProvider<>(); + streamProvider.setProduceMediaTypes(Collections.singletonList(MediaType.APPLICATION_JSON)); + bean.setProvider(streamProvider); + } +} diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java new file mode 100644 index 00000000000..a57b57484dd --- /dev/null +++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorCustomizer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.reactor.server; + +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension; +import org.apache.cxf.service.invoker.Invoker; + +public class ReactorCustomizer extends AbstractStreamingResponseExtension { + @Override + protected Invoker createInvoker(JAXRSServerFactoryBean bean) { + Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true) + .getOrDefault("useStreamingSubscriber", null); + ReactorInvoker invoker = new ReactorInvoker(); + if (useStreamingSubscriber != null) { + invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber); + } + return invoker; + } +} \ No newline at end of file diff --git a/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension b/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension new file mode 100644 index 00000000000..0bca2eb9a08 --- /dev/null +++ b/rt/rs/extensions/reactor/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension @@ -0,0 +1 @@ +org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer \ No newline at end of file diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml index 2e3df9ea030..c016384f682 100644 --- a/rt/rs/extensions/rx/pom.xml +++ b/rt/rs/extensions/rx/pom.xml @@ -45,25 +45,6 @@ <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> <scope>provided</scope> - <optional>true</optional> - </dependency> - <dependency> - <groupId>io.reactivex.rxjava2</groupId> - <artifactId>rxjava</artifactId> - <scope>provided</scope> - <optional>true</optional> - </dependency> - <dependency> - <groupId>org.apache.cxf</groupId> - <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - <optional>true</optional> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> </dependency> </dependencies> </project> diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java new file mode 100644 index 00000000000..55bf3b9706d --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/server/ObservableCustomizer.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.server; + +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension; +import org.apache.cxf.service.invoker.Invoker; + +public class ObservableCustomizer extends AbstractStreamingResponseExtension { + @Override + protected Invoker createInvoker(JAXRSServerFactoryBean bean) { + return new ObservableInvoker(); + } +} diff --git a/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension b/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension new file mode 100644 index 00000000000..bb0e3d186f0 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension @@ -0,0 +1 @@ +org.apache.cxf.jaxrs.rx.server.ObservableCustomizer \ No newline at end of file diff --git a/rt/rs/extensions/rx2/pom.xml b/rt/rs/extensions/rx2/pom.xml new file mode 100644 index 00000000000..74f97642336 --- /dev/null +++ b/rt/rs/extensions/rx2/pom.xml @@ -0,0 +1,55 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>cxf-rt-rs-extension-rx2</artifactId> + <packaging>bundle</packaging> + <name>Apache CXF JAX-RS Extensions: RxJava2</name> + <description>Apache CXF JAX-RS Extensions: RxJava2</description> + <url>http://cxf.apache.org</url> + <parent> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-parent</artifactId> + <version>3.2.3-SNAPSHOT</version> + <relativePath>../../../../parent/pom.xml</relativePath> + </parent> + <dependencies> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-frontend-jaxrs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>io.reactivex.rxjava2</groupId> + <artifactId>rxjava</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java similarity index 100% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvoker.java diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java similarity index 100% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java similarity index 100% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java similarity index 100% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java similarity index 100% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java similarity index 100% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java new file mode 100644 index 00000000000..e57bb39a959 --- /dev/null +++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOCustomizer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx2.server; + +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension; +import org.apache.cxf.service.invoker.Invoker; + +public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension { + @Override + protected Invoker createInvoker(JAXRSServerFactoryBean bean) { + Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true) + .getOrDefault("useStreamingSubscriber", null); + ReactiveIOInvoker invoker = new ReactiveIOInvoker(); + if (useStreamingSubscriber != null) { + invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber); + } + return invoker; + } +} diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java similarity index 92% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java rename to rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java index 13092dc0314..e8871f45446 100644 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java +++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java @@ -41,14 +41,14 @@ protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) { final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - single.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); return asyncResponse; } protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) { final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) { - f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); } return asyncResponse; } diff --git a/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension b/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension new file mode 100644 index 00000000000..7fd3d78e82d --- /dev/null +++ b/rt/rs/extensions/rx2/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension @@ -0,0 +1 @@ +org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer \ No newline at end of file diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml index 4398491fb37..debbe21497d 100644 --- a/rt/rs/pom.xml +++ b/rt/rs/pom.xml @@ -38,6 +38,7 @@ <module>extensions/providers</module> <module>extensions/search</module> <module>extensions/rx</module> + <module>extensions/rx2</module> <module>extensions/reactor</module> <module>extensions/reactivestreams</module> <module>security</module> diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml index 6106cbcfcde..283964c91b7 100644 --- a/systests/jaxrs/pom.xml +++ b/systests/jaxrs/pom.xml @@ -322,6 +322,11 @@ <artifactId>cxf-rt-rs-extension-rx</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-extension-rx2</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-rs-extension-reactor</artifactId> diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java index 5063b5e928a..2b18c839a7c 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java @@ -19,8 +19,6 @@ package org.apache.cxf.systest.jaxrs.reactive; -import java.util.Collections; - import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.cxf.Bus; @@ -28,8 +26,7 @@ import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; -import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker; +import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; @@ -52,13 +49,9 @@ protected void run() { private JAXRSServerFactoryBean createFactoryBean(Bus bus, boolean useStreamingSubscriber, String relAddress) { JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - ReactiveIOInvoker invoker = new ReactiveIOInvoker(); - invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber); - sf.setInvoker(invoker); + sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber); sf.setProvider(new JacksonJsonProvider()); - StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>(); - streamProvider.setProduceMediaTypes(Collections.singletonList("application/json")); - sf.setProvider(streamProvider); + new ReactiveIOCustomizer().customize(sf); sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(RxJava2FlowableService.class); sf.setResourceProvider(RxJava2FlowableService.class, diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java index 85d576d6791..15759b11b52 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableServer.java @@ -19,8 +19,6 @@ package org.apache.cxf.systest.jaxrs.reactive; -import java.util.Collections; - import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.cxf.Bus; @@ -28,8 +26,7 @@ import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; -import org.apache.cxf.jaxrs.rx.server.ObservableInvoker; +import org.apache.cxf.jaxrs.rx.server.ObservableCustomizer; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; @@ -45,11 +42,8 @@ protected void run() { // Make sure default JSONProvider is not loaded bus.setProperty("skip.default.json.provider.registration", true); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - sf.setInvoker(new ObservableInvoker()); sf.setProvider(new JacksonJsonProvider()); - StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>(); - streamProvider.setProduceMediaTypes(Collections.singletonList("application/json")); - sf.setProvider(streamProvider); + new ObservableCustomizer().customize(sf); sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(RxJavaObservableService.class); sf.setResourceProvider(RxJavaObservableService.class, diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java index a3ea3deec4d..d66660cdc27 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java @@ -77,6 +77,7 @@ public void testTextJsonImplicitListAsyncStream() throws Exception { .doOnNext(helloWorldBean -> holder.value = helloWorldBean) .subscribe(); Thread.sleep(500); + assertNotNull(holder.value); assertEquals("Hello", holder.value.getGreeting()); assertEquals("World", holder.value.getAudience()); } diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java index dac4c2c219b..8965073dd0d 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java @@ -19,8 +19,6 @@ package org.apache.cxf.systest.jaxrs.reactor; -import java.util.Collections; - import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.cxf.Bus; @@ -28,8 +26,7 @@ import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; -import org.apache.cxf.jaxrs.reactor.server.ReactorInvoker; +import org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; public class ReactorServer extends AbstractBusTestServerBase { @@ -44,13 +41,9 @@ protected void run() { // Make sure default JSONProvider is not loaded bus.setProperty("skip.default.json.provider.registration", true); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - ReactorInvoker invoker = new ReactorInvoker(); - invoker.setUseStreamingSubscriberIfPossible(false); - sf.setInvoker(invoker); + sf.getProperties(true).put("useStreamingSubscriber", false); sf.setProvider(new JacksonJsonProvider()); - StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>(); - streamProvider.setProduceMediaTypes(Collections.singletonList("application/json")); - sf.setProvider(streamProvider); + new ReactorCustomizer().customize(sf); sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(FluxService.class, MonoService.class); sf.setResourceProvider(FluxService.class, @@ -61,11 +54,8 @@ protected void run() { server1 = sf.create(); JAXRSServerFactoryBean sf2 = new JAXRSServerFactoryBean(); - sf2.setInvoker(new ReactorInvoker()); - StreamingResponseProvider<HelloWorldBean> streamProvider2 = new StreamingResponseProvider<HelloWorldBean>(); - streamProvider2.setProduceMediaTypes(Collections.singletonList("application/json")); - sf2.setProvider(streamProvider2); sf2.setProvider(new JacksonJsonProvider()); + new ReactorCustomizer().customize(sf2); sf2.getOutInterceptors().add(new LoggingOutInterceptor()); sf2.setResourceClasses(FluxService.class); sf2.setResourceProvider(FluxService.class, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Split rxjava and rxjava2 modules > -------------------------------- > > Key: CXF-7642 > URL: https://issues.apache.org/jira/browse/CXF-7642 > Project: CXF > Issue Type: Improvement > Components: JAX-RS > Affects Versions: 3.2.2 > Reporter: John D. Ament > Priority: Major > > Split the two modules (rxjava, rxjava2) to fix the dependency hierarchy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)