This is an automated email from the ASF dual-hosted git repository.
tballison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new ba501e6157 fix race condition in PipesClient (#2849)
ba501e6157 is described below
commit ba501e6157a6f7e2ebc9bec0cf308633677612b4
Author: Tim Allison <[email protected]>
AuthorDate: Fri May 29 09:09:40 2026 -0400
fix race condition in PipesClient (#2849)
---
.../org/apache/tika/pipes/core/PipesClient.java | 68 ++++++++++++++++------
1 file changed, 50 insertions(+), 18 deletions(-)
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 6eeacefa05..e274117564 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -76,7 +76,11 @@ public class PipesClient implements Closeable {
private final boolean ownsServerManager;
private final int pipesClientId;
- private ConnectionTuple connectionTuple;
+ // volatile + connectionLock: closeConnection() can be called concurrently by the
+ // in-flight parse thread (timeout/crash paths in waitForServer) and the thread
+ // calling close(). The lock lets one thread atomically claim and null the tuple.
+ private final Object connectionLock = new Object();
+ private volatile ConnectionTuple connectionTuple;
private int filesProcessed = 0;
/**
@@ -118,7 +122,11 @@ public class PipesClient implements Closeable {
}
private boolean ping() {
- if (connectionTuple == null) {
+ // Snapshot the volatile once: a concurrent closeConnection() can null the
+ // field at any point, but the local reference stays valid (close() unblocks
+ // us by closing the socket, surfacing as IOException below - not an NPE).
+ ConnectionTuple tuple = connectionTuple;
+ if (tuple == null) {
return false;
}
// Check if server process is still running
@@ -126,8 +134,8 @@ public class PipesClient implements Closeable {
return false;
}
try {
- PipesMessage.ping().write(connectionTuple.output);
- PipesMessage response = PipesMessage.read(connectionTuple.input);
+ PipesMessage.ping().write(tuple.output);
+ PipesMessage response = PipesMessage.read(tuple.input);
if (response.type() == PipesMessageType.PING) {
return true;
}
@@ -158,20 +166,27 @@ public class PipesClient implements Closeable {
* Server lifecycle is managed by PipesParser.
*/
private void closeConnection() throws InterruptedException {
- if (connectionTuple == null) {
+ // Atomically claim the tuple and null the field so concurrent callers
+ // (parse thread vs. close() thread) don't deref a field another thread
+ // has already nulled. Whoever loses the race sees a null tuple and bails.
+ ConnectionTuple tuple;
+ synchronized (connectionLock) {
+ tuple = connectionTuple;
+ connectionTuple = null;
+ }
+ if (tuple == null) {
return;
}
LOG.debug("pipesClientId={}: closing connection", pipesClientId);
try {
- PipesMessage.shutDown().write(connectionTuple.output);
+ PipesMessage.shutDown().write(tuple.output);
} catch (IOException e) {
// swallow
}
List<IOException> exceptions = new ArrayList<>();
- tryToClose(connectionTuple.input, exceptions);
- tryToClose(connectionTuple.output, exceptions);
- tryToClose(connectionTuple.socket, exceptions);
- connectionTuple = null;
+ tryToClose(tuple.input, exceptions);
+ tryToClose(tuple.output, exceptions);
+ tryToClose(tuple.socket, exceptions);
}
private void tryToClose(Closeable closeable, List<IOException> exceptions)
{
@@ -292,20 +307,33 @@ public class PipesClient implements Closeable {
// Connect to server
Socket socket = serverManager.connect((int)
pipesConfig.getSocketTimeoutMs());
- connectionTuple = new ConnectionTuple(socket,
- new DataInputStream(new
BufferedInputStream(socket.getInputStream())),
- new DataOutputStream(new
BufferedOutputStream(socket.getOutputStream())));
+ synchronized (connectionLock) {
+ connectionTuple = new ConnectionTuple(socket,
+ new DataInputStream(new
BufferedInputStream(socket.getInputStream())),
+ new DataOutputStream(new
BufferedOutputStream(socket.getOutputStream())));
+ }
waitForStartup();
}
private void writeTask(FetchEmitTuple t) throws IOException {
+ ConnectionTuple tuple = connectionTuple;
+ if (tuple == null) {
+ throw new IOException("connection closed");
+ }
LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}",
pipesClientId, t.getId());
byte[] bytes = JsonPipesIpc.toBytes(t);
- PipesMessage.newRequest(bytes).write(connectionTuple.output);
+ PipesMessage.newRequest(bytes).write(tuple.output);
}
private PipesResult waitForServer(FetchEmitTuple t, IntermediateResult
intermediateResult) throws InterruptedException {
+ // Snapshot the volatile once; a concurrent close() may null the field, but the
+ // local stays valid and its blocking read unblocks via socket close (IOException).
+ ConnectionTuple tuple = connectionTuple;
+ if (tuple == null) {
+ return buildFatalResult(t.getId(), t.getEmitKey(),
UNSPECIFIED_CRASH,
+ intermediateResult.get());
+ }
TimeoutLimits limits = TimeoutLimits.get(t.getParseContext());
long progressTimeoutMillis = limits.getProgressTimeoutMillis();
long totalTaskTimeoutMillis = limits.getTotalTaskTimeoutMillis();
@@ -337,12 +365,12 @@ public class PipesClient implements Closeable {
intermediateResult.get());
}
try {
- PipesMessage msg = PipesMessage.read(connectionTuple.input);
+ PipesMessage msg = PipesMessage.read(tuple.input);
LOG.trace("clientId={}: received message type={} id={}",
pipesClientId, msg.type(), t.getId());
// Send ACK only for messages that require it
if (msg.type().requiresAck()) {
- PipesMessage.ack().write(connectionTuple.output);
+ PipesMessage.ack().write(tuple.output);
}
switch (msg.type()) {
@@ -429,12 +457,16 @@ public class PipesClient implements Closeable {
}
private void waitForStartup() throws IOException {
- PipesMessage msg = PipesMessage.read(connectionTuple.input);
+ ConnectionTuple tuple = connectionTuple;
+ if (tuple == null) {
+ throw new IOException("connection closed");
+ }
+ PipesMessage msg = PipesMessage.read(tuple.input);
if (msg.type() == PipesMessageType.READY) {
LOG.info("clientId={}: server successfully started",
pipesClientId);
} else if (msg.type() == PipesMessageType.STARTUP_FAILED) {
// Send ACK for startup failure
- PipesMessage.ack().write(connectionTuple.output);
+ PipesMessage.ack().write(tuple.output);
String errorMsg = new String(msg.payload(),
StandardCharsets.UTF_8);
LOG.error("clientId={}: Server failed to start: {}",
pipesClientId, errorMsg);
throw new ServerInitializationException(errorMsg);