This is an automated email from the ASF dual-hosted git repository.
lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new a0fcbf8 [SSHD-1022] Fixed NPE in SftpOutputStreamAsync#flush() if
invoked with no bytes written in-between
a0fcbf8 is described below
commit a0fcbf8e8756e2de6c39c25c9c9959e6c7306e3c
Author: Lyor Goldstein <[email protected]>
AuthorDate: Thu Jun 25 21:22:38 2020 +0300
[SSHD-1022] Fixed NPE in SftpOutputStreamAsync#flush() if invoked with no
bytes written in-between
---
CHANGES.md | 2 +
.../subsystem/sftp/impl/SftpOutputStreamAsync.java | 131 ++++++++++++++++-----
.../sshd/client/subsystem/sftp/SftpTest.java | 43 ++++++-
3 files changed, 146 insertions(+), 30 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 930ad69..24b15a3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,3 +13,5 @@
## Minor code helpers
## Behavioral changes and enhancements
+
+* [SSHD-1022](https://issues.apache.org/jira/browse/SSHD-1022) NPE in
`SftpOutputStreamAsync#flush()` if no data written in between.
diff --git
a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
index b5b809c..b442ffe 100644
---
a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
+++
b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
@@ -33,6 +33,8 @@ import org.apache.sshd.common.subsystem.sftp.SftpConstants;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.OutputStreamWithChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements an output stream for a given remote file
@@ -40,6 +42,7 @@ import org.apache.sshd.common.util.io.OutputStreamWithChannel;
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
public class SftpOutputStreamAsync extends OutputStreamWithChannel {
+ protected final Logger log;
protected final byte[] bb = new byte[1];
protected final int bufferSize;
protected Buffer buffer;
@@ -47,12 +50,13 @@ public class SftpOutputStreamAsync extends
OutputStreamWithChannel {
protected long offset;
protected final Deque<SftpAckData> pendingWrites = new LinkedList<>();
- private final AbstractSftpClient client;
+ private final AbstractSftpClient clientInstance;
private final String path;
public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
String path, Collection<OpenMode> mode)
throws IOException {
- this.client = Objects.requireNonNull(client, "No SFTP client
instance");
+ this.log = LoggerFactory.getLogger(getClass());
+ this.clientInstance = Objects.requireNonNull(client, "No SFTP client
instance");
this.path = path;
this.handle = client.open(path, mode);
this.bufferSize = bufferSize;
@@ -60,7 +64,8 @@ public class SftpOutputStreamAsync extends
OutputStreamWithChannel {
public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
String path, CloseableHandle handle) throws
IOException {
- this.client = Objects.requireNonNull(client, "No SFTP client
instance");
+ this.log = LoggerFactory.getLogger(getClass());
+ this.clientInstance = Objects.requireNonNull(client, "No SFTP client
instance");
this.path = path;
this.handle = handle;
this.bufferSize = bufferSize;
@@ -72,7 +77,7 @@ public class SftpOutputStreamAsync extends
OutputStreamWithChannel {
* @return {@link SftpClient} instance used to access the remote file
*/
public final AbstractSftpClient getClient() {
- return client;
+ return clientInstance;
}
public void setOffset(long offset) {
@@ -102,23 +107,39 @@ public class SftpOutputStreamAsync extends
OutputStreamWithChannel {
@Override
public void write(byte[] b, int off, int len) throws IOException {
byte[] id = handle.getIdentifier();
+ SftpClient client = getClient();
Session session = client.getSession();
+ boolean traceEnabled = log.isTraceEnabled();
+ int writtenCount = 0;
+ int totalLen = len;
do {
if (buffer == null) {
+ if (traceEnabled) {
+ log.trace("write({}) allocate buffer size={} after {}/{}
bytes",
+ this, bufferSize, writtenCount, totalLen);
+ }
+
buffer =
session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
int hdr = 9 + 16 + 8 + id.length + buffer.wpos();
buffer.rpos(hdr);
buffer.wpos(hdr);
}
+
int max = bufferSize - (9 + 16 + id.length + 72);
int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos()));
buffer.putRawBytes(b, off, nb);
+
+ off += nb;
+ len -= nb;
+ writtenCount += nb;
+
if (buffer.available() == max) {
+ if (traceEnabled) {
+ log.trace("write({}) flush after {}/{} bytes", this,
writtenCount, totalLen);
+ }
flush();
}
- off += nb;
- len -= nb;
} while (len > 0);
}
@@ -128,19 +149,43 @@ public class SftpOutputStreamAsync extends
OutputStreamWithChannel {
throw new IOException("flush(" + getPath() + ") stream is closed");
}
- for (;;) {
+ boolean debugEnabled = log.isDebugEnabled();
+ AbstractSftpClient client = getClient();
+ for (int ackIndex = 0;;) {
SftpAckData ack = pendingWrites.peek();
- if (ack != null) {
- Buffer response = client.receive(ack.id, 0L);
- if (response != null) {
- pendingWrites.removeFirst();
- client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE,
response);
- } else {
- break;
+ if (ack == null) {
+ if (debugEnabled) {
+ log.debug("flush({}) processed {} pending writes", this,
ackIndex);
}
- } else {
break;
}
+
+ ackIndex++;
+ if (debugEnabled) {
+ log.debug("flush({}) waiting for ack #{}: {}", this, ackIndex,
ack);
+ }
+
+ Buffer response = client.receive(ack.id, 0L);
+ if (response == null) {
+ if (debugEnabled) {
+ log.debug("flush({}) no response for ack #{}: {}", this,
ackIndex, ack);
+ }
+ break;
+ }
+
+ if (debugEnabled) {
+ log.debug("flush({}) processing ack #{}: {}", this, ackIndex,
ack);
+ }
+
+ ack = pendingWrites.removeFirst();
+ client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+ }
+
+ if (buffer == null) {
+ if (debugEnabled) {
+ log.debug("flush({}) no pending buffer to flush", this);
+ }
+ return;
}
byte[] id = handle.getIdentifier();
@@ -163,7 +208,11 @@ public class SftpOutputStreamAsync extends
OutputStreamWithChannel {
}
int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
- pendingWrites.add(new SftpAckData(reqId, offset, avail));
+ SftpAckData ack = new SftpAckData(reqId, offset, avail);
+ if (debugEnabled) {
+ log.debug("flush({}) enueue pending ack={}", this, ack);
+ }
+ pendingWrites.add(ack);
offset += avail;
buffer = null;
@@ -171,23 +220,51 @@ public class SftpOutputStreamAsync extends
OutputStreamWithChannel {
@Override
public void close() throws IOException {
- if (isOpen()) {
+ if (!isOpen()) {
+ return;
+ }
+
+ try {
+ boolean debugEnabled = log.isDebugEnabled();
+
try {
- try {
- if ((buffer != null) && (buffer.available() > 0)) {
- flush();
+ int pendingSize = (buffer == null) ? 0 : buffer.available();
+ if (pendingSize > 0) {
+ if (debugEnabled) {
+ log.debug("close({}) flushing {} pending bytes", this,
pendingSize);
}
- while (!pendingWrites.isEmpty()) {
- SftpAckData ack = pendingWrites.removeFirst();
- Buffer response = client.receive(ack.id);
-
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
+ flush();
+ }
+
+ AbstractSftpClient client = getClient();
+ for (int ackIndex = 1; !pendingWrites.isEmpty(); ackIndex++) {
+ SftpAckData ack = pendingWrites.removeFirst();
+ if (debugEnabled) {
+ log.debug("close({}) processing ack #{}: {}", this,
ackIndex, ack);
}
- } finally {
- handle.close();
+
+ Buffer response = client.receive(ack.id);
+ if (debugEnabled) {
+ log.debug("close({}) processing ack #{} response for
{}", this, ackIndex, ack);
+ }
+ client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE,
response);
}
} finally {
- handle = null;
+ if (debugEnabled) {
+ log.debug("close({}) closing file handle", this);
+ }
+ handle.close();
}
+ } finally {
+ handle = null;
}
}
+
+ @Override
+ public String toString() {
+ SftpClient client = getClient();
+ return getClass().getSimpleName()
+ + "[" + client.getSession() + "]"
+ + "[" + getPath() + "]";
+ }
}
diff --git
a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
index e155ff1..35d6e5e 100644
---
a/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
+++
b/sshd-sftp/src/test/java/org/apache/sshd/client/subsystem/sftp/SftpTest.java
@@ -67,6 +67,7 @@ import
org.apache.sshd.client.subsystem.sftp.extensions.BuiltinSftpClientExtensi
import org.apache.sshd.client.subsystem.sftp.extensions.SftpClientExtension;
import org.apache.sshd.client.subsystem.sftp.impl.AbstractSftpClient;
import org.apache.sshd.client.subsystem.sftp.impl.DefaultCloseableHandle;
+import org.apache.sshd.client.subsystem.sftp.impl.SftpOutputStreamAsync;
import org.apache.sshd.common.Factory;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.OptionalFeature;
@@ -1608,14 +1609,14 @@ public class SftpTest extends
AbstractSftpClientTestSupport {
protected void sendFile(String path, String data) throws Exception {
ChannelSftp c = (ChannelSftp)
session.openChannel(SftpConstants.SFTP_SUBSYSTEM_NAME);
c.connect();
- try {
- c.put(new
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)), path);
+ try (InputStream srcStream = new
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) {
+ c.put(srcStream, path);
} finally {
c.disconnect();
}
}
- private String randomString(int size) {
+ private static String randomString(int size) {
StringBuilder sb = new StringBuilder(size);
for (int i = 0; i < size; i++) {
sb.append((char) ((i % 10) + '0'));
@@ -1623,6 +1624,42 @@ public class SftpTest extends
AbstractSftpClientTestSupport {
return sb.toString();
}
+ @Test // see SSHD-1022
+ public void testFlushOutputStreamWithoutWrite() throws Exception {
+ Path targetPath = detectTargetFolder();
+ Path lclSftp = CommonTestSupportUtils.resolve(
+ targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME,
getClass().getSimpleName(), getCurrentTestName());
+ CommonTestSupportUtils.deleteRecursive(lclSftp);
+
+ Path parentPath = targetPath.getParent();
+ Path clientFolder =
assertHierarchyTargetFolderExists(lclSftp.resolve("client"));
+ try (ClientSession session = client.connect(getCurrentTestName(),
TEST_LOCALHOST, port)
+ .verify(CONNECT_TIMEOUT).getSession()) {
+ session.addPasswordIdentity(getCurrentTestName());
+ session.auth().verify(AUTH_TIMEOUT);
+
+ try (SftpClient sftp = createSftpClient(session)) {
+ Path file = clientFolder.resolve("file.txt");
+ String filePath =
CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, file);
+ try (OutputStream os = sftp.write(filePath,
SftpClient.MIN_WRITE_BUFFER_SIZE)) {
+
assertObjectInstanceOf(SftpOutputStreamAsync.class.getSimpleName(),
SftpOutputStreamAsync.class, os);
+
+ for (int index = 1; index <= 5; index++) {
+ outputDebugMessage("%s - pre write flush attempt #%d",
getCurrentTestName(), index);
+ os.flush();
+ }
+
+ os.write((getCurrentTestName() +
"\n").getBytes(StandardCharsets.UTF_8));
+
+ for (int index = 1; index <= 5; index++) {
+ outputDebugMessage("%s - post write flush attempt
#%d", getCurrentTestName(), index);
+ os.flush();
+ }
+ }
+ }
+ }
+ }
+
static class LinkData {
private final Path source;
private final Path target;