This is an automated email from the ASF dual-hosted git repository.

tballison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new ba501e6157 fix race condition in PipesClient (#2849)
ba501e6157 is described below

commit ba501e6157a6f7e2ebc9bec0cf308633677612b4
Author: Tim Allison <[email protected]>
AuthorDate: Fri May 29 09:09:40 2026 -0400

    fix race condition in PipesClient (#2849)
---
 .../org/apache/tika/pipes/core/PipesClient.java    | 68 ++++++++++++++++------
 1 file changed, 50 insertions(+), 18 deletions(-)

diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 6eeacefa05..e274117564 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -76,7 +76,11 @@ public class PipesClient implements Closeable {
     private final boolean ownsServerManager;
     private final int pipesClientId;
 
-    private ConnectionTuple connectionTuple;
+    // volatile + connectionLock: closeConnection() can be called concurrently by the
+    // in-flight parse thread (timeout/crash paths in waitForServer) and the thread
+    // calling close(). The lock lets one thread atomically claim and null the tuple.
+    private final Object connectionLock = new Object();
+    private volatile ConnectionTuple connectionTuple;
     private int filesProcessed = 0;
 
     /**
@@ -118,7 +122,11 @@ public class PipesClient implements Closeable {
     }
 
     private boolean ping() {
-        if (connectionTuple == null) {
+        // Snapshot the volatile once: a concurrent closeConnection() can null the
+        // field at any point, but the local reference stays valid (close() unblocks
+        // us by closing the socket, surfacing as IOException below - not an NPE).
+        ConnectionTuple tuple = connectionTuple;
+        if (tuple == null) {
             return false;
         }
         // Check if server process is still running
@@ -126,8 +134,8 @@ public class PipesClient implements Closeable {
             return false;
         }
         try {
-            PipesMessage.ping().write(connectionTuple.output);
-            PipesMessage response = PipesMessage.read(connectionTuple.input);
+            PipesMessage.ping().write(tuple.output);
+            PipesMessage response = PipesMessage.read(tuple.input);
             if (response.type() == PipesMessageType.PING) {
                 return true;
             }
@@ -158,20 +166,27 @@ public class PipesClient implements Closeable {
      * Server lifecycle is managed by PipesParser.
      */
     private void closeConnection() throws InterruptedException {
-        if (connectionTuple == null) {
+        // Atomically claim the tuple and null the field so concurrent callers
+        // (parse thread vs. close() thread) don't deref a field another thread
+        // has already nulled. Whoever loses the race sees a null tuple and bails.
+        ConnectionTuple tuple;
+        synchronized (connectionLock) {
+            tuple = connectionTuple;
+            connectionTuple = null;
+        }
+        if (tuple == null) {
             return;
         }
         LOG.debug("pipesClientId={}: closing connection", pipesClientId);
         try {
-            PipesMessage.shutDown().write(connectionTuple.output);
+            PipesMessage.shutDown().write(tuple.output);
         } catch (IOException e) {
             // swallow
         }
         List<IOException> exceptions = new ArrayList<>();
-        tryToClose(connectionTuple.input, exceptions);
-        tryToClose(connectionTuple.output, exceptions);
-        tryToClose(connectionTuple.socket, exceptions);
-        connectionTuple = null;
+        tryToClose(tuple.input, exceptions);
+        tryToClose(tuple.output, exceptions);
+        tryToClose(tuple.socket, exceptions);
     }
 
     private void tryToClose(Closeable closeable, List<IOException> exceptions) {
@@ -292,20 +307,33 @@ public class PipesClient implements Closeable {
         // Connect to server
         Socket socket = serverManager.connect((int) pipesConfig.getSocketTimeoutMs());
 
-        connectionTuple = new ConnectionTuple(socket,
-                new DataInputStream(new BufferedInputStream(socket.getInputStream())),
-                new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())));
+        synchronized (connectionLock) {
+            connectionTuple = new ConnectionTuple(socket,
+                    new DataInputStream(new BufferedInputStream(socket.getInputStream())),
+                    new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())));
+        }
 
         waitForStartup();
     }
 
     private void writeTask(FetchEmitTuple t) throws IOException {
+        ConnectionTuple tuple = connectionTuple;
+        if (tuple == null) {
+            throw new IOException("connection closed");
+        }
         LOG.debug("pipesClientId={}: sending NEW_REQUEST for id={}", pipesClientId, t.getId());
         byte[] bytes = JsonPipesIpc.toBytes(t);
-        PipesMessage.newRequest(bytes).write(connectionTuple.output);
+        PipesMessage.newRequest(bytes).write(tuple.output);
     }
 
     private PipesResult waitForServer(FetchEmitTuple t, IntermediateResult intermediateResult) throws InterruptedException {
+        // Snapshot the volatile once; a concurrent close() may null the field, but the
+        // local stays valid and its blocking read unblocks via socket close (IOException).
+        ConnectionTuple tuple = connectionTuple;
+        if (tuple == null) {
+            return buildFatalResult(t.getId(), t.getEmitKey(), UNSPECIFIED_CRASH,
+                    intermediateResult.get());
+        }
         TimeoutLimits limits = TimeoutLimits.get(t.getParseContext());
         long progressTimeoutMillis = limits.getProgressTimeoutMillis();
         long totalTaskTimeoutMillis = limits.getTotalTaskTimeoutMillis();
@@ -337,12 +365,12 @@ public class PipesClient implements Closeable {
                         intermediateResult.get());
             }
             try {
-                PipesMessage msg = PipesMessage.read(connectionTuple.input);
+                PipesMessage msg = PipesMessage.read(tuple.input);
                 LOG.trace("clientId={}: received message type={} id={}", pipesClientId, msg.type(), t.getId());
 
                 // Send ACK only for messages that require it
                 if (msg.type().requiresAck()) {
-                    PipesMessage.ack().write(connectionTuple.output);
+                    PipesMessage.ack().write(tuple.output);
                 }
 
                 switch (msg.type()) {
@@ -429,12 +457,16 @@ public class PipesClient implements Closeable {
     }
 
     private void waitForStartup() throws IOException {
-        PipesMessage msg = PipesMessage.read(connectionTuple.input);
+        ConnectionTuple tuple = connectionTuple;
+        if (tuple == null) {
+            throw new IOException("connection closed");
+        }
+        PipesMessage msg = PipesMessage.read(tuple.input);
         if (msg.type() == PipesMessageType.READY) {
             LOG.info("clientId={}: server successfully started", pipesClientId);
         } else if (msg.type() == PipesMessageType.STARTUP_FAILED) {
             // Send ACK for startup failure
-            PipesMessage.ack().write(connectionTuple.output);
+            PipesMessage.ack().write(tuple.output);
             String errorMsg = new String(msg.payload(), StandardCharsets.UTF_8);
             LOG.error("clientId={}: Server failed to start: {}", pipesClientId, errorMsg);
             throw new ServerInitializationException(errorMsg);

Reply via email to