[ https://issues.apache.org/jira/browse/CXF-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779693#comment-17779693 ]
Sebastian Violet commented on CXF-8950: --------------------------------------- Made the following changes to the code and it works well. I have it delay closing the client if the return type is {*}Response{*}: {code:java} diff --git a/pom.xml b/pom.xml index 375dd56..41ed82b 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,15 @@ <build> <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <compilerArgs> + --enable-preview + </compilerArgs> + </configuration> + </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> @@ -95,9 +104,7 @@ <!-- <mainClass>LargeDataTester</mainClass>--> <executable>java</executable> <arguments> - <argument>-Xmx512m</argument> - <argument>-XX:+HeapDumpOnOutOfMemoryError</argument> - <argument>-XX:HeapDumpPath=/tmp</argument> + <argument>--enable-preview</argument> <argument>-classpath</argument> <classpath /> <argument>LargeDataTester</argument> diff --git a/src/main/java/LargeDataTester.java b/src/main/java/LargeDataTester.java index 2900f08..0aa50df 100644 --- a/src/main/java/LargeDataTester.java +++ b/src/main/java/LargeDataTester.java @@ -21,6 +21,9 @@ import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import lombok.AccessLevel; import lombok.AllArgsConstructor; @@ -75,7 +78,7 @@ public class LargeDataTester { try { final DataServiceImpl statusServiceClient = DataServiceImpl.builder().apiUrl(endpoint).build(); - Response response = statusServiceClient.data(); + Response response = statusServiceClient.dataRaw(); log.info("Response Status is: {}", response.getStatus()); response.getHeaders().forEach((s, objects) -> log.info("Response Header, {}: {}", s, objects)); @@ -127,7 +130,11 @@ 
public class LargeDataTester { // --------------------------------------------------- @GET @Path("data") - Response data(); + Response dataRaw(); + + @GET + @Path("data") + Data data(); } @Builder @@ -146,21 +153,39 @@ public class LargeDataTester { .noBackoff() .build(); //No retries + private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); + private <T> T doWithRetry(Function<DataService, T> functionToRetry) { return this.doWithRetry(functionToRetry, this.retryTemplate); } private <T> T doWithRetry(Function<DataService, T> functionToRetry, RetryTemplate templateToUse) { DataService service = this.getService(); + final AtomicBoolean isResponse = new AtomicBoolean(false); try { - return templateToUse.execute(retryContext -> functionToRetry.apply(service)); + T response = templateToUse.execute(retryContext -> functionToRetry.apply(service)); + isResponse.set(response instanceof Response); + return response; } finally { - WebClient.client(service).close(); + executorService.execute(() -> { + try { + // Allow enough time for the input stream to be read by consumer of Response + if(isResponse.get()) { + Thread.sleep(60000); + } + } catch (InterruptedException e) {} + WebClient.client(service).close(); + }); } } @Override - public Response data() { + public Response dataRaw() { + return this.doWithRetry(service -> service.dataRaw()); + } + + @Override + public Data data() { return this.doWithRetry(service -> service.data()); } {code} ℹ️ {*}Note{*}: This uses Java 20 virtual threads to reduce overhead of platform threads. 
> HttpClient in CXF closing prematurely; 4.0.4-SNAPSHOT > ----------------------------------------------------- > > Key: CXF-8950 > URL: https://issues.apache.org/jira/browse/CXF-8950 > Project: CXF > Issue Type: Bug > Components: JAX-RS > Affects Versions: 4.0.3, 4.0.4 > Reporter: Sebastian Violet > Assignee: Daniel Kulp > Priority: Critical > Fix For: 3.6.3, 4.0.4 > > Attachments: CXF-HTTPClient-LargePayload.zip > > > When processing requests using the JAX-RS client, which uses the new > HttpClient, the input stream is closed prematurely. > > In the attached [^CXF-HTTPClient-LargePayload.zip], you will see that we are using > {*}Response{*}, because we are interested in getting the response status as well > as the response headers. This fails with the following error: > {code:java} > Exception in thread "main" java.lang.RuntimeException: java.io.IOException: > closed > at LargeDataTester.main(LargeDataTester.java:89) > Caused by: java.io.IOException: closed > at > java.net.http/jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.current(ResponseSubscribers.java:448) > at > java.net.http/jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.read(ResponseSubscribers.java:508) > at java.base/java.io.FilterInputStream.read(FilterInputStream.java:119) > at > org.apache.cxf.transport.http.HttpClientHTTPConduit$HttpClientFilteredInputStream.read(HttpClientHTTPConduit.java:422) > at > java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:197) > at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:333) > at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:376) > at java.base/sun.nio.cs.StreamDecoder.lockedRead(StreamDecoder.java:219) > at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:173) > at java.base/java.io.InputStreamReader.read(InputStreamReader.java:189) > at java.base/java.io.Reader.read(Reader.java:265) > at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1610) > at 
org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1589) > at org.apache.commons.io.IOUtils.copy(IOUtils.java:1384) > at org.apache.commons.io.IOUtils.copy(IOUtils.java:1153) > at org.apache.commons.io.IOUtils.toString(IOUtils.java:3105) > at LargeDataTester.main(LargeDataTester.java:84) > Caused by: java.io.IOException: selector manager closed > at > java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.selectorClosedException(HttpClientImpl.java:1061) > at > java.net.http/jdk.internal.net.http.HttpClientImpl.closeSubscribers(HttpClientImpl.java:552) > at > java.net.http/jdk.internal.net.http.HttpClientImpl.stop(HttpClientImpl.java:543) > at > java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.shutdown(HttpClientImpl.java:1165) > at > java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.run(HttpClientImpl.java:1364) > {code} > > {color:#FF0000}*You can execute the code like so:*{color} > {code:java} > mvn compile exec:exec{code} > ℹ️ {*}Note{*}: You can do a diff between what works (letting CXF deserialize > the object response payload automatically) and the last commit (using > Response). > {code:java} > git diff HEAD^ HEAD {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)