[ 
https://issues.apache.org/jira/browse/CXF-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779693#comment-17779693
 ] 

Sebastian Violet commented on CXF-8950:
---------------------------------------

I made the following changes to the code, and it works well. Closing the client 
is now delayed if the return type is {*}Response{*}, so that the consumer has 
time to read the input stream:
{code:java}
diff --git a/pom.xml b/pom.xml
index 375dd56..41ed82b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,15 @@
 
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <compilerArgs>
+            --enable-preview
+          </compilerArgs>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>
@@ -95,9 +104,7 @@
 <!--          <mainClass>LargeDataTester</mainClass>-->
           <executable>java</executable>
           <arguments>
-            <argument>-Xmx512m</argument>
-            <argument>-XX:+HeapDumpOnOutOfMemoryError</argument>
-            <argument>-XX:HeapDumpPath=/tmp</argument>
+            <argument>--enable-preview</argument>
             <argument>-classpath</argument>
             <classpath />
             <argument>LargeDataTester</argument>
diff --git a/src/main/java/LargeDataTester.java 
b/src/main/java/LargeDataTester.java
index 2900f08..0aa50df 100644
--- a/src/main/java/LargeDataTester.java
+++ b/src/main/java/LargeDataTester.java
@@ -21,6 +21,9 @@ import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
@@ -75,7 +78,7 @@ public class LargeDataTester {
     try {
       final DataServiceImpl statusServiceClient = 
DataServiceImpl.builder().apiUrl(endpoint).build();
 
-      Response response = statusServiceClient.data();
+      Response response = statusServiceClient.dataRaw();
 
       log.info("Response Status is: {}", response.getStatus());
       response.getHeaders().forEach((s, objects) -> log.info("Response Header, 
{}: {}", s, objects));
@@ -127,7 +130,11 @@ public class LargeDataTester {
     // ---------------------------------------------------
     @GET
     @Path("data")
-    Response data();
+    Response dataRaw();
+
+    @GET
+    @Path("data")
+    Data data();
   }
 
   @Builder
@@ -146,21 +153,39 @@ public class LargeDataTester {
         .noBackoff()
         .build(); //No retries
 
+    private final ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor();
+
     private <T> T doWithRetry(Function<DataService, T> functionToRetry) {
       return this.doWithRetry(functionToRetry, this.retryTemplate);
     }
 
     private <T> T doWithRetry(Function<DataService, T> functionToRetry, 
RetryTemplate templateToUse) {
       DataService service = this.getService();
+      final AtomicBoolean isResponse = new AtomicBoolean(false);
       try {
-        return templateToUse.execute(retryContext -> 
functionToRetry.apply(service));
+        T response = templateToUse.execute(retryContext -> 
functionToRetry.apply(service));
+        isResponse.set(response instanceof Response);
+        return response;
       } finally {
-        WebClient.client(service).close();
+        executorService.execute(() -> {
+          try {
+            // Allow enough time for the input stream to be read by consumer 
of Response
+            if(isResponse.get()) {
+              Thread.sleep(60000);
+            }
+          } catch (InterruptedException e) {}
+          WebClient.client(service).close();
+        });
       }
     }
 
     @Override
-    public Response data() {
+    public Response dataRaw() {
+      return this.doWithRetry(service -> service.dataRaw());
+    }
+
+    @Override
+    public Data data() {
       return this.doWithRetry(service -> service.data());
     }
{code}
ℹ️ {*}Note{*}: This uses Java 20 virtual threads (a preview feature, hence the 
{{--enable-preview}} flags above) to avoid the overhead of platform threads.

> HttpClient in CXF closing prematurely; 4.0.4-SNAPSHOT
> -----------------------------------------------------
>
>                 Key: CXF-8950
>                 URL: https://issues.apache.org/jira/browse/CXF-8950
>             Project: CXF
>          Issue Type: Bug
>          Components: JAX-RS
>    Affects Versions: 4.0.3, 4.0.4
>            Reporter: Sebastian Violet
>            Assignee: Daniel Kulp
>            Priority: Critical
>             Fix For: 3.6.3, 4.0.4
>
>         Attachments: CXF-HTTPClient-LargePayload.zip
>
>
> When processing requests using the JAX-RS client, which uses the new 
> HttpClient, the input stream is closed prematurely.
>  
> In the attached [^CXF-HTTPClient-LargePayload.zip], you will see that we are 
> using {*}Response{*} because we are interested in getting the response status 
> as well as the response headers. This fails with the following error:
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: java.io.IOException: 
> closed
>     at LargeDataTester.main(LargeDataTester.java:89)
> Caused by: java.io.IOException: closed
>     at 
> java.net.http/jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.current(ResponseSubscribers.java:448)
>     at 
> java.net.http/jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.read(ResponseSubscribers.java:508)
>     at java.base/java.io.FilterInputStream.read(FilterInputStream.java:119)
>     at 
> org.apache.cxf.transport.http.HttpClientHTTPConduit$HttpClientFilteredInputStream.read(HttpClientHTTPConduit.java:422)
>     at 
> java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:197)
>     at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:333)
>     at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:376)
>     at java.base/sun.nio.cs.StreamDecoder.lockedRead(StreamDecoder.java:219)
>     at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:173)
>     at java.base/java.io.InputStreamReader.read(InputStreamReader.java:189)
>     at java.base/java.io.Reader.read(Reader.java:265)
>     at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1610)
>     at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1589)
>     at org.apache.commons.io.IOUtils.copy(IOUtils.java:1384)
>     at org.apache.commons.io.IOUtils.copy(IOUtils.java:1153)
>     at org.apache.commons.io.IOUtils.toString(IOUtils.java:3105)
>     at LargeDataTester.main(LargeDataTester.java:84)
> Caused by: java.io.IOException: selector manager closed
>     at 
> java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.selectorClosedException(HttpClientImpl.java:1061)
>     at 
> java.net.http/jdk.internal.net.http.HttpClientImpl.closeSubscribers(HttpClientImpl.java:552)
>     at 
> java.net.http/jdk.internal.net.http.HttpClientImpl.stop(HttpClientImpl.java:543)
>     at 
> java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.shutdown(HttpClientImpl.java:1165)
>     at 
> java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.run(HttpClientImpl.java:1364)
>  {code}
>  
> {color:#FF0000}*You can execute the code like so:*{color}
> {code:java}
> mvn compile exec:exec{code}
> ℹ️ {*}Note{*}: You can do a diff between what works (letting CXF deserialize 
> the object response payload automatically) and the last commit (using 
> Response).
> {code:java}
> git diff HEAD^ HEAD {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to