Attached are the current patches I have for this functionality.
I'll do a bit more testing and hope to push them early tomorrow.

thanks,
Pádraig
From 229a87ed0e45eb5f6875147c48719820291e5238 Mon Sep 17 00:00:00 2001
From: Carl Edquist <edqu...@cs.wisc.edu>
Date: Thu, 15 Dec 2022 06:10:33 -0600
Subject: [PATCH 1/4] all: add broken pipe detection while waiting for input

When a program's output becomes a broken pipe, future attempts to write
to that ouput will fail (SIGPIPE/EPIPE).  Once it is known that all
future write attepts will fail (due to broken pipes), in many cases it
becomes pointless to wait for further input for slow devices like ttys.
Ideally, a program could use this information to exit early once it is
known that future writes will fail.

Introduce iopoll() to wait on a pair of fds (input & output) for input
to become ready or output to become a broken pipe.

This is relevant when input is intermittent (a tty, pipe, or socket);
but if input is always ready (a regular file or block device), then
a read() will not block, and write failures for a broken pipe will
happen normally.

Introduce iopoll_input_ok() to check whether an input fd is relevant
for iopoll().

Experimentally, broken pipes are only detectable immediately for pipes,
but not sockets.  Errors for other file types will be detected in the
usual way, on write failure.

Introduce iopoll_output_ok() to check whether an output fd is suitable
for iopoll() -- namely, whether it is a pipe.

iopoll() is best implemented with a native poll(2) where possible, but
fall back to a select(2)-based implementation platforms where there are
portability issues.  See also discussion in tail.c.

In general, adding a call to iopoll() before a read() in filter programs
also allows broken pipes to "propagate" backwards in a shell pipeline.

* src/iopoll.c, src/iopoll.h (iopoll): New function implementing broken
pipe detection on output while waiting for input.
(IOPOLL_BROKEN_OUTPUT, IOPOLL_ERROR): Return codes for iopoll().
(IOPOLL_USES_POLL): Macro for poll() vs select() implementation.
(iopoll_input_ok): New function to check whether an input fd is relevant
for iopoll().
(iopoll_output_ok): New function to check whether an input fd is
suitable for iopoll().
* src/local.mk (noinst_HEADERS): add src/iopoll.h.
---
 src/iopoll.c | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/iopoll.h |   6 +++
 src/local.mk |   1 +
 3 files changed, 125 insertions(+)
 create mode 100644 src/iopoll.c
 create mode 100644 src/iopoll.h

diff --git a/src/iopoll.c b/src/iopoll.c
new file mode 100644
index 000000000..916241f89
--- /dev/null
+++ b/src/iopoll.c
@@ -0,0 +1,118 @@
+/* iopoll.c  -- broken pipe detection (while waiting for input)
+   Copyright (C) 1989-2022 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+   Written by Carl Edquist in collaboration with Arsen Arsenović.  */
+
+#include <config.h>
+
+  /* poll(2) is needed on AIX (where 'select' gives a readable
+     event immediately) and Solaris (where 'select' never gave
+     a readable event).  Also use poll(2) on systems we know work
+     and/or are already using poll (linux).  */
+
+#if defined _AIX || defined __sun || defined __APPLE__ || \
+    defined __linux__ || defined __ANDROID__
+# define IOPOLL_USES_POLL 1
+#endif
+
+#if IOPOLL_USES_POLL
+# include <poll.h>
+#else
+# include <sys/select.h>
+#endif
+
+#include "system.h"
+#include "iopoll.h"
+#include "isapipe.h"
+
+
+/* Wait for fdin to become ready for reading or fdout to become a broken pipe.
+   Return 0 if fdin can be read() without blocking, or IOPOLL_BROKEN_OUTPUT if
+   fdout becomes a broken pipe, otherwise IOPOLL_ERROR if there is a poll()
+   or select() error.  */
+
+#if IOPOLL_USES_POLL
+
+extern int
+iopoll (int fdin, int fdout)
+{
+  struct pollfd pfds[2] = {{fdin, POLLIN, 0}, {fdout, 0, 0}};
+
+  while (poll (pfds, 2, -1) > 0 || errno == EINTR)
+    {
+      if (errno == EINTR)
+        continue;
+      if (pfds[0].revents) /* input available or pipe closed indicating EOF; */
+        return 0;          /* should now be able to read() without blocking  */
+      if (pfds[1].revents)            /* POLLERR, POLLHUP (or POLLNVAL) */
+        return IOPOLL_BROKEN_OUTPUT;  /* output error or broken pipe    */
+    }
+  return IOPOLL_ERROR;  /* poll error */
+}
+
+#else  /* fall back to select()-based implementation */
+
+extern int
+iopoll (int fdin, int fdout)
+{
+  int nfds = (fdin > fdout ? fdin : fdout) + 1;
+  int ret = 0;
+
+  /* If fdout has an error condition (like a broken pipe) it will be seen
+     as ready for reading.  Assumes fdout is not actually readable.  */
+  while (ret >= 0 || errno == EINTR)
+    {
+      fd_set rfds;
+      FD_ZERO (&rfds);
+      FD_SET (fdin, &rfds);
+      FD_SET (fdout, &rfds);
+      ret = select (nfds, &rfds, NULL, NULL, NULL);
+
+      if (ret < 0)
+        continue;
+      if (FD_ISSET (fdin, &rfds))  /* input available or EOF; should now */
+        return 0;                  /* be able to read() without blocking */
+      if (FD_ISSET (fdout, &rfds))     /* POLLERR, POLLHUP (or POLLIN)   */
+        return IOPOLL_BROKEN_OUTPUT;   /* output error or broken pipe    */
+    }
+  return IOPOLL_ERROR;  /* select error */
+}
+
+#endif
+
+
+/* Return true if fdin is relevant for iopoll().
+   An fd is not relevant for iopoll() if it is always ready for reading,
+   which is the case for a regular file or block device.  */
+
+extern bool
+iopoll_input_ok (int fdin)
+{
+  struct stat st;
+  bool always_ready = fstat (fdin, &st) == 0
+                      && (S_ISREG (st.st_mode)
+                          || S_ISBLK (st.st_mode));
+  return ! always_ready;
+}
+
+/* Return true if fdout is suitable for iopoll().
+   Namely, fdout refers to a pipe.  */
+
+extern bool
+iopoll_output_ok (int fdout)
+{
+  return isapipe (fdout) > 0;
+}
diff --git a/src/iopoll.h b/src/iopoll.h
new file mode 100644
index 000000000..f57e3a33d
--- /dev/null
+++ b/src/iopoll.h
@@ -0,0 +1,6 @@
+#define IOPOLL_BROKEN_OUTPUT -2
+#define IOPOLL_ERROR         -3
+
+int iopoll(int fdin, int fdout);
+bool iopoll_input_ok(int fdin);
+bool iopoll_output_ok(int fdout);
diff --git a/src/local.mk b/src/local.mk
index 9d288f7b2..484ea2b37 100644
--- a/src/local.mk
+++ b/src/local.mk
@@ -51,6 +51,7 @@ noinst_HEADERS =		\
   src/fs-is-local.h		\
   src/group-list.h		\
   src/ioblksize.h		\
+  src/iopoll.h			\
   src/longlong.h		\
   src/ls.h			\
   src/operand2sig.h		\
-- 
2.26.2

From b988ef6fd0d0c031b8bac3a630107c3eebf7645e Mon Sep 17 00:00:00 2001
From: Carl Edquist <edqu...@cs.wisc.edu>
Date: Thu, 15 Dec 2022 12:32:49 -0600
Subject: [PATCH 2/4] tee: enhance -p mode using iopoll() to detect broken pipe
 outputs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

If input is intermittent (a tty, pipe, or socket), and all remaining
outputs are pipes (eg, >(cmd) process substitutions), exit early when
they have all become broken pipes (and thus future writes will fail),
without waiting for more input to become available, as future write
attempts to these outputs will fail (SIGPIPE/EPIPE).

Only provide this enhancement when pipe errors are ignored (-p mode).

Note that only one output needs to be monitored at a time with iopoll(),
as we only want to exit early if _all_ outputs have been removed.

* src/tee.c (pipe_check): New global for iopoll mode.
(main): enable pipe_check for -p, as long as output_error ignores EPIPE,
and input is suitable for iopoll().
(get_next_out): Helper function for finding next valid output.
(fail_output, tee_files): Break out write failure/output removal logic
to helper function.
(tee_files): Add out_pollable array to track which outputs are suitable
for iopoll() (ie, that are pipes); track first output index that is
still valid; add iopoll() broken pipe detection before calling read(),
removing an output that becomes a broken pipe.
* src/local.mk (src_tee_SOURCES): include src/iopoll.c.
* NEWS: Mention tee -p enhancement in Improvements.
* doc/coreutils.texi: Mention the new early exit behavior in the nopipe
modes for the tee -p option.

Suggested-by: Arsen Arsenović <ar...@aarsen.me>
---
 NEWS               |  4 ++
 doc/coreutils.texi |  2 +
 src/iopoll.h       |  6 +--
 src/local.mk       |  1 +
 src/tee.c          | 98 ++++++++++++++++++++++++++++++++++++++++------
 5 files changed, 95 insertions(+), 16 deletions(-)

diff --git a/NEWS b/NEWS
index 7c99bea19..2694cf305 100644
--- a/NEWS
+++ b/NEWS
@@ -153,6 +153,10 @@ GNU coreutils NEWS                                    -*- outline -*-
   when their modification time doesn't change when new data is available.
   Previously tail would not show any new data in this case.
 
+  tee -p detects when all remaining outputs have become broken pipes, and
+  exits, rather than waiting for more input to induce an exit when written.
+
+
 
 * Noteworthy changes in release 9.1 (2022-04-15) [stable]
 
diff --git a/doc/coreutils.texi b/doc/coreutils.texi
index 8870dd828..18f01d4c7 100644
--- a/doc/coreutils.texi
+++ b/doc/coreutils.texi
@@ -14198,6 +14198,7 @@ This is the default @var{mode} when not specified,
 or when the short form @option{-p} is used.
 Warn on error opening or writing any output, except pipes.
 Writing is continued to still open files/pipes.
+Exit early if all remaining outputs become broken pipes.
 Exit status indicates failure if any non pipe output had an error.
 
 @item exit
@@ -14205,6 +14206,7 @@ Exit on error opening or writing any output, including pipes.
 
 @item exit-nopipe
 Exit on error opening or writing any output, except pipes.
+Exit early if all remaining outputs become broken pipes.
 @end table
 
 @end table
diff --git a/src/iopoll.h b/src/iopoll.h
index f57e3a33d..85935e960 100644
--- a/src/iopoll.h
+++ b/src/iopoll.h
@@ -1,6 +1,6 @@
 #define IOPOLL_BROKEN_OUTPUT -2
 #define IOPOLL_ERROR         -3
 
-int iopoll(int fdin, int fdout);
-bool iopoll_input_ok(int fdin);
-bool iopoll_output_ok(int fdout);
+int iopoll (int fdin, int fdout);
+bool iopoll_input_ok (int fdin);
+bool iopoll_output_ok (int fdout);
diff --git a/src/local.mk b/src/local.mk
index 484ea2b37..8269a2f68 100644
--- a/src/local.mk
+++ b/src/local.mk
@@ -396,6 +396,7 @@ src_arch_SOURCES = src/uname.c src/uname-arch.c
 src_cut_SOURCES = src/cut.c src/set-fields.c
 src_numfmt_SOURCES = src/numfmt.c src/set-fields.c
 
+src_tee_SOURCES = src/tee.c src/iopoll.c
 src_sum_SOURCES = src/sum.c src/sum.h src/digest.c
 src_sum_CPPFLAGS = -DHASH_ALGO_SUM=1 $(AM_CPPFLAGS)
 
diff --git a/src/tee.c b/src/tee.c
index b48320d22..c69ea9133 100644
--- a/src/tee.c
+++ b/src/tee.c
@@ -28,6 +28,7 @@
 #include "fadvise.h"
 #include "stdio--.h"
 #include "xbinary-io.h"
+#include "iopoll.h"
 
 /* The official name of this program (e.g., no 'g' prefix).  */
 #define PROGRAM_NAME "tee"
@@ -45,6 +46,9 @@ static bool append;
 /* If true, ignore interrupts. */
 static bool ignore_interrupts;
 
+/* If true, detect if next output becomes broken while waiting for input. */
+static bool pipe_check;
+
 enum output_error
   {
     output_error_sigpipe,      /* traditional behavior, sigpipe enabled.  */
@@ -149,6 +153,12 @@ main (int argc, char **argv)
                                       output_error_args, output_error_types);
           else
             output_error = output_error_warn_nopipe;
+
+          /* Detect and close a broken pipe output when ignoring EPIPE.  */
+          if (output_error == output_error_warn_nopipe
+              || output_error == output_error_exit_nopipe)
+            pipe_check = true;
+
           break;
 
         case_GETOPT_HELP_CHAR;
@@ -166,6 +176,10 @@ main (int argc, char **argv)
   if (output_error != output_error_sigpipe)
     signal (SIGPIPE, SIG_IGN);
 
+  /* No need to poll outputs if input is always ready for reading.  */
+  if (pipe_check && !iopoll_input_ok (STDIN_FILENO))
+    pipe_check = false;
+
   /* Do *not* warn if tee is given no file arguments.
      POSIX requires that it work when given no arguments.  */
 
@@ -176,6 +190,42 @@ main (int argc, char **argv)
   return ok ? EXIT_SUCCESS : EXIT_FAILURE;
 }
 
+
+/* Return the index of the first non-NULL descriptor after idx,
+   or -1 if all are NULL.  */
+
+static int
+get_next_out (FILE **descriptors, int nfiles, int idx)
+{
+  for (idx++; idx <= nfiles; idx++)
+    if (descriptors[idx])
+      return idx;
+  return -1;  /* no outputs remaining */
+}
+
+/* Remove descriptors[i] due to write failure or broken pipe.
+   Return true if this indicates a reportable error.  */
+
+static bool
+fail_output (FILE **descriptors, char **files, int i)
+{
+  int w_errno = errno;
+  bool fail = errno != EPIPE
+              || output_error == output_error_exit
+              || output_error == output_error_warn;
+  if (descriptors[i] == stdout)
+    clearerr (stdout); /* Avoid redundant close_stdout diagnostic.  */
+  if (fail)
+    {
+      error (output_error == output_error_exit
+             || output_error == output_error_exit_nopipe,
+             w_errno, "%s", quotef (files[i]));
+    }
+  descriptors[i] = NULL;
+  return fail;
+}
+
+
 /* Copy the standard input into each of the NFILES files in FILES
    and into the standard output.  As a side effect, modify FILES[-1].
    Return true if successful.  */
@@ -185,9 +235,11 @@ tee_files (int nfiles, char **files)
 {
   size_t n_outputs = 0;
   FILE **descriptors;
+  bool *out_pollable;
   char buffer[BUFSIZ];
   ssize_t bytes_read = 0;
   int i;
+  int first_out = 0;  /* idx of first non-NULL output in descriptors */
   bool ok = true;
   char const *mode_string =
     (O_BINARY
@@ -202,8 +254,12 @@ tee_files (int nfiles, char **files)
      In both arrays, entry 0 corresponds to standard output.  */
 
   descriptors = xnmalloc (nfiles + 1, sizeof *descriptors);
+  if (pipe_check)
+    out_pollable = xnmalloc (nfiles + 1, sizeof *out_pollable);
   files--;
   descriptors[0] = stdout;
+  if (pipe_check)
+    out_pollable[0] = iopoll_output_ok (fileno (descriptors[0]));
   files[0] = bad_cast (_("standard output"));
   setvbuf (stdout, NULL, _IONBF, 0);
   n_outputs++;
@@ -212,6 +268,8 @@ tee_files (int nfiles, char **files)
     {
       /* Do not treat "-" specially - as mandated by POSIX.  */
       descriptors[i] = fopen (files[i], mode_string);
+      if (pipe_check)
+        out_pollable[i] = iopoll_output_ok (fileno (descriptors[i]));
       if (descriptors[i] == NULL)
         {
           error (output_error == output_error_exit
@@ -228,6 +286,28 @@ tee_files (int nfiles, char **files)
 
   while (n_outputs)
     {
+      if (pipe_check && out_pollable[first_out])
+        {
+          /* Monitor for input, or errors on first valid output.  */
+          int err = iopoll (STDIN_FILENO, fileno (descriptors[first_out]));
+
+          /* Close the output if it became a broken pipe.  */
+          if (err == IOPOLL_BROKEN_OUTPUT)
+            {
+              errno = EPIPE;  /* behave like write produced EPIPE */
+              if (fail_output (descriptors, files, first_out))
+                ok = false;
+              n_outputs--;
+              first_out = get_next_out (descriptors, nfiles, first_out);
+              continue;
+            }
+          else if (err == IOPOLL_ERROR)
+            {
+              error (0, errno, _("iopoll error"));
+              ok = false;
+            }
+        }
+
       bytes_read = read (STDIN_FILENO, buffer, sizeof buffer);
       if (bytes_read < 0 && errno == EINTR)
         continue;
@@ -240,21 +320,11 @@ tee_files (int nfiles, char **files)
         if (descriptors[i]
             && fwrite (buffer, bytes_read, 1, descriptors[i]) != 1)
           {
-            int w_errno = errno;
-            bool fail = errno != EPIPE || (output_error == output_error_exit
-                                          || output_error == output_error_warn);
-            if (descriptors[i] == stdout)
-              clearerr (stdout); /* Avoid redundant close_stdout diagnostic.  */
-            if (fail)
-              {
-                error (output_error == output_error_exit
-                       || output_error == output_error_exit_nopipe,
-                       w_errno, "%s", quotef (files[i]));
-              }
-            descriptors[i] = NULL;
-            if (fail)
+            if (fail_output (descriptors, files, i))
               ok = false;
             n_outputs--;
+            if (i == first_out)
+              first_out = get_next_out (descriptors, nfiles, first_out);
           }
     }
 
@@ -273,6 +343,8 @@ tee_files (int nfiles, char **files)
       }
 
   free (descriptors);
+  if (pipe_check)
+    free (out_pollable);
 
   return ok;
 }
-- 
2.26.2

From 8d07264a8f04e0a6291d9bfbed4b193fe2bdb075 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A1draig=20Brady?= <p...@draigbrady.com>
Date: Tue, 3 Jan 2023 18:06:45 +0100
Subject: [PATCH 3/4] tests: tee -p: add test for early exit with closed pipes

* tests/misc/tee.sh: Add a test for the new iopoll logic
to detect closed outputs and exit early without needing
further input.
---
 tests/misc/tee.sh | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/tests/misc/tee.sh b/tests/misc/tee.sh
index b96523186..30d64a9d2 100755
--- a/tests/misc/tee.sh
+++ b/tests/misc/tee.sh
@@ -63,6 +63,16 @@ if test -w /dev/full && test -c /dev/full; then
   test $(wc -l < err) = 1 || { cat err; fail=1; }
 fi
 
+# Test iopoll-powered early exit for closed pipes
+tee_exited() { sleep $1; test -f tee.exited; }
+# Currently this functionality is most useful with
+# intermittent input from a terminal, but here we
+# use an input pipe that doesn't write anything
+# but will exit as soon as tee does, or it times out
+retry_delay_ tee_exited .1 7 | # 12.7s (Must be > following timeout)
+{ timeout 10 tee -p 2>err && touch tee.exited; } | :
+test $(wc -l < err) = 0 || { cat err; fail=1; }
+test -f tee.exited || fail=1
 
 # Ensure tee honors --output-error modes
 mkfifo_or_skip_ fifo
-- 
2.26.2

From 26d050d00cdb58a73d99de424b87465725ea7b46 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A1draig=20Brady?= <p...@draigbrady.com>
Date: Mon, 27 Feb 2023 18:07:06 +0000
Subject: [PATCH 4/4] doc: tee -p: clarify operation

* src/tee.c (usage): Change from describing one (non pipe) aspect
to the more general point of being the option to use if working with
pipes, and referencing the more detailed info below.
* doc/coreutils.texi (tee invocation): s/standard/appropriate/ since
the standard operation with pipes is to exit immediately upon write
error.  s/early/immediately/ as it's ambiguous as to what "early"
is in relation to.
---
 doc/coreutils.texi | 6 +++---
 src/tee.c          | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/doc/coreutils.texi b/doc/coreutils.texi
index 18f01d4c7..7ea910ba8 100644
--- a/doc/coreutils.texi
+++ b/doc/coreutils.texi
@@ -14179,7 +14179,7 @@ Ignore interrupt signals.
 @opindex --output-error
 Adjust the behavior with errors on the outputs.
 In summary @option{-p} allows @command{tee} to operate in a more
-standard manner with pipes, and to continue to process data
+appropriate manner with pipes, and to continue to process data
 to any remaining outputs, if any pipe outputs exit early.
 The default operation when @option{--output-error} is @emph{not}
 specified is to exit immediately on error writing to a pipe,
@@ -14198,7 +14198,7 @@ This is the default @var{mode} when not specified,
 or when the short form @option{-p} is used.
 Warn on error opening or writing any output, except pipes.
 Writing is continued to still open files/pipes.
-Exit early if all remaining outputs become broken pipes.
+Exit immediately if all remaining outputs become broken pipes.
 Exit status indicates failure if any non pipe output had an error.
 
 @item exit
@@ -14206,7 +14206,7 @@ Exit on error opening or writing any output, including pipes.
 
 @item exit-nopipe
 Exit on error opening or writing any output, except pipes.
-Exit early if all remaining outputs become broken pipes.
+Exit immediately if all remaining outputs become broken pipes.
 @end table
 
 @end table
diff --git a/src/tee.c b/src/tee.c
index c69ea9133..e328e6f04 100644
--- a/src/tee.c
+++ b/src/tee.c
@@ -96,7 +96,7 @@ Copy standard input to each FILE, and also to standard output.\n\
   -i, --ignore-interrupts   ignore interrupt signals\n\
 "), stdout);
       fputs (_("\
-  -p                        diagnose errors writing to non pipes\n\
+  -p                        operate in a more appropriate MODE with pipes.\n\
       --output-error[=MODE]   set behavior on write error.  See MODE below\n\
 "), stdout);
       fputs (HELP_OPTION_DESCRIPTION, stdout);
@@ -109,6 +109,7 @@ MODE determines behavior with write errors on the outputs:\n\
   exit           exit on error writing to any output\n\
   exit-nopipe    exit on error writing to any output not a pipe\n\
 The default MODE for the -p option is 'warn-nopipe'.\n\
+With \"nopipe\" MODEs, exit immediately if all outputs become broken pipes.\n\
 The default operation when --output-error is not specified, is to\n\
 exit immediately on error writing to a pipe, and diagnose errors\n\
 writing to non pipe outputs.\n\
-- 
2.26.2

Reply via email to