[ 
https://issues.apache.org/jira/browse/CXF-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15156640#comment-15156640
 ] 

avidd commented on CXF-6776:
----------------------------

The attached zip file contains an example that reproduces the described error 
sporadically. To do so, it transfers the same result set a thousand times. You 
may need to run the test a few times to see the error. Sometimes you will get a 
different error, such as "stream closed" or "missing mime attachment". However, 
I assume these are only consequences of the original error, which is that some 
sequences of bytes do not seem to be received by the client.

> MTOM client receives bogus
> --------------------------
>
>                 Key: CXF-6776
>                 URL: https://issues.apache.org/jira/browse/CXF-6776
>             Project: CXF
>          Issue Type: Bug
>    Affects Versions: 3.1.1, 3.1.5
>         Environment: Windows 7 Professional 64 bit, java 1.8.0_45
>            Reporter: avidd
>         Attachments: CXF-6776-example.zip
>
>
> We have a SOAP web service that transmits large structured result sets (a 
> list of rows, each of which is a List<String>) using MTOM. We have now 
> migrated to CXF because of another bug in jaxws-spring. This (part of the) 
> code seemed to work fine in jaxws-spring, but we had strange problems there 
> that we also traced to the lower layers. Now some single bytes received by 
> the client are not the bytes that were sent by the server. This can show up 
> as a byte having a completely wrong value, or as a byte apparently being 
> added *and* the neighboring byte having the wrong value.
> There are test cases that return result sets of several thousand rows, each 
> of which has several columns. The result set is "streamed" via MTOM. These 
> tests fail sporadically, roughly every 10th or 20th run.
> This is what I have found out so far:
> * The error is reproducible on different machines.
> * I first saw this error with CXF 3.1.1. Then I upgraded to 3.1.5 and it 
> still occurs. As said before there were similar bugs with jaxws-spring which 
> showed different failures.
> * The server cleanly writes all results into the output and flushes the 
> output stream before closing it.
> * Both client (a JUnit test) and server (a Jetty server started by that 
> test) are running on the same machine, actually in the same VM. 
> * If I enable some of the logging in the class below (QueryResponse), this 
> seems to reduce the probability of the error. I was not able to reproduce the 
> error with the logging enabled. This is very frustrating, and I think it hints 
> at some race condition.
> Our streaming result encodes rows like this:
> * 4 bytes stating the number of columns in that row as an int (trailing nulls 
> are cut off)
> * for each column: 4 bytes stating the number of bytes in that column's value 
> as an int
> * that number of bytes, which will be converted to a string
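The row layout described in the list above can be sketched as a small round trip. This is a minimal illustration, not the reporter's implementation: the class and method names (RowCodecSketch, encodeRow, decodeRow) are hypothetical, and UTF-8 is an assumption, since the code in the report uses the platform default charset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of the reported code. */
final class RowCodecSketch {

  /** Writes the column count (int), then per column its byte length (int) and its bytes. */
  static byte[] encodeRow(List<String> row) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.writeInt(row.size());                 // big-endian, like ByteBuffer.putInt
      for (String value : row) {
        // Assumption: UTF-8 instead of the platform default charset.
        byte[] bytes = (value == null ? "" : value).getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Reads one row back from bytes produced by encodeRow. */
  static List<String> decodeRow(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      int columns = in.readInt();
      List<String> row = new ArrayList<>(columns);
      for (int col = 0; col < columns; col++) {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);                    // fills the buffer or throws EOFException
        row.add(new String(bytes, StandardCharsets.UTF_8));
      }
      return row;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] encoded = encodeRow(Arrays.asList("BA", "1"));
    System.out.println(Arrays.toString(encoded));
    System.out.println(decodeRow(encoded));
  }
}
```

For a two-column row ("BA", "1") the encoded bytes start with the column count, followed by the length and bytes of each value, which matches the shape of the byte dumps in the failure examples.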
> *The test*
> * If the test runs fine, the client receives 2077 rows containing 15 columns 
> each. 
> * Each column is a double or a string; it's basically the result of a simple 
> SQL query. (Of course, if it actually were SQL we would use JDBC. It's 
> actually a SAP data warehouse.)
> * If the test fails, which it only does sporadically, it is always the 14th 
> row that fails.
> *Example 1 of failure*
> When it is correctly received at the client side, these are the bytes of 
> row 14 of test 2: 
> * first 4 bytes: we have 15 columns
> * next 4 bytes: the first column has 2 bytes
> * next 2 bytes: "BA"
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
> {code}
> These are the erroneous bytes as received on the client side:
> * first 4 bytes: we have 15 columns
> * next 4 bytes: the first column has 2 bytes
> * next 2 bytes: "BM"
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
> {code}
> At first glance, it looks like a byte was added (because of the three 
> consecutive 0s). So I thought it might be an off-by-one error. But when 
> looking at the binary representation I see:
> {code}
> good: 65:     01000001
> bad:  77, 71: 01001101 01000111
> {code}
> I can't see where a byte would have been inserted. It also doesn't look like 
> only bits were flipped. It looks more like completely bogus data to me.
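The bit patterns quoted above can be reproduced, and the number of differing bits counted, with a few lines of Java. This is only a sketch for inspecting the values; the class and method names (ByteBitsSketch, bits, differingBits) are hypothetical.

```java
/** Hypothetical helper for inspecting single byte values; not part of the reported code. */
final class ByteBitsSketch {

  /** Formats the low 8 bits of a value as a zero-padded binary string. */
  static String bits(int value) {
    String raw = Integer.toBinaryString(value & 0xFF);
    return String.format("%8s", raw).replace(' ', '0');
  }

  /** Counts the bit positions in which the low 8 bits of a and b differ. */
  static int differingBits(int a, int b) {
    return Integer.bitCount((a ^ b) & 0xFF);
  }

  public static void main(String[] args) {
    System.out.println("good 65: " + bits(65));
    System.out.println("bad  77: " + bits(77));
    System.out.println("bad  71: " + bits(71));
    System.out.println("65 vs 77 differ in " + differingBits(65, 77) + " bit(s)");
  }
}
```

Counting differing bits this way does not explain the corruption; it only makes it easier to check whether a wrong byte is a shifted, flipped, or unrelated value.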
> *Example 2 of Failure*
> * The bytes sent
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51, 
> 0, 0, 0, 2, ...]
> {code}
> * The bytes received
> {code}
>                                                                           XX
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48, 
> 0, 0, 0, 2,...]
> {code}
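When comparing longer dumps like the two above, locating the first differing offset by eye is error-prone. A minimal sketch of such a comparison follows; the names (FirstMismatchSketch, firstMismatch) are hypothetical, and a manual loop is used because the report names Java 1.8, where java.util.Arrays.mismatch (added in Java 9) is not available.

```java
/** Hypothetical helper for comparing sent and received byte dumps; not part of the reported code. */
final class FirstMismatchSketch {

  /** Returns the index of the first differing byte, or -1 if both arrays are equal. */
  static int firstMismatch(byte[] sent, byte[] received) {
    int common = Math.min(sent.length, received.length);
    for (int i = 0; i < common; i++) {
      if (sent[i] != received[i]) {
        return i;
      }
    }
    // Equal up to the shorter length: either fully equal or one array is a prefix.
    return sent.length == received.length ? -1 : common;
  }

  public static void main(String[] args) {
    // The visible prefixes of the two dumps from Example 2 (the elided tails are left out).
    byte[] sent     = {0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51, 0, 0, 0, 2};
    byte[] received = {0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48, 0, 0, 0, 2};
    System.out.println("first mismatch at index " + firstMismatch(sent, received));
  }
}
```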
> Similar but different problems first occurred when we upgraded from Java 7 to 
> Java 8 a year ago.  Before, everything was fine. At that time we were using 
> jaxws-spring and had the situation that sometimes the mime-boundary at the 
> end of the attachment was missing. We hoped that an upgrade to CXF would fix 
> the problem but now we even have issues with the data.
> To me it looks like something is totally broken in the JRE but this only 
> shows up when using MTOM, so it may be some integration problem. I wonder 
> whether we are the only ones seeing this behavior.
> This is my "streaming result set" class:
> {code}
> @XmlAccessorType ( XmlAccessType.NONE )
> @XmlRootElement ( name = "streamResponse" )
> @XmlType ( name = "streamResponseType" )
> public class QueryResponse {
>   private static final String MIME_TYPE_OCTET_STREAM = 
> "application/octet-stream";
>   private static final Logger LOG = 
> LoggerFactory.getLogger(QueryResponse.class);
>   private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
>   private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
>   private static final int BUFFER_SIZE = 100;
>   private static final int PREMATURE_END_OF_RESULT = -1;
>   private static final byte[] PREMATURE_END_OF_RESULT_BYTES = 
>       ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();
>   private volatile int totalBytesRead = 0;
>   private volatile int rowsRead = 0;
>   private volatile int rowsWritten = 0;
>   private DataHandler results;
>   private BlockingQueue<List<String>> resultSet;
>   private ResultSetIterator<String> resultSetIterator;
>   private boolean receiving = true;
>   /** Create a new response. This constructor is used for unmarshalling the response. */
>   QueryResponse() { }
>   
>   /**
>    * Create a new result set encapsulating the given components.
>    * @param aResults the result iterator
>    */
>   QueryResponse(AutoCloseableIterator<List<String>> aResults) {
>     results = encode(aResults);
>   }
>   
>   @XmlElement ( required = true )
>   @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
>   DataHandler getResults() {
>     return results;
>   }
>   
>   /**
>    * Set the result set, called by JAXB.
>    * @param aDataHandler the data handler
>    */
>   void setResults(DataHandler aDataHandler) {
>     if ( aDataHandler == null ) { throw new NullPointerException("aDataHandler"); }
>     if ( resultSet != null ) { throw new IllegalStateException("Result set already exists."); }
>     results = aDataHandler;
>     // Pipelining
>     /* parse results and fill queue while loading from the network */
>     resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
>     resultSetIterator = new ResultSetIterator<String>(resultSet);
>     DataHandler dataHandler = results;
>     try {
>       decode(dataHandler, resultSet, resultSetIterator);
>     } catch ( InterruptedException e ) {
>       Thread.currentThread().interrupt();
>     }
>   }
>   
>   /**
>    * Called on the client side to get a streaming and blocking iterator.
>    * @return the result set as a blocking iterator
>    */
>   public Iterator<List<String>> getResultSet() {
>     return resultSetIterator; 
>   }
>   
>   private int fill(byte[] bytes, InputStream in) throws IOException {
>     int off = 0;
>     int readCount = 0;
>     do {
>       // read() returns at least one byte (readCount > 0) if bytes.length - off > 0 and the stream is not at EOF
>       readCount = in.read(bytes, off, bytes.length - off);  
>       off += readCount;
>       if ( readCount > 0 ) {
>         totalBytesRead += readCount;
>       }
>     } while ( readCount > 0 && off < bytes.length );
>     if ( off > 0 && off < bytes.length ) { // end of stream is a correct termination
>       try {
>         readCount = in.read(bytes, off, bytes.length - off);
>         String exception = readException(in);
>         throw new RuntimeException(exception);
>       } catch ( EOFException e ) {
>         // There was no exception written by the server, just a premature end of the stream causing the client-side EOF.
>         throw new RuntimeException("Premature end of stream, total bytes read: " + totalBytesRead); 
>       }
>     }
>     return off;
>   }
>   private static void checkException(int len, InputStream in) throws 
> ClassNotFoundException, IOException {
>     if ( len == PREMATURE_END_OF_RESULT ) {
>       String exception = readException(in);
>       throw new RuntimeException(exception);
>     }
>   }
>   private static String readException(InputStream in) throws IOException {
>     ObjectInputStream objIn = new ObjectInputStream(in);
>     try {
>       Object object = objIn.readObject();
>       if ( object != null ) {
>         return object.toString();
>       }
>     } catch ( ClassNotFoundException e ) {
>       throw new RuntimeException("Could not read exception.", e);
>     }
>     return "No exception received from service after premature end of result";
>   }
>   private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
>     assert ( aResults != null );
>     final PipedOutputStream out = new PipedOutputStream();
>     DataHandler dh = new DataHandler(new StreamDataSource(out, 
> MIME_TYPE_OCTET_STREAM));
>     Encoder encoder = new Encoder(out, aResults, new 
> ServerExceptionHandler());
>     new Thread(encoder).start();
>     return dh;
>   }
>   
>   private void decode(
>       DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet, 
> ExceptionHandler exceptionHandler) 
>       throws InterruptedException {
>     Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
>     new Thread(decoder).start();
>   }
>   
>   private void awaitIteratorBufferNotFull() throws InterruptedException {
>     while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
>   }
>   private void awaitElements() throws InterruptedException {
>     while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
>   }
>   private static final class StreamDataSource implements DataSource {
>     private final String name = UUID.randomUUID().toString();
>     private final InputStream in;
>     private final String mimeType;
>     private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
>       ArgumentChecks.checkNotNull(aOut, "aOut");
>       ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
>       try {
>         in = new PipedInputStream(aOut);
>       } catch ( IOException e ) {
>         throw new RuntimeException("Could not create input stream.", e);
>       }
>       mimeType = aMimeType;
>     }
>     @Override public String getName() { return name; }
>     @Override public String getContentType() { return mimeType; }
>     /**
>      * {@inheritDoc}
>      * 
>      * This implementation violates the specification in that it is 
> destructive. Only the first call will return an
>      * appropriate input stream.
>      */
>     @Override public InputStream getInputStream() { return in; }
>     @Override public OutputStream getOutputStream() { throw new 
> UnsupportedOperationException(); }
>   }
>   
>   /**
>    * Decodes the contents of an input stream as written by the {@link 
> com.tn_ag.sap.QueryResponse.Encoder} and writes
>    * parsed rows to a {@link java.util.Queue}.
>    */
>   private class Decoder implements Runnable {
>     private final DataHandler dataHandler;
>     private final BlockingQueue<List<String>> resultSet;
>     private final ExceptionHandler exceptionHandler;
>     private final byte[] lenBytes = new byte[4];
>     Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet, 
> ExceptionHandler aHandler) {
>       ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
>       ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
>       ArgumentChecks.checkNotNull(aHandler, "aHandler");
>       dataHandler = aDataHandler;
>       resultSet = aResultSet;
>       exceptionHandler = aHandler;
>     }
>     
>     @Override
>     public void run() {
>       InputStream in = null;
>       List<String> row;
>       int len;
>       try { 
>         in = dataHandler.getInputStream();
>         
>         while ( receiving ) {
>           synchronized ( resultSet ) {
>             receiving = fill(lenBytes, in) > 0;       // read next row's length in number of columns
>             len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length to integer
>             if ( !receiving || len == 0 ) { break; }
>             checkException(len, in);                  // -1 signals an exception
>             row = readRow(in, len);
>             awaitIteratorBufferNotFull();
>             resultSet.put(row);
>             rowsRead++;
>             if ( rowsRead % 1000 == 0 ) {
>               LOG.debug("already received {} rows", rowsRead);
>             }
>             resultSet.notifyAll();                    // notify waiting consumer threads
>           }
>         }
>         stopReception();
>         LOG.debug("received a total of {} rows.", rowsRead);
>       } catch ( InterruptedException e ) {
>         LOG.info("Result reception interrupted.");
>       } catch ( Exception e ) {
>         exceptionHandler.handle(e);
>         stopReception();
>       } finally {
>         receiving = false;
>         try {
>           if ( in != null ) { in.close(); }
>         } catch ( IOException e ) {
>           exceptionHandler.handle(e);
>         }
>       }
>     }
>     private List<String> readRow(InputStream in, int len) throws IOException {
>       List<String> row = new ArrayList<>(len);           // create list of appropriate length
>       for ( int col = 0; col < len; col++ ) {            // for each column
>         fill(lenBytes, in);                              // read the value length (fixed for some types if schema known)
>         int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the length of the value as bytes to an int
>         final byte[] bytes = new byte[valLen];           // allocate a buffer of exactly the required size
>         fill(bytes, in);              
>         row.add(STRINGS.internalize(new String(bytes)));
>       }
>       return row;
>     }
>     private void stopReception() {
>       synchronized ( resultSet ) {
>         receiving = false;           // we will stop parsing now
>         resultSet.notifyAll();       // consumers can now consume the remaining elements from the queue
>         LOG.debug("Read " + rowsRead + " rows from binary stream");
>       }                
>     }
>   }
>   
>   /**
>    * Encodes the given result set and writes the result to an output stream. 
>    */
>   private class Encoder implements Runnable {
>     private final OutputStream out;
>     private final AutoCloseableIterator<List<String>> iterator;
>     private final ExceptionHandler handler;
>     Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults, 
> ExceptionHandler aHandler) {
>       ArgumentChecks.checkNotNull(aOut, "aOut");
>       ArgumentChecks.checkNotNull(aResults, "aResults");
>       ArgumentChecks.checkNotNull(aHandler, "aHandler");
>       out = aOut;
>       iterator = aResults;
>       handler = aHandler;
>     }
>     @Override
>     public void run() {
>       try ( AutoCloseableIterator<List<String>> iter = this.iterator; 
> OutputStream out = this.out ) {
>         writeResultSet(iter, out);
>         out.flush();
>       } catch ( Exception e ) {
>         handler.handle(e);
>       }
>     }
>     private void writeResultSet(AutoCloseableIterator<List<String>> iter, 
> OutputStream out2) throws IOException {
>       List<String> row = null;
>       while ( this.iterator.hasNext() ) {
>         try {
>           row = this.iterator.next();
>           writeRow(row);
>           rowsWritten++;
>         } catch ( Exception e ) {
>           out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an exception instead of row size
>           writeException(out, e);                   // send exception to client for rethrowing it there
>           throw e;                                  // stop transmission, handle exception on server-side (logging)
>         }
>       }
>       LOG.info("wrote {} rows to binary stream, closing output stream", 
> rowsWritten);
>     }
>     private void writeRow(List<String> row) throws IOException {
>       out.write(ByteBuffer.allocate(4).putInt(row.size()).array());     // write row size (in number of columns)
>       for ( String s : row ) {
>         if ( s == null ) { s = ""; }
>         byte[] bytes = s.getBytes();
>         out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // write size of column in bytes
>         out.write(bytes);                                               // write column value
>       }
>     }
>     private void writeException(OutputStream output, Exception e) throws 
> IOException {
>       ObjectOutputStream objOut = new ObjectOutputStream(output);
>       objOut.writeObject(toString(e));
>     }
>     private String toString(Throwable e) {
>       if ( e instanceof UncheckedConnectionException && e.getCause() != null 
> ) {
>         e = e.getCause();
>       }
>       ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream();
>       PrintWriter writer = new PrintWriter(stackTraceOut);
>       e.printStackTrace(writer);
>       writer.flush();
>       return new String(stackTraceOut.toByteArray());
>     }
>   }
>   
>   private final class ResultSetIterator<T> extends ExceptionHandler 
> implements Iterator<List<T>> {
>     private final BlockingQueue<List<T>> queue;
>     private Exception exception;
>     private int returned = 0;
>     private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
>       queue = aQueue;
>     }
>     /**
>      * {@inheritDoc}
>      * 
>      * This implementation marks the current thread as interrupted if 
> streaming could not be commenced. 
>      */
>     @Override
>     public boolean hasNext() {
>       if ( exception != null ) { throw new 
> UncheckedConnectionException("Exception while reading results", exception); }
>       try {
>         synchronized ( resultSet ) {
>           awaitElements();
>           if ( exception != null ) { 
>             throw new UncheckedConnectionException("Exception while reading results", exception); 
>           }
>           if ( resultSet.isEmpty() ) {
>             LOG.debug("iterator returned " + returned + " rows");
>           }
>           return !resultSet.isEmpty();
>         }
>       } catch ( InterruptedException e ) {
>         Thread.currentThread().interrupt();
>         return false;
>       }
>     
>     }
>     /**
>      * {@inheritDoc}
>      * 
>      * If the current thread is interrupted during this call, it is marked as 
> interrupted and the method returns 
>      * {@code null}.
>      * 
>      * @throws NoSuchElementException if called while {@link #hasNext()} 
> returns {@code false}
>      */
>     @Override
>     public List<T> next() throws NoSuchElementException {
>       if ( exception != null ) { throw new IllegalStateException("Exception while reading results", exception); }
>       if ( !hasNext() ) { throw new NoSuchElementException(); }
>       try {
>         List<T> result = queue.take();
>         synchronized ( resultSet ) { resultSet.notify(); }
>         returned++;
>         return result;
>       } catch ( InterruptedException e ) {
>         Thread.currentThread().interrupt();
>         return null;
>       }
>     }
>     /**
>      * This method is not supported.
>      */
>     @Override public void remove() { throw new 
> UnsupportedOperationException("RTFM"); }
>     @Override void handle(Exception aException) {
>       if ( exception != null ) { 
>         LOG.warn("There was another exception.", aException);
>       }
>       exception = aException;
>     }
>   }
>   
>   private abstract static class ExceptionHandler {
>     abstract void handle(Exception exception);
>   }
>   
>   private static class ServerExceptionHandler extends ExceptionHandler {
>     @Override
>     void handle(Exception aException) {
>       LOG.error(FATAL, "Exception while writing response.", aException);
>     }
>   }
> }
> {code}
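In the fill method of the listing above, off += readCount also runs when read returns -1 at end of stream, so off is decremented in that case. Whether this is related to the reported corruption is not established here. The following is a minimal sketch of a read loop that handles -1 explicitly; the class and method names (ReadFullySketch, readFully) are hypothetical and the error handling is simplified compared to the original.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper, not part of the reported code. */
final class ReadFullySketch {

  /**
   * Fills the buffer completely. Returns false if the stream ends cleanly before
   * the first byte; throws if the stream ends part-way through the buffer.
   */
  static boolean readFully(InputStream in, byte[] buffer) {
    int off = 0;
    try {
      while (off < buffer.length) {
        int n = in.read(buffer, off, buffer.length - off);
        if (n < 0) {                       // -1 means end of stream; never add it to off
          if (off == 0) {
            return false;                  // clean end between two records
          }
          throw new EOFException("stream ended after " + off + " of " + buffer.length + " bytes");
        }
        off += n;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return true;
  }

  public static void main(String[] args) {
    InputStream in = new ByteArrayInputStream(new byte[] {0, 0, 0, 15});
    byte[] lenBytes = new byte[4];
    System.out.println("first read filled buffer: " + readFully(in, lenBytes));
    System.out.println("second read filled buffer: " + readFully(in, lenBytes));
  }
}
```

For streams where a clean end of input does not need to be distinguished from a truncated record, java.io.DataInputStream.readFully offers the same fill-or-throw behavior.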



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
