[ https://issues.apache.org/jira/browse/CXF-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15156640#comment-15156640 ]
avidd commented on CXF-6776:
----------------------------

The attached zip file contains an example that produces the described error sporadically. To do so, it transfers the same result set a thousand times. It may be required to execute the test a few times to see the error. Sometimes you will get a different error, like a closed stream or a missing MIME attachment. However, I assume these are only consequences of the original error, which is that some sequences of bytes do not seem to arrive at the client.


MTOM client receives bogus
--------------------------

                 Key: CXF-6776
                 URL: https://issues.apache.org/jira/browse/CXF-6776
             Project: CXF
          Issue Type: Bug
    Affects Versions: 3.1.1, 3.1.5
         Environment: Windows 7 Professional 64 bit, java 1.8.0_45
            Reporter: avidd
         Attachments: CXF-6776-example.zip


We have a SOAP web service that transmits large structured result sets (a list of rows, each of which is a List<String>) using MTOM. We have now migrated to CXF due to some other bug in jaxws-spring. This (part of the) code seemed to work fine with jaxws-spring, but we had strange problems there as well, which we debugged down to the lower layers. Now we have the situation that some single bytes received by the client are not the bytes that were sent by the server. This can show up as a byte having a completely wrong value, or as a byte apparently being added *and* the byte beside it having the wrong value.

There are test cases that return result sets of several thousand rows, each of which has several columns. The result set is "streamed" via MTOM. These tests fail sporadically, about every 10th or 20th run.

This is what I have found out so far:
* The error is reproducible on different machines.
* I first saw this error with CXF 3.1.1. Then I upgraded to 3.1.5 and it still occurs. As said before, there were similar bugs with jaxws-spring, which showed different failures.
* The server cleanly writes all results into the output and flushes the output stream before closing it.
* Both the client (a JUnit test) and the server (a Jetty server started by that test) run on the same machine, actually in the same VM.
* If I enable some of the logging in the class below (QueryResponse), this seems to reduce the probability of the error. I was not able to reproduce the error with the logging enabled. This is very frustrating, and I think it hints at some race condition.

Our streaming result encodes rows like this (a short sketch follows the list):
* 4 bytes stating the number of columns in that row as an int (trailing nulls are cut off)
* for each column: 4 bytes stating the number of bytes in that column's value as an int
* that number of bytes, which will be converted to a string
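To make the format concrete, here is a minimal sketch of writing and reading one row in that layout (illustrative only; the class and method names are made up and are not part of the QueryResponse class shown further below):
{code}
import java.io.*;
import java.util.*;

// Illustrative sketch of the row layout described above: one int for the column count,
// then per column one int for the value length followed by the raw value bytes.
class RowFormatSketch {

    static void writeRow(DataOutputStream out, List<String> row) throws IOException {
        out.writeInt(row.size());                  // 4 bytes: number of columns in this row
        for (String value : row) {
            byte[] bytes = value.getBytes();       // column value as bytes
            out.writeInt(bytes.length);            // 4 bytes: length of this column's value
            out.write(bytes);                      // the value itself
        }
    }

    static List<String> readRow(DataInputStream in) throws IOException {
        int columns = in.readInt();                // 4 bytes: number of columns
        List<String> row = new ArrayList<>(columns);
        for (int col = 0; col < columns; col++) {
            byte[] bytes = new byte[in.readInt()]; // 4 bytes: value length ...
            in.readFully(bytes);                   // ... then exactly that many value bytes
            row.add(new String(bytes));
        }
        return row;
    }
}
{code}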
*The test*
* If the test runs fine, the client receives 2077 rows containing 15 columns each.
* Each column is a double or a string; it's basically the result of a simple SQL query. (Of course, if it actually were SQL we would use JDBC. It's actually a SAP data warehouse.)
* If the test fails, which it only does sporadically, it is always the 14th row that fails.

*Example 1 of failure*
When it is correctly received on the client side, these are the bytes of row 14 of test 2:
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BA"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
{code}
These are the erroneous bytes as received on the client side:
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BM"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
{code}
At first glance, it looks like a byte was added (because of the three consecutive 0s), so I thought it might be an off-by-one error. But when I look at the 2's complement representations I see:
{code}
good: 65:     01000001
bad:  77, 71: 01001101 01000111
{code}
I can't see where a byte would have been inserted. It also doesn't look like only bits were flipped. It rather looks like total bogus to me.
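For reference, the bit patterns above can be reproduced with a small snippet like this (purely illustrative; the class name is made up):
{code}
// Print the 2's complement bit patterns of the good byte (65) and the bad bytes (77, 71).
public class BitPatterns {
    public static void main(String[] args) {
        for (int b : new int[] { 65, 77, 71 }) {
            System.out.println(b + ": " + String.format("%8s", Integer.toBinaryString(b)).replace(' ', '0'));
        }
    }
}
{code}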
*Example 2 of failure*
* The bytes sent:
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51, 0, 0, 0, 2, ...]
{code}
* The bytes received:
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48, 0, 0, 0, 2, ...]
{code}

Similar but different problems first occurred when we upgraded from Java 7 to Java 8 a year ago. Before that, everything was fine. At that time we were using jaxws-spring and had the situation that sometimes the MIME boundary at the end of the attachment was missing. We hoped that an upgrade to CXF would fix the problem, but now we even have issues with the data.

To me it looks like something is totally broken in the JRE, but this only shows up when using MTOM, so it may be some integration problem. I wonder whether we are the only ones seeing this behavior.

This is my "streaming result set" class:
{code}
@XmlAccessorType ( XmlAccessType.NONE )
@XmlRootElement ( name = "streamResponse" )
@XmlType ( name = "streamResponseType" )
public class QueryResponse {

    private static final String MIME_TYPE_OCTET_STREAM = "application/octet-stream";
    private static final Logger LOG = LoggerFactory.getLogger(QueryResponse.class);
    private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
    private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
    private static final int BUFFER_SIZE = 100;
    private static final int PREMATURE_END_OF_RESULT = -1;
    private static final byte[] PREMATURE_END_OF_RESULT_BYTES =
        ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();

    private volatile int totalBytesRead = 0;
    private volatile int rowsRead = 0;
    private volatile int rowsWritten = 0;

    private DataHandler results;
    private BlockingQueue<List<String>> resultSet;
    private ResultSetIterator<String> resultSetIterator;
    private boolean receiving = true;

    /** Create a new response. This constructor is used for unmarshalling the response. */
    QueryResponse() { }

    /**
     * Create a new result set encapsulating the given components.
     * @param aResults the result iterator
     */
    QueryResponse(AutoCloseableIterator<List<String>> aResults) {
        results = encode(aResults);
    }

    @XmlElement ( required = true )
    @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
    DataHandler getResults() {
        return results;
    }

    /**
     * Set the result set, called by JAXB.
     * @param aDataHandler the data handler
     */
    void setResults(DataHandler aDataHandler) {
        if ( aDataHandler == null ) { throw new NullPointerException("aDataHandler"); }
        if ( resultSet != null ) { throw new IllegalStateException("Result set already exists."); }
        results = aDataHandler;
        // Pipelining: parse results and fill the queue while loading from the network.
        resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
        resultSetIterator = new ResultSetIterator<String>(resultSet);
        DataHandler dataHandler = results;
        try {
            decode(dataHandler, resultSet, resultSetIterator);
        } catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Called on the client side to get a streaming and blocking iterator.
     * @return the result set as a blocking iterator
     */
    public Iterator<List<String>> getResultSet() {
        return resultSetIterator;
    }

    private int fill(byte[] bytes, InputStream in) throws IOException {
        int off = 0;
        int readCount = 0;
        do {
            // reads at least one byte if bytes.length - off > 0 and not at EOF
            readCount = in.read(bytes, off, bytes.length - off);
            off += readCount;
            if ( readCount > 0 ) {
                totalBytesRead += readCount;
            }
        } while ( readCount > 0 && off < bytes.length );
        if ( off > 0 && off < bytes.length ) { // end of stream is a correct termination
            try {
                readCount = in.read(bytes, off, bytes.length - off);
                String exception = readException(in);
                throw new RuntimeException(exception);
            } catch ( EOFException e ) {
                // There was no exception written by the server, just a premature end of the stream causing the client-side EOF.
                throw new RuntimeException("Premature end of stream, total bytes read: " + totalBytesRead);
            }
        }
        return off;
    }

    private static void checkException(int len, InputStream in) throws ClassNotFoundException, IOException {
        if ( len == PREMATURE_END_OF_RESULT ) {
            String exception = readException(in);
            throw new RuntimeException(exception);
        }
    }

    private static String readException(InputStream in) throws IOException {
        ObjectInputStream objIn = new ObjectInputStream(in);
        try {
            Object object = objIn.readObject();
            if ( object != null ) {
                return object.toString();
            }
        } catch ( ClassNotFoundException e ) {
            throw new RuntimeException("Could not read exception.", e);
        }
        return "No exception received from service after premature end of result";
    }

    private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
        assert ( aResults != null );
        final PipedOutputStream out = new PipedOutputStream();
        DataHandler dh = new DataHandler(new StreamDataSource(out, MIME_TYPE_OCTET_STREAM));
        Encoder encoder = new Encoder(out, aResults, new ServerExceptionHandler());
        new Thread(encoder).start();
        return dh;
    }

    private void decode(
            DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet, ExceptionHandler exceptionHandler)
            throws InterruptedException {
        Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
        new Thread(decoder).start();
    }

    private void awaitIteratorBufferNotFull() throws InterruptedException {
        while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
    }

    private void awaitElements() throws InterruptedException {
        while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
    }
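    // Overall flow: on the server, encode() wraps a PipedOutputStream in a DataHandler and starts an Encoder
    // thread that writes rows into it, so the MTOM attachment is produced while the result set is still being
    // iterated. On the client, setResults() starts a Decoder thread that reads the attachment stream via fill()
    // and puts parsed rows into the bounded ArrayBlockingQueue that ResultSetIterator consumes.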
    private static final class StreamDataSource implements DataSource {

        private final String name = UUID.randomUUID().toString();
        private final InputStream in;
        private final String mimeType;

        private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
            ArgumentChecks.checkNotNull(aOut, "aOut");
            ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
            try {
                in = new PipedInputStream(aOut);
            } catch ( IOException e ) {
                throw new RuntimeException("Could not create input stream.", e);
            }
            mimeType = aMimeType;
        }

        @Override public String getName() { return name; }

        @Override public String getContentType() { return mimeType; }

        /**
         * {@inheritDoc}
         *
         * This implementation violates the specification in that it is destructive. Only the first call will return an
         * appropriate input stream.
         */
        @Override public InputStream getInputStream() { return in; }

        @Override public OutputStream getOutputStream() { throw new UnsupportedOperationException(); }
    }

    /**
     * Decodes the contents of an input stream as written by the {@link com.tn_ag.sap.QueryResponse.Encoder} and writes
     * parsed rows to a {@link java.util.Queue}.
     */
    private class Decoder implements Runnable {

        private final DataHandler dataHandler;
        private final BlockingQueue<List<String>> resultSet;
        private final ExceptionHandler exceptionHandler;
        private final byte[] lenBytes = new byte[4];

        Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet, ExceptionHandler aHandler) {
            ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
            ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
            ArgumentChecks.checkNotNull(aHandler, "aHandler");
            dataHandler = aDataHandler;
            resultSet = aResultSet;
            exceptionHandler = aHandler;
        }

        @Override
        public void run() {
            InputStream in = null;
            List<String> row;
            int len;
            try {
                in = dataHandler.getInputStream();

                while ( receiving ) {
                    synchronized ( resultSet ) {
                        receiving = fill(lenBytes, in) > 0;       // read next row's length in number of columns
                        len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length to integer
                        if ( !receiving || len == 0 ) { break; }
                        checkException(len, in);                  // -1 signals an exception
                        row = readRow(in, len);
                        awaitIteratorBufferNotFull();
                        resultSet.put(row);
                        rowsRead++;
                        if ( rowsRead % 1000 == 0 ) {
                            LOG.debug("already received {} rows", rowsRead);
                        }
                        resultSet.notifyAll();                    // notify waiting consumer threads
                    }
                }
                stopReception();
                LOG.debug("received a total of {} rows.", rowsRead);
            } catch ( InterruptedException e ) {
                LOG.info("Result reception interrupted.");
            } catch ( Exception e ) {
                exceptionHandler.handle(e);
                stopReception();
            } finally {
                receiving = false;
                try {
                    if ( in != null ) { in.close(); }
                } catch ( IOException e ) {
                    exceptionHandler.handle(e);
                }
            }
        }

        private List<String> readRow(InputStream in, int len) throws IOException {
            List<String> row = new ArrayList<>(len);             // create list of appropriate length
            for ( int col = 0; col < len; col++ ) {              // for each column
                fill(lenBytes, in);                              // read the value length (fixed for some types if schema known)
                int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the length of the value as bytes to an int
                final byte[] bytes = new byte[valLen];           // allocate a buffer of exactly the required size
                fill(bytes, in);
                row.add(STRINGS.internalize(new String(bytes)));
            }
            return row;
        }

        private void stopReception() {
            synchronized ( resultSet ) {
                receiving = false;     // we will stop parsing now
                resultSet.notifyAll(); // consumers can now consume the remaining elements from the queue
                LOG.debug("Read " + rowsRead + " rows from binary stream");
            }
        }
    }
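    // Error signalling between Encoder and Decoder: if iterating the source result set fails, the Encoder writes
    // the PREMATURE_END_OF_RESULT marker (-1) where the next row's column count would go, followed by the stack
    // trace as a serialized String; checkException() on the client side turns that back into a RuntimeException.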
LOG.debug("Read " + rowsRead + " rows from binary stream"); > } > } > } > > /** > * Encodes the given result set and writes the result to an output stream. > */ > private class Encoder implements Runnable { > private final OutputStream out; > private final AutoCloseableIterator<List<String>> iterator; > private final ExceptionHandler handler; > Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults, > ExceptionHandler aHandler) { > ArgumentChecks.checkNotNull(aOut, "aOut"); > ArgumentChecks.checkNotNull(aResults, "aResults"); > ArgumentChecks.checkNotNull(aHandler, "aHandler"); > out = aOut; > iterator = aResults; > handler = aHandler; > } > @Override > public void run() { > try ( AutoCloseableIterator<List<String>> iter = this.iterator; > OutputStream out = this.out ) { > writeResultSet(iter, out); > out.flush(); > } catch ( Exception e ) { > handler.handle(e); > } > } > private void writeResultSet(AutoCloseableIterator<List<String>> iter, > OutputStream out2) throws IOException { > List<String> row = null; > while ( this.iterator.hasNext() ) { > try { > row = this.iterator.next(); > writeRow(row); > rowsWritten++; > } catch ( Exception e ) { > out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an > exception instead of row size > writeException(out, e); // send exception to > client for rethrowing it there > throw e; // stop transmission, > handle exception on server-side (logging) > } > } > LOG.info("wrote {} rows to binary stream, closing output stream", > rowsWritten); > } > private void writeRow(List<String> row) throws IOException { > out.write(ByteBuffer.allocate(4).putInt(row.size()).array()); // > write row size (in number of columns) > for ( String s : row ) { > if ( s == null ) { s = ""; } > byte[] bytes = s.getBytes(); > out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // > write size of column in bytes > out.write(bytes); // > write column value > } > } > private void writeException(OutputStream output, Exception e) throws > IOException { > ObjectOutputStream objOut = new ObjectOutputStream(output); > objOut.writeObject(toString(e)); > } > private String toString(Throwable e) { > if ( e instanceof UncheckedConnectionException && e.getCause() != null > ) { > e = e.getCause(); > } > ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream(); > PrintWriter writer = new PrintWriter(stackTraceOut); > e.printStackTrace(writer); > writer.flush(); > return new String(stackTraceOut.toByteArray()); > } > } > > private final class ResultSetIterator<T> extends ExceptionHandler > implements Iterator<List<T>> { > private final BlockingQueue<List<T>> queue; > private Exception exception; > private int returned = 0; > private ResultSetIterator(BlockingQueue<List<T>> aQueue) { > queue = aQueue; > } > /** > * {@inheritDoc} > * > * This implementation marks the current thread as interrupted if > streaming could not be commenced. 
    private final class ResultSetIterator<T> extends ExceptionHandler implements Iterator<List<T>> {

        private final BlockingQueue<List<T>> queue;
        private Exception exception;
        private int returned = 0;

        private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
            queue = aQueue;
        }

        /**
         * {@inheritDoc}
         *
         * This implementation marks the current thread as interrupted if streaming could not be commenced.
         */
        @Override
        public boolean hasNext() {
            if ( exception != null ) { throw new UncheckedConnectionException("Exception while reading results", exception); }
            try {
                synchronized ( resultSet ) {
                    awaitElements();
                    if ( exception != null ) {
                        throw new UncheckedConnectionException("Exception while reading results", exception);
                    }
                    if ( resultSet.isEmpty() ) {
                        LOG.debug("iterator returned " + returned + " rows");
                    }
                    return !resultSet.isEmpty();
                }
            } catch ( InterruptedException e ) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * {@inheritDoc}
         *
         * If the current thread is interrupted during this call, it is marked as interrupted and the method returns
         * {@code null}.
         *
         * @throws NoSuchElementException if called while {@link #hasNext()} returns {@code false}
         */
        @Override
        public List<T> next() throws NoSuchElementException {
            if ( exception != null ) { throw new IllegalStateException("Exception while reading results", exception); }
            if ( !hasNext() ) { throw new NoSuchElementException(); }
            try {
                List<T> result = queue.take();
                synchronized ( resultSet ) { resultSet.notify(); }
                returned++;
                return result;
            } catch ( InterruptedException e ) {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        /**
         * This method is not supported.
         */
        @Override public void remove() { throw new UnsupportedOperationException("RTFM"); }

        @Override void handle(Exception aException) {
            if ( exception != null ) {
                LOG.warn("There was another exception.", aException);
            }
            exception = aException;
        }
    }

    private abstract static class ExceptionHandler {
        abstract void handle(Exception exception);
    }

    private static class ServerExceptionHandler extends ExceptionHandler {
        @Override
        void handle(Exception aException) {
            LOG.error(FATAL, "Exception while writing response.", aException);
        }
    }
}
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)