[ 
https://issues.apache.org/jira/browse/CXF-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

avidd updated CXF-6776:
-----------------------
    Description: 
We have a SOAP web service that transmits large structured result sets (a list 
of rows, each of which is a List<String>) using MTOM. We have now migrated to 
CXF because of another bug in jaxws-spring. This part of the code seemed to 
work fine in jaxws-spring, but we had strange problems there as well, which we 
debugged down to the lower layers. Now some single bytes received by the client 
are not the bytes that were sent by the server. This shows either as a byte 
with a completely wrong value or as an apparently added byte *and* a wrong 
value in the byte next to it.

There are test cases that return result sets of several thousand rows, each of 
which has several columns. The result set is "streamed" via MTOM. These tests 
fail sporadically, roughly every 10th or 20th run.
This is what I have found out so far:
* The error is reproducible on different machines.
* I first saw this error with CXF 3.1.1. Then I upgraded to 3.1.5 and it still 
occurs. As said before there were similar bugs with jaxws-spring which showed 
different failures.
* The server cleanly writes all results into the output and flushes the output 
stream before closing it.
* Both client (a JUnit test) and server (a Jetty server started by that test) 
are running on the same machine, actually in the same VM. 
* If I enable some of the logging in the class below (QueryResponse), this 
seems to reduce the probability of the error. I was not able to reproduce the 
error with the logging enabled. This is very frustrating, and I think it hints 
at some race condition.
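
One way to check whether the bytes change between the encoder and the decoder, rather than inside them, would be to checksum the stream on both sides and compare the values. The sketch below uses the JDK's `CheckedOutputStream` and `CheckedInputStream` with CRC-32. The class name `StreamChecksums` and both method names are made up for this illustration and are not part of our service.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

/** Hypothetical helper: CRC-32 over everything written to or read from a stream. */
final class StreamChecksums {
  private StreamChecksums() { }

  /** Writes the payload through a checksumming wrapper and returns the CRC-32 of the bytes written. */
  static long writeWithChecksum(byte[] payload, OutputStream target) throws IOException {
    CheckedOutputStream out = new CheckedOutputStream(target, new CRC32());
    out.write(payload);
    out.flush();
    return out.getChecksum().getValue();
  }

  /** Drains the stream through a checksumming wrapper and returns the CRC-32 of the bytes read. */
  static long readWithChecksum(InputStream source) throws IOException {
    CheckedInputStream in = new CheckedInputStream(source, new CRC32());
    byte[] buffer = new byte[8192];
    while ( in.read(buffer) != -1 ) {
      // nothing to do: the wrapper updates the checksum as a side effect of reading
    }
    return in.getChecksum().getValue();
  }
}
```

If the value computed on the server side differs from the one computed on the client side, the bytes were changed somewhere in between; if they match, the problem is more likely in the decoding.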

Our streaming result encodes rows like this:
* 4 bytes stating the number of columns in that row as an int (trailing nulls 
are cut off)
* for each column: 4 bytes stating the number of bytes in that column's value 
as an int
* that number of bytes, which the client converts to a string
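
The layout above can be sketched with the JDK's `DataOutputStream` and `DataInputStream`, whose `writeInt`/`readInt` use the same big-endian 4-byte encoding as `ByteBuffer.putInt`. The class name `RowCodec` and the explicit UTF-8 charset are assumptions for this illustration; the actual class further down uses `ByteBuffer` and the platform default charset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone codec for a single row in the length-prefixed layout. */
final class RowCodec {
  private RowCodec() { }

  /** Encodes a row as: column count, then for each column its byte length and its bytes. */
  static byte[] encodeRow(List<String> row) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try ( DataOutputStream out = new DataOutputStream(bytes) ) {
      out.writeInt(row.size());                       // 4 bytes: number of columns
      for ( String value : row ) {
        byte[] encoded = (value == null ? "" : value).getBytes(StandardCharsets.UTF_8);
        out.writeInt(encoded.length);                 // 4 bytes: length of the column value
        out.write(encoded);                           // the column value itself
      }
    } catch ( IOException e ) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Decodes a single row written by {@link #encodeRow(List)}. */
  static List<String> decodeRow(byte[] data) {
    try ( DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)) ) {
      int columns = in.readInt();
      List<String> row = new ArrayList<>(columns);
      for ( int col = 0; col < columns; col++ ) {
        byte[] encoded = new byte[in.readInt()];
        in.readFully(encoded);                        // throws EOFException on a short read
        row.add(new String(encoded, StandardCharsets.UTF_8));
      }
      return row;
    } catch ( IOException e ) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For the row `["BA", "1", "0033"]` this yields `0, 0, 0, 3` followed by `0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51`.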

*The test*
* If the test runs fine, the client receives 2077 rows containing 15 columns each. 
* Each column is a double or a string; it's basically the result of a simple 
SQL query. (Of course, if it actually were SQL we would use JDBC. It's actually 
a SAP data warehouse.)
* If the test fails, which it does only sporadically, it is always the 14th row 
that fails.

Example of failure: 
When row 14 of test 2 is received correctly at the client side, these are its 
bytes:
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BA"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
{code}

These are the erroneous bytes as received on the client side:
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BM"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
{code}

At first glance, it looks like a byte was added (because of the three 
consecutive 0s), so I thought it might be an off-by-one error. But when looking 
at the binary representation I see:
{code}
good: 65:     01000001
bad:  77, 71: 01001101 01000111
{code}
I can't see where a byte would have been inserted. It also doesn't look as if 
only bits were flipped. It rather looks completely bogus to me.
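
To make this comparison less error-prone, the bit patterns and the differing bits can also be printed programmatically. A small sketch, with `BitDiff` as a made-up helper name:

```java
/** Hypothetical helper for comparing single bytes bit by bit. */
final class BitDiff {
  private BitDiff() { }

  /** Formats the low 8 bits of the value as a zero-padded binary string. */
  static String toBits(int value) {
    String bits = Integer.toBinaryString(value & 0xFF);
    StringBuilder padded = new StringBuilder();
    for ( int i = bits.length(); i < 8; i++ ) { padded.append('0'); }
    return padded.append(bits).toString();
  }

  /** Returns a mask in which every set bit marks a position where the two bytes differ. */
  static int differingBits(int expected, int actual) {
    return (expected ^ actual) & 0xFF;
  }

  public static void main(String[] args) {
    System.out.println("good 65: " + toBits(65));
    System.out.println("bad  77: " + toBits(77));
    System.out.println("diff   : " + toBits(differingBits(65, 77)));
  }
}
```

For 65 and 77 the mask is 00001100, so these two values differ in exactly two bit positions.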

Another example of failure is this:
* The bytes sent
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51, 
0, 0, 0, 2, ...]
{code}
* The bytes received
{code}
                                                                          XX
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48, 
0, 0, 0, 2,...]
{code}
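
Locating the changed byte by eye is tedious for long rows. Java 9 and later offer `Arrays.mismatch` for this; since we are on Java 8, a hand-written equivalent is sketched here. `ByteCompare.firstMismatch` is a hypothetical helper, not part of our code base.

```java
/** Hypothetical helper for locating the first corrupted byte in a row. */
final class ByteCompare {
  private ByteCompare() { }

  /** Returns the index of the first differing byte, or -1 if both arrays are identical. */
  static int firstMismatch(byte[] expected, byte[] actual) {
    int common = Math.min(expected.length, actual.length);
    for ( int i = 0; i < common; i++ ) {
      if ( expected[i] != actual[i] ) { return i; }
    }
    // equal up to the shorter length: a length difference counts as a mismatch at that index
    return expected.length == actual.length ? -1 : common;
  }
}
```

Applied to the sent and received bytes of this example (up to the elided part), it returns 22, the position marked with XX.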

Similar but different problems first occurred when we upgraded from Java 7 to 
Java 8 a year ago. Before that, everything was fine. At that time we were using 
jaxws-spring, and sometimes the MIME boundary at the end of the attachment was 
missing. We hoped that an upgrade to CXF would fix the problem, but now we even 
have issues with the data itself.
To me it looks like something is totally broken in the JRE, but this only shows 
up when using MTOM, so it may be some integration problem. I wonder whether we 
are the only ones seeing this behavior.

This is my "streaming result set" class:

{code}
@XmlAccessorType ( XmlAccessType.NONE )
@XmlRootElement ( name = "streamResponse" )
@XmlType ( name = "streamResponseType" )
public class QueryResponse {
  private static final String MIME_TYPE_OCTET_STREAM = 
"application/octet-stream";
  private static final Logger LOG = 
LoggerFactory.getLogger(QueryResponse.class);
  private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
  private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
  private static final int BUFFER_SIZE = 100;
  private static final int PREMATURE_END_OF_RESULT = -1;
  private static final byte[] PREMATURE_END_OF_RESULT_BYTES = 
      ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();
  private volatile int totalBytesRead = 0;
  private volatile int rowsRead = 0;
  private volatile int rowsWritten = 0;

  private DataHandler results;
  private BlockingQueue<List<String>> resultSet;
  private ResultSetIterator<String> resultSetIterator;
  private boolean receiving = true;

  /** Create a new response. This constructor is used for unmarshalling the response. */
  QueryResponse() { }
  
  /**
   * Create a new result set encapsulating the given components.
   * @param aResults the result iterator
   */
  QueryResponse(AutoCloseableIterator<List<String>> aResults) {
    results = encode(aResults);
  }
  
  @XmlElement ( required = true )
  @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
  DataHandler getResults() {
    return results;
  }
  
  /**
   * Set the result set, called by JAXB.
   * @param aDataHandler the data handler
   */
  void setResults(DataHandler aDataHandler) {
    if ( aDataHandler == null ) { throw new NullPointerException("aDataHandler"); }
    if ( resultSet != null ) { throw new IllegalStateException("Result set already exists."); }
    results = aDataHandler;

    // Pipelining
    /* parse results and fill queue while loading from the network */
    resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
    resultSetIterator = new ResultSetIterator<String>(resultSet);
    DataHandler dataHandler = results;
    try {
      decode(dataHandler, resultSet, resultSetIterator);
    } catch ( InterruptedException e ) {
      Thread.currentThread().interrupt();
    }
  }
  
  /**
   * Called on the client side to get a streaming and blocking iterator.
   * @return the result set as a blocking iterator
   */
  public Iterator<List<String>> getResultSet() {
    return resultSetIterator; 
  }
  
  private int fill(byte[] bytes, InputStream in) throws IOException {
    int off = 0;
    int readCount = 0;
    do {
      // read() returns at least one byte if ( bytes.length - off ) > 0 and not EOF, and -1 at EOF
      readCount = in.read(bytes, off, bytes.length - off);
      if ( readCount > 0 ) {  // do not add the -1 EOF marker to the offset
        off += readCount;
        totalBytesRead += readCount;
      }
    } while ( readCount > 0 && off < bytes.length );
    if ( off > 0 && off < bytes.length ) { // end of stream is a correct termination
      try {
        readCount = in.read(bytes, off, bytes.length - off);
        String exception = readException(in);
        throw new RuntimeException(exception);
      } catch ( EOFException e ) {
        // There was no exception written by the server, just a premature end of the stream causing the client-side EOF.
        throw new RuntimeException("Premature end of stream, total bytes read: " + totalBytesRead); 
      }
    }
    return off;
  }

  private static void checkException(int len, InputStream in) throws 
ClassNotFoundException, IOException {
    if ( len == PREMATURE_END_OF_RESULT ) {
      String exception = readException(in);
      throw new RuntimeException(exception);
    }
  }

  private static String readException(InputStream in) throws IOException {
    ObjectInputStream objIn = new ObjectInputStream(in);
    try {
      Object object = objIn.readObject();
      if ( object != null ) {
        return object.toString();
      }
    } catch ( ClassNotFoundException e ) {
      throw new RuntimeException("Could not read exception.", e);
    }
    return "No exception received from service after premature end of result";
  }

  private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
    assert ( aResults != null );
    final PipedOutputStream out = new PipedOutputStream();
    DataHandler dh = new DataHandler(new StreamDataSource(out, 
MIME_TYPE_OCTET_STREAM));
    Encoder encoder = new Encoder(out, aResults, new ServerExceptionHandler());
    new Thread(encoder).start();
    return dh;
  }
  
  private void decode(
      DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet, 
ExceptionHandler exceptionHandler) 
      throws InterruptedException {
    Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
    new Thread(decoder).start();
  }
  
  private void awaitIteratorBufferNotFull() throws InterruptedException {
    while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
  }

  private void awaitElements() throws InterruptedException {
    while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
  }

  private static final class StreamDataSource implements DataSource {
    private final String name = UUID.randomUUID().toString();
    private final InputStream in;
    private final String mimeType;

    private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
      ArgumentChecks.checkNotNull(aOut, "aOut");
      ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
      try {
        in = new PipedInputStream(aOut);
      } catch ( IOException e ) {
        throw new RuntimeException("Could not create input stream.", e);
      }
      mimeType = aMimeType;
    }

    @Override public String getName() { return name; }

    @Override public String getContentType() { return mimeType; }

    /**
     * {@inheritDoc}
     * 
     * This implementation violates the specification in that it is 
destructive. Only the first call will return an
     * appropriate input stream.
     */
    @Override public InputStream getInputStream() { return in; }

    @Override public OutputStream getOutputStream() { throw new 
UnsupportedOperationException(); }
  }
  
  /**
   * Decodes the contents of an input stream as written by the {@link 
com.tn_ag.sap.QueryResponse.Encoder} and writes
   * parsed rows to a {@link java.util.Queue}.
   */
  private class Decoder implements Runnable {
    private final DataHandler dataHandler;
    private final BlockingQueue<List<String>> resultSet;
    private final ExceptionHandler exceptionHandler;
    private final byte[] lenBytes = new byte[4];

    Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet, 
ExceptionHandler aHandler) {
      ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
      ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
      ArgumentChecks.checkNotNull(aHandler, "aHandler");
      dataHandler = aDataHandler;
      resultSet = aResultSet;
      exceptionHandler = aHandler;
    }
    
    @Override
    public void run() {
      InputStream in = null;
      List<String> row;
      int len;
      try { 
        in = dataHandler.getInputStream();
        
        while ( receiving ) {
          synchronized ( resultSet ) {
            receiving = fill(lenBytes, in) > 0;       // read next row's length in number of columns
            len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length to integer
            if ( !receiving || len == 0 ) { break; }
            checkException(len, in);                  // -1 signals an exception

            row = readRow(in, len);
            awaitIteratorBufferNotFull();
            resultSet.put(row);
            rowsRead++;
            if ( rowsRead % 1000 == 0 ) {
              LOG.debug("already received {} rows", rowsRead);
            }
            resultSet.notifyAll();                    // notify waiting consumer threads
          }
        }
        stopReception();
        LOG.debug("received a total of {} rows.", rowsRead);
      } catch ( InterruptedException e ) {
        LOG.info("Result reception interrupted.");
      } catch ( Exception e ) {
        exceptionHandler.handle(e);
        stopReception();
      } finally {
        receiving = false;
        try {
          if ( in != null ) { in.close(); }
        } catch ( IOException e ) {
          exceptionHandler.handle(e);
        }
      }
    }

    private List<String> readRow(InputStream in, int len) throws IOException {
      List<String> row = new ArrayList<>(len);           // create list of appropriate length
      for ( int col = 0; col < len; col++ ) {            // for each column
        fill(lenBytes, in);                              // read the value length (fixed for some types if schema known)
        int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the length of the value as bytes to an int
        final byte[] bytes = new byte[valLen];           // allocate a buffer of exactly the required size
        fill(bytes, in);              
        row.add(STRINGS.internalize(new String(bytes)));
      }
      return row;
    }

    private void stopReception() {
      synchronized ( resultSet ) {
        receiving = false;           // we will stop parsing now
        resultSet.notifyAll();       // consumers can now consume the remaining elements from the queue
        LOG.debug("Read " + rowsRead + " rows from binary stream");
      }                
    }
  }
  
  /**
   * Encodes the given result set and writes the result to an output stream. 
   */
  private class Encoder implements Runnable {
    private final OutputStream out;
    private final AutoCloseableIterator<List<String>> iterator;
    private final ExceptionHandler handler;

    Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults, 
ExceptionHandler aHandler) {
      ArgumentChecks.checkNotNull(aOut, "aOut");
      ArgumentChecks.checkNotNull(aResults, "aResults");
      ArgumentChecks.checkNotNull(aHandler, "aHandler");
      out = aOut;
      iterator = aResults;
      handler = aHandler;
    }

    @Override
    public void run() {
      try ( AutoCloseableIterator<List<String>> iter = this.iterator; 
OutputStream out = this.out ) {
        writeResultSet(iter, out);
        out.flush();
      } catch ( Exception e ) {
        handler.handle(e);
      }
    }

    private void writeResultSet(AutoCloseableIterator<List<String>> iter, 
OutputStream out2) throws IOException {
      List<String> row = null;
      while ( this.iterator.hasNext() ) {
        try {
          row = this.iterator.next();
          writeRow(row);
          rowsWritten++;
        } catch ( Exception e ) {
          out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an exception instead of row size
          writeException(out, e);                   // send exception to client for rethrowing it there
          throw e;                                  // stop transmission, handle exception on server-side (logging)
        }
      }
      LOG.info("wrote {} rows to binary stream, closing output stream", 
rowsWritten);
    }

    private void writeRow(List<String> row) throws IOException {
      out.write(ByteBuffer.allocate(4).putInt(row.size()).array());     // write row size (in number of columns)
      for ( String s : row ) {
        if ( s == null ) { s = ""; }
        byte[] bytes = s.getBytes();
        out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // write size of column in bytes
        out.write(bytes);                                               // write column value
      }
    }

    private void writeException(OutputStream output, Exception e) throws 
IOException {
      ObjectOutputStream objOut = new ObjectOutputStream(output);
      objOut.writeObject(toString(e));
    }

    private String toString(Throwable e) {
      if ( e instanceof UncheckedConnectionException && e.getCause() != null ) {
        e = e.getCause();
      }
      ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream();
      PrintWriter writer = new PrintWriter(stackTraceOut);
      e.printStackTrace(writer);
      writer.flush();
      return new String(stackTraceOut.toByteArray());
    }
  }
  
  private final class ResultSetIterator<T> extends ExceptionHandler implements 
Iterator<List<T>> {
    private final BlockingQueue<List<T>> queue;
    private Exception exception;
    private int returned = 0;

    private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
      queue = aQueue;
    }

    /**
     * {@inheritDoc}
     * 
     * This implementation marks the current thread as interrupted if streaming 
could not be commenced. 
     */
    @Override
    public boolean hasNext() {
      if ( exception != null ) { throw new UncheckedConnectionException("Exception while reading results", exception); }
      try {
        synchronized ( resultSet ) {
          awaitElements();
          if ( exception != null ) { 
            throw new UncheckedConnectionException("Exception while reading results", exception); 
          }
          if ( resultSet.isEmpty() ) {
            LOG.debug("iterator returned " + returned + " rows");
          }
          return !resultSet.isEmpty();
        }
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        return false;
      }
    
    }

    /**
     * {@inheritDoc}
     * 
     * If the current thread is interrupted during this call, it is marked as 
interrupted and the method returns 
     * {@code null}.
     * 
     * @throws NoSuchElementException if called while {@link #hasNext()} 
returns {@code false}
     */
    @Override
    public List<T> next() throws NoSuchElementException {
      if ( exception != null ) { throw new IllegalStateException("Exception while reading results", exception); }
      if ( !hasNext() ) { throw new NoSuchElementException(); }
      try {
        List<T> result = queue.take();
        synchronized ( resultSet ) { resultSet.notify(); }
        returned++;
        return result;
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        return null;
      }
    }

    /**
     * This method is not supported.
     */
    @Override public void remove() { throw new 
UnsupportedOperationException("RTFM"); }

    @Override void handle(Exception aException) {
      if ( exception != null ) { 
        LOG.warn("There was another exception.", aException);
      }
      exception = aException;
    }
  }
  
  private abstract static class ExceptionHandler {
    abstract void handle(Exception exception);
  }
  
  private static class ServerExceptionHandler extends ExceptionHandler {
    @Override
    void handle(Exception aException) {
      LOG.error(FATAL, "Exception while writing response.", aException);
    }
  }
}
{code}
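
The decoder and the iterator in the class above coordinate through `synchronized`, `wait()`/`notifyAll()` and the `receiving` flag on top of an `ArrayBlockingQueue`. For comparison, here is a sketch of the same hand-off using only the queue's own blocking operations and an end-of-results marker. `SentinelQueueDemo` and `END_OF_RESULTS` are invented names; this only illustrates an alternative design and is not a diagnosis of the corrupted bytes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Hypothetical producer/consumer hand-off that relies only on the blocking queue. */
final class SentinelQueueDemo {
  /** Marker row signalling the end of the result set; compared by identity, never by content. */
  static final List<String> END_OF_RESULTS = Collections.unmodifiableList(new ArrayList<String>());

  private SentinelQueueDemo() { }

  /** Producer: puts every row into the bounded queue, then the end marker. */
  static Thread startProducer(final List<List<String>> rows, final BlockingQueue<List<String>> queue) {
    Thread producer = new Thread(() -> {
      try {
        for ( List<String> row : rows ) { queue.put(row); }  // blocks while the queue is full
        queue.put(END_OF_RESULTS);
      } catch ( InterruptedException e ) {
        // simplification: an interrupted producer never sends the marker
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    return producer;
  }

  /** Consumer: takes rows until the end marker arrives. */
  static List<List<String>> drain(BlockingQueue<List<String>> queue) throws InterruptedException {
    List<List<String>> received = new ArrayList<>();
    for ( List<String> row = queue.take(); row != END_OF_RESULTS; row = queue.take() ) {
      received.add(row);
    }
    return received;
  }
}
```

Because `put` and `take` already block on a full or empty queue, no additional monitor or flag is needed in this variant. A real implementation would also have to deliver the marker (or an error) when the producer fails.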





  was:
We have a SOAP web service that transmits large structured results sets (a list 
of rows each of which is a List<String>) using MTOM. We have now migrated to 
CXF due to some other bug in jaxws-spring. This (part of the) code seemed to 
work fine in jaxws-spring but we had strange problems there which we debugged 
to the lower layers as well. Now we have the situation that some single bytes 
as received by the client are not the bytes that were sent by the server. This 
can show as a byte having a completely wrong value or as apparently a byte 
added *and* the byte beside having the wrong value.

There are test cases that return result sets of several thousand rows each of 
which has several columns in it. The result set is "streamed" via MTOM. These 
tests fail sporadically every 10th or 20th time.
This is what I found out until now:
* The error is reproducible on different machines.
* I first saw this error with CXF 3.1.1. Then I upgraded to 3.1.5 and it still 
occurs. As said before there were similar bugs with jaxws-spring which showed 
different failures.
* The server cleanly writes all results into the output and flushes the output 
stream before closing it
* Both, client (a JUnit Test) and server (a Jetty server started by that test) 
are running on the same machine, actually in the same VM. 
* If I enable some of the logging in the below class (QueryResponse), this 
seems to reduce the probability of the error. I was not able to reproduce the 
error with the logging enabled. This is very frustrating and I think it hints 
at some race condition.

Our streaming result encodes rows like this:
* 4 bytes stating the number of columns in that row as an int (trailing nulls 
are cut off)
* for each column: 4 bytes stating the number of bytes in that column's value 
as an int
* the said number of bytes, will be converted to a string

*The test*
* If the test runs fine, the client receives 2077 containing 15 columns each. 
* Each column is a double or a string, it's basically the result of a simple 
SQL query. (Of course, if it actually were SQL we would use JDBC. It's actually 
a SAP data warehouse.)
* If the test fails, which it only does sporadically, it is always the 14th row 
failing.

Example of failure: 
When it is correctly received at the client side, then this is the bytes of row 
14 of test 2. 
* first 4 bytes: we have 15 columns
* next 4 bytes: the first column has 2 bytes
* next 2 bytes: "BA"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
{code}

This is the erroneous bytes as received on the client side
* first 4 bytes: we have 15 columns
* next 4 bytes: the first columns has 2 bytes
* next 2 bytes: "BM"
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
{code}

At first glance, it looks like a byte was added (because of the three 
consecutive 0s). So I thought it may be an off-by-one error. But when looking 
at the 2's complement I see:
{code}
good: 65:     01000001
bad:  77, 71: 01001101 0100111
{code}
I can't see where a byte would have been inserted. It also doesn't look like 
only bits were flipped. It rather looks like total bogus to me.

Another example of failure is this:
* The bytes sent
{code}
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51, 
0, 0, 0, 2, ...]
{code}
* The bytes received
{code}
                                                                          XX
[0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48, 
0, 0, 0, 2,...]
{code}

Similar but different problems first occurred when we upgraded from Java 7 to 
Java 8 a year ago. Before, everything was fine. At that time we were using 
jaxws-spring and had the situation that sometimes the mime-boundary at the end 
of the attachment was missing. We hoped that an upgrade to CXF would fix the 
problem but now we even have issues with the data.
To me it looks like something is totally broken in the JRE but this only shows 
up when using MTOM, so it may be some integration problem. I wonder whether we 
are the only ones seeing this behavior.

This is my "streaming result set" class:

{code}
@XmlAccessorType ( XmlAccessType.NONE )
@XmlRootElement ( name = "streamResponse" )
@XmlType ( name = "streamResponseType" )
public class QueryResponse {
  private static final String MIME_TYPE_OCTET_STREAM = 
"application/octet-stream";
  private static final Logger LOG = 
LoggerFactory.getLogger(QueryResponse.class);
  private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
  private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
  private static final int BUFFER_SIZE = 100;
  private static final int PREMATURE_END_OF_RESULT = -1;
  private static final byte[] PREMATURE_END_OF_RESULT_BYTES = 
      ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();
  private volatile int totalBytesRead = 0;
  private volatile int rowsRead = 0;
  private volatile int rowsWritten = 0;

  private DataHandler results;
  private BlockingQueue<List<String>> resultSet;
  private ResultSetIterator<String> resultSetIterator;
  private boolean receiving = true;

  /** Create a new response. This constructor is used by for unmarshalling the 
response. */
  QueryResponse() { }
  
  /**
   * Create a new result set encapsulating the given components.
   * @param aResults the result iterator
   */
  QueryResponse(AutoCloseableIterator<List<String>> aResults) {
    results = encode(aResults);
  }
  
  @XmlElement ( required = true )
  @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
  DataHandler getResults() {
    return results;
  }
  
  /**
   * Set the result set, called by JAXB.
   * @param aDataHandler the data handler
   */
  void setResults(DataHandler aDataHandler) {
    if ( aDataHandler == null ) { throw new 
NullPointerException("aDataHandler"); }
    if ( resultSet != null ) { throw new IllegalStateException("Result set 
already exists."); }
    results = aDataHandler;

    // Pipelining
    /* parse results and fill queue while loading from the network */
    resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
    resultSetIterator = new ResultSetIterator<String>(resultSet);
    DataHandler dataHandler = results;
    try {
      decode(dataHandler, resultSet, resultSetIterator);
    } catch ( InterruptedException e ) {
      Thread.currentThread().interrupt();
    }
  }
  
  /**
   * Called on the client side to get a streaming and blocking iterator.
   * @return the result set as a blocking iterator
   */
  public Iterator<List<String>> getResultSet() {
    return resultSetIterator; 
  }
  
  private int fill(byte[] bytes, InputStream in) throws IOException {
    int off = 0;
    int readCount = 0;
    do {
      // reads at least one byte if ( readCount > 0 ) if ( bytes.length - off ) 
> 0 and not EOF
      readCount = in.read(bytes, off, bytes.length - off);  
      off += readCount;
      if ( readCount > 0 ) {
        totalBytesRead += readCount;
      }
    } while ( readCount > 0 && off < bytes.length );
    if ( off > 0 && off < bytes.length ) { // end of stream is a correct 
termination
      try {
        readCount = in.read(bytes, off, bytes.length - off);
        String exception = readException(in);
        throw new RuntimeException(exception);
      } catch ( EOFException e ) {
        // There was no exception written by the server, just a premature end 
of the stream causing the client-side EOF.
        throw new RuntimeException("Premature end of stream, total bytes read: 
" + totalBytesRead); 
      }
    }
    return off;
  }

  private static void checkException(int len, InputStream in) throws 
ClassNotFoundException, IOException {
    if ( len == PREMATURE_END_OF_RESULT ) {
      String exception = readException(in);
      throw new RuntimeException(exception);
    }
  }

  private static String readException(InputStream in) throws IOException {
    ObjectInputStream objIn = new ObjectInputStream(in);
    try {
      Object object = objIn.readObject();
      if ( object != null ) {
        return object.toString();
      }
    } catch ( ClassNotFoundException e ) {
      throw new RuntimeException("Could not read exception.", e);
    }
    return "No exception received from service after premature end of result";
  }

  private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
    assert ( aResults != null );
    final PipedOutputStream out = new PipedOutputStream();
    DataHandler dh = new DataHandler(new StreamDataSource(out, 
MIME_TYPE_OCTET_STREAM));
    Encoder encoder = new Encoder(out, aResults, new ServerExceptionHandler());
    new Thread(encoder).start();
    return dh;
  }
  
  private void decode(
      DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet, 
ExceptionHandler exceptionHandler) 
      throws InterruptedException {
    Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
    new Thread(decoder).start();
  }
  
  private void awaitIteratorBufferNotFull() throws InterruptedException {
    while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
  }

  private void awaitElements() throws InterruptedException {
    while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
  }

  private static final class StreamDataSource implements DataSource {
    private final String name = UUID.randomUUID().toString();
    private final InputStream in;
    private final String mimeType;

    private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
      ArgumentChecks.checkNotNull(aOut, "aOut");
      ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
      try {
        in = new PipedInputStream(aOut);
      } catch ( IOException e ) {
        throw new RuntimeException("Could not create input stream.", e);
      }
      mimeType = aMimeType;
    }

    @Override public String getName() { return name; }

    @Override public String getContentType() { return mimeType; }

    /**
     * {@inheritDoc}
     * 
     * This implementation violates the specification in that it is 
destructive. Only the first call will return an
     * appropriate input stream.
     */
    @Override public InputStream getInputStream() { return in; }

    @Override public OutputStream getOutputStream() { throw new 
UnsupportedOperationException(); }
  }
  
  /**
   * Decodes the contents of an input stream as written by the {@link 
com.tn_ag.sap.QueryResponse.Encoder} and writes
   * parsed rows to a {@link java.util.Queue}.
   */
  private class Decoder implements Runnable {
    private final DataHandler dataHandler;
    private final BlockingQueue<List<String>> resultSet;
    private final ExceptionHandler exceptionHandler;
    private final byte[] lenBytes = new byte[4];

    Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet, 
ExceptionHandler aHandler) {
      ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
      ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
      ArgumentChecks.checkNotNull(aHandler, "aHandler");
      dataHandler = aDataHandler;
      resultSet = aResultSet;
      exceptionHandler = aHandler;
    }
    
    @Override
    public void run() {
      InputStream in = null;
      List<String> row;
      int len;
      try { 
        in = dataHandler.getInputStream();
        
        while ( receiving ) {
          synchronized ( resultSet ) {
            receiving = fill(lenBytes, in) > 0;       // read next row's length 
in number of columns 
            len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length to 
integer
            if ( !receiving || len == 0 ) { break; }
            checkException(len, in);                  // -1 signals an exception

            row = readRow(in, len);
            awaitIteratorBufferNotFull();
            resultSet.put(row);
            rowsRead++;
            if ( rowsRead % 1000 == 0 ) {
              LOG.debug("already received {} rows", rowsRead);
            }
            resultSet.notifyAll();                    // notify waiting 
consumer threads
          }
        }
        stopReception();
        LOG.debug("received a total of {} rows.", rowsRead);
      } catch ( InterruptedException e ) {
        LOG.info("Result reception interrupted.");
      } catch ( Exception e ) {
        exceptionHandler.handle(e);
        stopReception();
      } finally {
        receiving = false;
        try {
          if ( in != null ) { in.close(); }
        } catch ( IOException e ) {
          exceptionHandler.handle(e);
        }
      }
    }

    private List<String> readRow(InputStream in, int len) throws IOException {
      List<String> row = new ArrayList<>(len);           // create list of appropriate length
      for ( int col = 0; col < len; col++ ) {            // for each column
        fill(lenBytes, in);                              // read the value length (fixed for some types if schema known)
        int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the length of the value as bytes to an int
        final byte[] bytes = new byte[valLen];           // allocate a buffer of exactly the required size
        fill(bytes, in);              
        row.add(STRINGS.internalize(new String(bytes)));
      }
      return row;
    }

    private void stopReception() {
      synchronized ( resultSet ) {
        receiving = false;           // we will stop parsing now
        resultSet.notifyAll();       // consumers can now consume the remaining elements from the queue
        LOG.debug("Read " + rowsRead + " rows from binary stream");
      }                
    }
  }
  
  /**
   * Encodes the given result set and writes the result to an output stream. 
   */
  private class Encoder implements Runnable {
    private final OutputStream out;
    private final AutoCloseableIterator<List<String>> iterator;
    private final ExceptionHandler handler;

    Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults, ExceptionHandler aHandler) {
      ArgumentChecks.checkNotNull(aOut, "aOut");
      ArgumentChecks.checkNotNull(aResults, "aResults");
      ArgumentChecks.checkNotNull(aHandler, "aHandler");
      out = aOut;
      iterator = aResults;
      handler = aHandler;
    }

    @Override
    public void run() {
      try ( AutoCloseableIterator<List<String>> iter = this.iterator; OutputStream out = this.out ) {
        writeResultSet(iter, out);
        out.flush();
      } catch ( Exception e ) {
        handler.handle(e);
      }
    }

    private void writeResultSet(AutoCloseableIterator<List<String>> iter, OutputStream out2) throws IOException {
      List<String> row = null;
      while ( this.iterator.hasNext() ) {
        try {
          row = this.iterator.next();
          writeRow(row);
          rowsWritten++;
        } catch ( Exception e ) {
          out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an exception instead of row size
          writeException(out, e);                   // send exception to client for rethrowing it there
          throw e;                                  // stop transmission, handle exception on server-side (logging)
        }
      }
      LOG.info("wrote {} rows to binary stream, closing output stream", rowsWritten);
    }

    private void writeRow(List<String> row) throws IOException {
      out.write(ByteBuffer.allocate(4).putInt(row.size()).array());     // write row size (in number of columns)
      for ( String s : row ) {
        if ( s == null ) { s = ""; }
        byte[] bytes = s.getBytes();
        out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // write size of column in bytes
        out.write(bytes);                                               // write column value
      }
    }

    private void writeException(OutputStream output, Exception e) throws IOException {
      ObjectOutputStream objOut = new ObjectOutputStream(output);
      objOut.writeObject(toString(e));
    }

    private String toString(Throwable e) {
      if ( e instanceof UncheckedConnectionException && e.getCause() != null ) {
        e = e.getCause();
      }
      ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream();
      PrintWriter writer = new PrintWriter(stackTraceOut);
      e.printStackTrace(writer);
      writer.flush();
      return new String(stackTraceOut.toByteArray());
    }
  }
  
  private final class ResultSetIterator<T> extends ExceptionHandler implements Iterator<List<T>> {
    private final BlockingQueue<List<T>> queue;
    private Exception exception;
    private int returned = 0;

    private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
      queue = aQueue;
    }

    /**
     * {@inheritDoc}
     * 
     * This implementation marks the current thread as interrupted if streaming could not be commenced.
     */
    @Override
    public boolean hasNext() {
      if ( exception != null ) { throw new UncheckedConnectionException("Exception while reading results", exception); }
      try {
        synchronized ( resultSet ) {
          awaitElements();
          if ( exception != null ) { 
            throw new UncheckedConnectionException("Exception while reading results", exception); 
          }
          if ( resultSet.isEmpty() ) {
            LOG.debug("iterator returned " + returned + " rows");
          }
          return !resultSet.isEmpty();
        }
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        return false;
      }
    
    }

    /**
     * {@inheritDoc}
     * 
     * If the current thread is interrupted during this call, it is marked as interrupted and the method returns 
     * {@code null}.
     * 
     * @throws NoSuchElementException if called while {@link #hasNext()} returns {@code false}
     */
    @Override
    public List<T> next() throws NoSuchElementException {
      if ( exception != null ) { throw new IllegalStateException("Exception while reading results", exception); }
      if ( !hasNext() ) { throw new NoSuchElementException(); }
      try {
        List<T> result = queue.take();
        synchronized ( resultSet ) { resultSet.notify(); }
        returned++;
        return result;
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
        return null;
      }
    }

    /**
     * This method is not supported.
     */
    @Override public void remove() { throw new UnsupportedOperationException("RTFM"); }

    @Override void handle(Exception aException) {
      if ( exception != null ) { 
        LOG.warn("There was another exception.", aException);
      }
      exception = aException;
    }
  }
  
  private abstract static class ExceptionHandler {
    abstract void handle(Exception exception);
  }
  
  private static class ServerExceptionHandler extends ExceptionHandler {
    @Override
    void handle(Exception aException) {
      LOG.error(FATAL, "Exception while writing response.", aException);
    }
  }
}
{code}
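
For reference, the row encoding listed at the top of the description (a 4-byte column count, then a 4-byte length plus the value bytes for each column) can be exercised in isolation, without MTOM or threads. The following is only a minimal sketch under stated assumptions: RowCodecSketch, encodeRow and decodeRow are hypothetical names that are not part of QueryResponse, and UTF-8 is assumed as the charset, whereas the code above relies on the platform default. It uses DataOutputStream.writeInt and DataInputStream.readFully from the JDK, which handle big-endian ints just like ByteBuffer in its default byte order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of QueryResponse): round-trips one row in the described format. */
final class RowCodecSketch {

  /** Writes a 4-byte column count, then for each column a 4-byte length followed by the value bytes. */
  static byte[] encodeRow(List<String> row) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    out.writeInt(row.size());                       // number of columns, big-endian
    for (String s : row) {
      // Assumption: a fixed charset; null is sent as the empty string, as in writeRow above.
      byte[] bytes = (s == null ? "" : s).getBytes(StandardCharsets.UTF_8);
      out.writeInt(bytes.length);                   // length of the value in bytes
      out.write(bytes);                             // the value itself
    }
    out.flush();
    return buffer.toByteArray();
  }

  /** Reads one row back; readFully either fills the whole buffer or throws EOFException. */
  static List<String> decodeRow(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    int columns = in.readInt();
    List<String> row = new ArrayList<>(columns);
    for (int col = 0; col < columns; col++) {
      byte[] bytes = new byte[in.readInt()];
      in.readFully(bytes);
      row.add(new String(bytes, StandardCharsets.UTF_8));
    }
    return row;
  }

  public static void main(String[] args) throws IOException {
    byte[] encoded = encodeRow(Arrays.asList("BA", "1", "0033"));
    System.out.println(Arrays.toString(encoded));
    System.out.println(decodeRow(encoded));
  }
}
```

If such a round trip stays stable over many runs, that would suggest (but not prove) that the corruption arises in the transport or in the threading around it rather than in the format itself.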




> MTOM client receives bogus
> --------------------------
>
>                 Key: CXF-6776
>                 URL: https://issues.apache.org/jira/browse/CXF-6776
>             Project: CXF
>          Issue Type: Bug
>    Affects Versions: 3.1.1, 3.1.5
>         Environment: Windows 7 Professional 64 bit, java 1.8.0_45
>            Reporter: avidd
>
> We have a SOAP web service that transmits large structured result sets (a 
> list of rows, each of which is a List<String>) using MTOM. We have now 
> migrated to CXF because of another bug in jaxws-spring. This (part of the) 
> code seemed to work fine in jaxws-spring, but we had strange problems there 
> that we also traced down to the lower layers. Now some single bytes received 
> by the client are not the bytes that were sent by the server. This shows up 
> either as a byte with a completely wrong value or as an apparently inserted 
> byte *and* a wrong value in the neighboring byte.
> There are test cases that return result sets of several thousand rows, each 
> of which has several columns. The result set is "streamed" via MTOM. These 
> tests fail sporadically, roughly every 10th to 20th run.
> This is what I have found out so far:
> * The error is reproducible on different machines.
> * I first saw this error with CXF 3.1.1. Then I upgraded to 3.1.5 and it 
> still occurs. As said before there were similar bugs with jaxws-spring which 
> showed different failures.
> * The server cleanly writes all results into the output and flushes the 
> output stream before closing it.
> * Both client (a JUnit test) and server (a Jetty server started by that 
> test) are running on the same machine, actually in the same VM. 
> * If I enable some of the logging in the class below (QueryResponse), the 
> error seems to become less likely. I was not able to reproduce the error 
> with the logging enabled. This is very frustrating, and I think it hints at 
> a race condition.
> Our streaming result encodes rows like this:
> * 4 bytes stating the number of columns in that row as an int (trailing nulls 
> are cut off)
> * for each column: 4 bytes stating the number of bytes in that column's value 
> as an int
> * that number of bytes, which are converted to a string
> *The test*
> * If the test runs fine, the client receives 2077 rows containing 15 columns 
> each. 
> * Each column is a double or a string; it is basically the result of a simple 
> SQL query. (Of course, if it actually were SQL we would use JDBC. It's 
> actually a SAP data warehouse.)
> * If the test fails, which it only does sporadically, it is always the 14th 
> row that fails.
> Example of failure: 
> When row 14 of test 2 is received correctly on the client side, these are 
> its bytes: 
> * first 4 bytes: we have 15 columns
> * next 4 bytes: the first column has 2 bytes
> * next 2 bytes: "BA"
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, ...]
> {code}
> These are the erroneous bytes as received on the client side:
> * first 4 bytes: we have 15 columns
> * next 4 bytes: the first column has 2 bytes
> * next 2 bytes: "BM"
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 77, 71, 0, 0, 0, ...]
> {code}
> At first glance, it looks like a byte was added (because of the three 
> consecutive 0s). So I thought it might be an off-by-one error. But when 
> looking at the binary representation I see:
> {code}
> good: 65:     01000001
> bad:  77, 71: 01001101 01000111
> {code}
> I can't see where a byte would have been inserted. It also doesn't look like 
> only bits were flipped. It looks more like complete garbage to me.
> Another example of failure is this:
> * The bytes sent
> {code}
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51, 
> 0, 0, 0, 2, ...]
> {code}
> * The bytes received
> {code}
>                                                                           XX
> [0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48, 
> 0, 0, 0, 2,...]
> {code}
> Similar but different problems first occurred when we upgraded from Java 7 to 
> Java 8 a year ago. Before, everything was fine. At that time we were using 
> jaxws-spring and had the situation that sometimes the mime-boundary at the 
> end of the attachment was missing. We hoped that an upgrade to CXF would fix 
> the problem but now we even have issues with the data.
> To me it looks like something is totally broken in the JRE but this only 
> shows up when using MTOM, so it may be some integration problem. I wonder 
> whether we are the only ones seeing this behavior.
> This is my "streaming result set" class:
> {code}
> @XmlAccessorType ( XmlAccessType.NONE )
> @XmlRootElement ( name = "streamResponse" )
> @XmlType ( name = "streamResponseType" )
> public class QueryResponse {
>   private static final String MIME_TYPE_OCTET_STREAM = "application/octet-stream";
>   private static final Logger LOG = LoggerFactory.getLogger(QueryResponse.class);
>   private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
>   private static final ObjectPool<String> STRINGS = ObjectPool.stringPool();
>   private static final int BUFFER_SIZE = 100;
>   private static final int PREMATURE_END_OF_RESULT = -1;
>   private static final byte[] PREMATURE_END_OF_RESULT_BYTES = 
>       ByteBuffer.allocate(4).putInt(PREMATURE_END_OF_RESULT).array();
>   private volatile int totalBytesRead = 0;
>   private volatile int rowsRead = 0;
>   private volatile int rowsWritten = 0;
>   private DataHandler results;
>   private BlockingQueue<List<String>> resultSet;
>   private ResultSetIterator<String> resultSetIterator;
>   private boolean receiving = true;
>   /** Create a new response. This constructor is used for unmarshalling the response. */
>   QueryResponse() { }
>   
>   /**
>    * Create a new result set encapsulating the given components.
>    * @param aResults the result iterator
>    */
>   QueryResponse(AutoCloseableIterator<List<String>> aResults) {
>     results = encode(aResults);
>   }
>   
>   @XmlElement ( required = true )
>   @XmlMimeType ( MIME_TYPE_OCTET_STREAM )
>   DataHandler getResults() {
>     return results;
>   }
>   
>   /**
>    * Set the result set, called by JAXB.
>    * @param aDataHandler the data handler
>    */
>   void setResults(DataHandler aDataHandler) {
>     if ( aDataHandler == null ) { throw new NullPointerException("aDataHandler"); }
>     if ( resultSet != null ) { throw new IllegalStateException("Result set already exists."); }
>     results = aDataHandler;
>     // Pipelining
>     /* parse results and fill queue while loading from the network */
>     resultSet = new ArrayBlockingQueue<List<String>>(BUFFER_SIZE);
>     resultSetIterator = new ResultSetIterator<String>(resultSet);
>     DataHandler dataHandler = results;
>     try {
>       decode(dataHandler, resultSet, resultSetIterator);
>     } catch ( InterruptedException e ) {
>       Thread.currentThread().interrupt();
>     }
>   }
>   
>   /**
>    * Called on the client side to get a streaming and blocking iterator.
>    * @return the result set as a blocking iterator
>    */
>   public Iterator<List<String>> getResultSet() {
>     return resultSetIterator; 
>   }
>   
>   private int fill(byte[] bytes, InputStream in) throws IOException {
>     int off = 0;
>     int readCount = 0;
>     do {
>       // reads at least one byte if ( readCount > 0 ) if ( bytes.length - off ) > 0 and not EOF
>       readCount = in.read(bytes, off, bytes.length - off);  
>       off += readCount;
>       if ( readCount > 0 ) {
>         totalBytesRead += readCount;
>       }
>     } while ( readCount > 0 && off < bytes.length );
>     if ( off > 0 && off < bytes.length ) { // end of stream is a correct termination
>       try {
>         readCount = in.read(bytes, off, bytes.length - off);
>         String exception = readException(in);
>         throw new RuntimeException(exception);
>       } catch ( EOFException e ) {
>         // There was no exception written by the server, just a premature end of the stream causing the client-side EOF.
>         throw new RuntimeException("Premature end of stream, total bytes read: " + totalBytesRead); 
>       }
>     }
>     return off;
>   }
>   private static void checkException(int len, InputStream in) throws ClassNotFoundException, IOException {
>     if ( len == PREMATURE_END_OF_RESULT ) {
>       String exception = readException(in);
>       throw new RuntimeException(exception);
>     }
>   }
>   private static String readException(InputStream in) throws IOException {
>     ObjectInputStream objIn = new ObjectInputStream(in);
>     try {
>       Object object = objIn.readObject();
>       if ( object != null ) {
>         return object.toString();
>       }
>     } catch ( ClassNotFoundException e ) {
>       throw new RuntimeException("Could not read exception.", e);
>     }
>     return "No exception received from service after premature end of result";
>   }
>   private DataHandler encode(AutoCloseableIterator<List<String>> aResults) {
>     assert ( aResults != null );
>     final PipedOutputStream out = new PipedOutputStream();
>     DataHandler dh = new DataHandler(new StreamDataSource(out, MIME_TYPE_OCTET_STREAM));
>     Encoder encoder = new Encoder(out, aResults, new ServerExceptionHandler());
>     new Thread(encoder).start();
>     return dh;
>   }
>   
>   private void decode(
>       DataHandler dataHandler, final BlockingQueue<List<String>> aResultSet, ExceptionHandler exceptionHandler) 
>       throws InterruptedException {
>     Decoder decoder = new Decoder(dataHandler, aResultSet, exceptionHandler);
>     new Thread(decoder).start();
>   }
>   
>   private void awaitIteratorBufferNotFull() throws InterruptedException {
>     while ( resultSet.remainingCapacity() == 0 ) { resultSet.wait(); }
>   }
>   private void awaitElements() throws InterruptedException {
>     while ( receiving && resultSet.isEmpty() ) { resultSet.wait(); }
>   }
>   private static final class StreamDataSource implements DataSource {
>     private final String name = UUID.randomUUID().toString();
>     private final InputStream in;
>     private final String mimeType;
>     private StreamDataSource(PipedOutputStream aOut, String aMimeType) {
>       ArgumentChecks.checkNotNull(aOut, "aOut");
>       ArgumentChecks.checkNotNull(aMimeType, "aMimeType");
>       try {
>         in = new PipedInputStream(aOut);
>       } catch ( IOException e ) {
>         throw new RuntimeException("Could not create input stream.", e);
>       }
>       mimeType = aMimeType;
>     }
>     @Override public String getName() { return name; }
>     @Override public String getContentType() { return mimeType; }
>     /**
>      * {@inheritDoc}
>      * 
>      * This implementation violates the specification in that it is destructive. Only the first call will return an
>      * appropriate input stream.
>      */
>     @Override public InputStream getInputStream() { return in; }
>     @Override public OutputStream getOutputStream() { throw new UnsupportedOperationException(); }
>   }
>   
>   /**
>    * Decodes the contents of an input stream as written by the {@link com.tn_ag.sap.QueryResponse.Encoder} and writes
>    * parsed rows to a {@link java.util.Queue}.
>    */
>   private class Decoder implements Runnable {
>     private final DataHandler dataHandler;
>     private final BlockingQueue<List<String>> resultSet;
>     private final ExceptionHandler exceptionHandler;
>     private final byte[] lenBytes = new byte[4];
>     Decoder(DataHandler aDataHandler, BlockingQueue<List<String>> aResultSet, ExceptionHandler aHandler) {
>       ArgumentChecks.checkNotNull(aDataHandler, "aDataHandler");
>       ArgumentChecks.checkNotNull(aResultSet, "aResultSet");
>       ArgumentChecks.checkNotNull(aHandler, "aHandler");
>       dataHandler = aDataHandler;
>       resultSet = aResultSet;
>       exceptionHandler = aHandler;
>     }
>     
>     @Override
>     public void run() {
>       InputStream in = null;
>       List<String> row;
>       int len;
>       try { 
>         in = dataHandler.getInputStream();
>         
>         while ( receiving ) {
>           synchronized ( resultSet ) {
>             receiving = fill(lenBytes, in) > 0;       // read next row's length in number of columns
>             len = ByteBuffer.wrap(lenBytes).getInt(); // convert row length to integer
>             if ( !receiving || len == 0 ) { break; }
>             checkException(len, in);                  // -1 signals an exception
>             row = readRow(in, len);
>             awaitIteratorBufferNotFull();
>             resultSet.put(row);
>             rowsRead++;
>             if ( rowsRead % 1000 == 0 ) {
>               LOG.debug("already received {} rows", rowsRead);
>             }
>             resultSet.notifyAll();                    // notify waiting consumer threads
>           }
>         }
>         stopReception();
>         LOG.debug("received a total of {} rows.", rowsRead);
>       } catch ( InterruptedException e ) {
>         LOG.info("Result reception interrupted.");
>       } catch ( Exception e ) {
>         exceptionHandler.handle(e);
>         stopReception();
>       } finally {
>         receiving = false;
>         try {
>           if ( in != null ) { in.close(); }
>         } catch ( IOException e ) {
>           exceptionHandler.handle(e);
>         }
>       }
>     }
>     private List<String> readRow(InputStream in, int len) throws IOException {
>       List<String> row = new ArrayList<>(len);           // create list of appropriate length
>       for ( int col = 0; col < len; col++ ) {            // for each column
>         fill(lenBytes, in);                              // read the value length (fixed for some types if schema known)
>         int valLen = ByteBuffer.wrap(lenBytes).getInt(); // convert the length of the value as bytes to an int
>         final byte[] bytes = new byte[valLen];           // allocate a buffer of exactly the required size
>         fill(bytes, in);              
>         row.add(STRINGS.internalize(new String(bytes)));
>       }
>       return row;
>     }
>     private void stopReception() {
>       synchronized ( resultSet ) {
>         receiving = false;           // we will stop parsing now
>         resultSet.notifyAll();       // consumers can now consume the remaining elements from the queue
>         LOG.debug("Read " + rowsRead + " rows from binary stream");
>       }                
>     }
>   }
>   
>   /**
>    * Encodes the given result set and writes the result to an output stream. 
>    */
>   private class Encoder implements Runnable {
>     private final OutputStream out;
>     private final AutoCloseableIterator<List<String>> iterator;
>     private final ExceptionHandler handler;
>     Encoder(OutputStream aOut, AutoCloseableIterator<List<String>> aResults, ExceptionHandler aHandler) {
>       ArgumentChecks.checkNotNull(aOut, "aOut");
>       ArgumentChecks.checkNotNull(aResults, "aResults");
>       ArgumentChecks.checkNotNull(aHandler, "aHandler");
>       out = aOut;
>       iterator = aResults;
>       handler = aHandler;
>     }
>     @Override
>     public void run() {
>       try ( AutoCloseableIterator<List<String>> iter = this.iterator; OutputStream out = this.out ) {
>         writeResultSet(iter, out);
>         out.flush();
>       } catch ( Exception e ) {
>         handler.handle(e);
>       }
>     }
>     private void writeResultSet(AutoCloseableIterator<List<String>> iter, OutputStream out2) throws IOException {
>       List<String> row = null;
>       while ( this.iterator.hasNext() ) {
>         try {
>           row = this.iterator.next();
>           writeRow(row);
>           rowsWritten++;
>         } catch ( Exception e ) {
>           out.write(PREMATURE_END_OF_RESULT_BYTES); // write -1, signaling an exception instead of row size
>           writeException(out, e);                   // send exception to client for rethrowing it there
>           throw e;                                  // stop transmission, handle exception on server-side (logging)
>         }
>       }
>       LOG.info("wrote {} rows to binary stream, closing output stream", rowsWritten);
>     }
>     private void writeRow(List<String> row) throws IOException {
>       out.write(ByteBuffer.allocate(4).putInt(row.size()).array());     // write row size (in number of columns)
>       for ( String s : row ) {
>         if ( s == null ) { s = ""; }
>         byte[] bytes = s.getBytes();
>         out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // write size of column in bytes
>         out.write(bytes);                                               // write column value
>       }
>     }
>     private void writeException(OutputStream output, Exception e) throws IOException {
>       ObjectOutputStream objOut = new ObjectOutputStream(output);
>       objOut.writeObject(toString(e));
>     }
>     private String toString(Throwable e) {
>       if ( e instanceof UncheckedConnectionException && e.getCause() != null ) {
>         e = e.getCause();
>       }
>       ByteArrayOutputStream stackTraceOut = new ByteArrayOutputStream();
>       PrintWriter writer = new PrintWriter(stackTraceOut);
>       e.printStackTrace(writer);
>       writer.flush();
>       return new String(stackTraceOut.toByteArray());
>     }
>   }
>   
>   private final class ResultSetIterator<T> extends ExceptionHandler implements Iterator<List<T>> {
>     private final BlockingQueue<List<T>> queue;
>     private Exception exception;
>     private int returned = 0;
>     private ResultSetIterator(BlockingQueue<List<T>> aQueue) {
>       queue = aQueue;
>     }
>     /**
>      * {@inheritDoc}
>      * 
>      * This implementation marks the current thread as interrupted if streaming could not be commenced.
>      */
>     @Override
>     public boolean hasNext() {
>       if ( exception != null ) { throw new UncheckedConnectionException("Exception while reading results", exception); }
>       try {
>         synchronized ( resultSet ) {
>           awaitElements();
>           if ( exception != null ) { 
>             throw new UncheckedConnectionException("Exception while reading results", exception); 
>           }
>           if ( resultSet.isEmpty() ) {
>             LOG.debug("iterator returned " + returned + " rows");
>           }
>           return !resultSet.isEmpty();
>         }
>       } catch ( InterruptedException e ) {
>         Thread.currentThread().interrupt();
>         return false;
>       }
>     
>     }
>     /**
>      * {@inheritDoc}
>      * 
>      * If the current thread is interrupted during this call, it is marked as interrupted and the method returns 
>      * {@code null}.
>      * 
>      * @throws NoSuchElementException if called while {@link #hasNext()} returns {@code false}
>      */
>     @Override
>     public List<T> next() throws NoSuchElementException {
>       if ( exception != null ) { throw new IllegalStateException("Exception while reading results", exception); }
>       if ( !hasNext() ) { throw new NoSuchElementException(); }
>       try {
>         List<T> result = queue.take();
>         synchronized ( resultSet ) { resultSet.notify(); }
>         returned++;
>         return result;
>       } catch ( InterruptedException e ) {
>         Thread.currentThread().interrupt();
>         return null;
>       }
>     }
>     /**
>      * This method is not supported.
>      */
>     @Override public void remove() { throw new UnsupportedOperationException("RTFM"); }
>     @Override void handle(Exception aException) {
>       if ( exception != null ) { 
>         LOG.warn("There was another exception.", aException);
>       }
>       exception = aException;
>     }
>   }
>   
>   private abstract static class ExceptionHandler {
>     abstract void handle(Exception exception);
>   }
>   
>   private static class ServerExceptionHandler extends ExceptionHandler {
>     @Override
>     void handle(Exception aException) {
>       LOG.error(FATAL, "Exception while writing response.", aException);
>     }
>   }
> }
> {code}
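
The sent and received byte dumps quoted above can also be compared mechanically rather than by eye. The following is a minimal sketch only: ByteDiffSketch, firstMismatch and toBinary are hypothetical helper names, and the two arrays are just the visible prefix of the second example, since the dumps in the report are truncated. It is written with a plain loop so that it runs on the Java 8 environment named in the issue.

```java
/** Hypothetical helper: locates the first differing byte between two dumps and prints it in binary. */
final class ByteDiffSketch {

  /**
   * Returns the index of the first differing byte. If one array is a prefix of the other,
   * returns the length of the shorter one; returns -1 if both arrays are identical.
   */
  static int firstMismatch(byte[] sent, byte[] received) {
    int common = Math.min(sent.length, received.length);
    for (int i = 0; i < common; i++) {
      if (sent[i] != received[i]) { return i; }
    }
    return sent.length == received.length ? -1 : common;
  }

  /** Formats a byte as eight binary digits, treating it as unsigned. */
  static String toBinary(byte b) {
    String bits = Integer.toBinaryString(b & 0xFF);
    return "00000000".substring(bits.length()) + bits;
  }

  public static void main(String[] args) {
    // Visible prefix of the second example in the report; the original dumps continue after this.
    byte[] sent     = {0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 51, 0, 0, 0, 2};
    byte[] received = {0, 0, 0, 15, 0, 0, 0, 2, 66, 65, 0, 0, 0, 1, 49, 0, 0, 0, 4, 48, 48, 51, 48, 0, 0, 0, 2};
    int index = firstMismatch(sent, received);
    if (index >= 0 && index < sent.length && index < received.length) {
      System.out.println("first mismatch at index " + index
          + ": sent " + toBinary(sent[index]) + ", received " + toBinary(received[index]));
    }
  }
}
```

Logging the mismatch index together with the running byte offset in the stream might help to see whether the corruption tends to fall near a buffer or chunk boundary, although nothing in the report confirms that it does.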



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
