On Sun, 15 Jul 2018 at 13:20, Thomas Koenig <tkoe...@netcologne.de> wrote:

> So, here is the final version. I would really like to get this
> into trunk, and out of the way, so Nicolas and I can focus on
> other things.
>
> So, OK?

[I know i'm late as it was already applied]

For me it would be easier to read the locking if struct async_unit had
its queue lock named q_lock/qlock instead of plain lock.
The io_lock is named nicely already.

Furthermore there is a mixture of (correctly wrapped) __gthread_ in
struct adv_cond versus unwrapped pthread_mutex_t in struct async_unit
where i'd have expected the latter to also use the __gthread wrappers.

struct adv_cond member pending should not be an int but an unsigned
int or, even better, a bool, i'd say.

transfer_array_inner () is named unintuitively IMHO. Maybe
transfer_array_now, transfer_array_scalar or transfer_array_1.

Index: libgfortran/io/async.c
===================================================================
--- libgfortran/io/async.c      (nicht existent)
+++ libgfortran/io/async.c      (Arbeitskopie)
[]
+static void
+update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
+  st_parameter_dt *temp;
+  NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
+  temp = *old;
+  *old = new;
+  if (temp)
+    free (temp);
+}

free (NULL) is perfectly valid, please remove the if.


+static void *
+async_io (void *arg)
+{
[]
+  while (true)
+    {
+      /* Main loop.  At this point, au->lock is always held. */

dot space space at the end of a sentence please.
[]
+      while (ctq)
+       {
+         if (prev)
+           free (prev);

Likewise, drop if.

+         prev = ctq;
+         if (!au->error.has_error)

I'd flag that as likely.

Likewise, i'd flag finish_thread as unlikely. Since it is a label, you can
hint to the predictor that it is rarely jumped to by flagging it
cold:

finish_thread: __attribute__((cold));

+/* Initialize an asyncronous unit, returning zero on success,
+ nonzero on failure.  It also sets u->au.  */
+
+void
+init_async_unit (gfc_unit *u)

s/asyncronous/asynchronous/

+{
+  async_unit *au;
+  if (!__gthread_active_p ())
+    {
+      u->au = NULL;
+      return;
+    }
+

Excess horizontal space on the empty line above.

+  au = (async_unit *) xmalloc (sizeof (async_unit));

I'd XCNEW (async_unit) and omit all those NULL and 0 stores.
You should use the scalar allocators provided in include/libiberty.h
throughout, so s/xmalloc/XNEW/ and s/free/XDELETE/ and so on.

+/* Enqueue a transfer statement.  */
+
+void
+enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
+{
+  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
+  tq->arg = *arg;

The calloc result is unchecked, so this dereferences NULL on OOM. XCNEW (transfer_queue), please.

+  tq->arg = *arg;
+  tq->type = type;
+  tq->has_id = 0;

redundant store to has_id.

+  LOCK (&au->lock);
+  if (!au->tail)
+    au->head = tq;
+  else
+    au->tail->next = tq;
+  au->tail = tq;
+  REVOKE_SIGNAL (&(au->emptysignal));
+  au->empty = false;
+  UNLOCK (&au->lock);
+  SIGNAL (&au->work);
+}

+/* Enqueue an st_write_done or st_read_done which contains an ID.  */
+
+int
+enqueue_done_id (async_unit *au, enum aio_do type)
+{
+  int ret;
+  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);

XCNEW.

+/* Enqueue an st_write_done or st_read_done without an ID.  */
+
+void
+enqueue_done (async_unit *au, enum aio_do type)
+{
+  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);

XCNEW.

+  tq->type = type;
+  tq->has_id = 0;

Redundant store to has_id. Maybe just comment it out if you do want to
emphasize this side-effect of zeroing.

/* tq->has_id = 0; already done by XCNEW */
or the like.

+/* Enqueue a CLOSE statement.  */
+
+void
+enqueue_close (async_unit *au)
+{
+  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);

XCNEW.

And i think enqueue_close does not need internal_proto but could be
and should be static.
Or, even better, remove it completely and call
  enqueue_done (au, AIO_CLOSE)
in async_close directly.

+/* The asynchronous unit keeps the currently active PDT around.
+   This function changes that to the current one.  */
+
+void
+enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
+{
+  st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));

XNEW (st_parameter_dt);

+  transfer_queue *tq = xmalloc (sizeof (transfer_queue));

XNEW (transfer_queue);
+
+  memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
+
+  NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
+  NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
+  tq->next = NULL;
+  tq->type = AIO_DATA_TRANSFER_INIT;
+  tq->read_flag = read_flag;
+  tq->has_id = 0;

ah, that should be bool, not _Bool and hence s/0/false/ and s/1/true/
here and elsewhere when storing to has_id.

since read_flag seems to be a boolean too, i'd
  unsigned has_id : 1;
  unsigned read_flag : 1;
fwiw.

+  tq->new_pdt = new;
+  LOCK (&au->lock);
+
+  if (!au->tail)
+    au->head = tq;
+  else
+    au->tail->next = tq;
+  au->tail = tq;
+  REVOKE_SIGNAL (&(au->emptysignal));
+  au->empty = 0;

s/0/false/ please.

+  UNLOCK (&au->lock);
+  SIGNAL (&au->work);
+}

+/* Perform a wait operation on an asynchronous unit with an ID specified,
+   which means collecting the errors that may have happened asynchronously.
+   Return true if an error has been encountered.  */
+
+bool
+async_wait_id (st_parameter_common *cmp, async_unit *au, int i)

I'd rename the parameter i to id for clarity.

+  WAIT_SIGNAL_MUTEX (&(au->id.done),
+                    (au->id.low >= au->id.waiting || au->empty), &au->lock);

I'd test au->empty first.

Not sure why it's ok to clear has_error in collect_async_errors --
especially without locking -- but i guess you tested such async
failure conditions in both async_wait_id and async_wait.


Index: libgfortran/io/async.h
===================================================================
--- libgfortran/io/async.h      (nicht existent)
+++ libgfortran/io/async.h      (Arbeitskopie)
@@ -0,0 +1,378 @@

+/* Thread - local storage of the current unit we are looking at. Needed for
+   error reporting.  */

dot space space at the end of a sentence.

The layout of struct async_unit does not look too good on a 64bit box.

+bool collect_async_errors (st_parameter_common *, async_unit *);
+internal_proto (collect_async_errors);

superfluous trailing space

Index: libgfortran/io/file_pos.c
===================================================================
--- libgfortran/io/file_pos.c   (Revision 259739)
+++ libgfortran/io/file_pos.c   (Arbeitskopie)

@@ -267,8 +280,13 @@ st_backspace (st_parameter_filepos *fpp)

  done:
   if (u != NULL)
-    unlock_unit (u);
+    {
+      unlock_unit (u);

+      if (u->au && needs_unlock)
+       UNLOCK (&u->au->io_lock);
+    }
+
   library_end ();
 }

in st_backspace you first unlock the unit and only afterwards unlock
the async io_lock.
I would settle on first unlocking the async io_lock and only then
unlocking the unit, no?

@@ -376,9 +406,12 @@ st_endfile (st_parameter_filepos *fpp)
        }
     }

-  done:
-    unlock_unit (u);
+ done:
+  if (u->au && needs_unlock)
+    UNLOCK (&u->au->io_lock);

+  unlock_unit (u);
+
   library_end ();
 }

like you do here, in st_endfile.

Here in st_endfile, why do you async_wait before the
LIBERROR_OPTION_CONFLICT handling by
       if (u->flags.access == ACCESS_SEQUENTIAL
          && u->endfile == AFTER_ENDFILE)
and not afterwards?

@@ -450,6 +499,7 @@ void
 st_flush (st_parameter_filepos *fpp)
 {
   gfc_unit *u;
+  bool needs_unlock = false;

   library_start (&fpp->common);

@@ -456,6 +506,17 @@ st_flush (st_parameter_filepos *fpp)
   u = find_unit (fpp->common.unit);
   if (u != NULL)
     {
+      if (u->au)
+       {
+         if (async_wait (&(fpp->common), u->au))
+           return;
+         else
+           {
+             needs_unlock = true;
+             LOCK (&u->au->io_lock);
+           }
+       }
+
       /* Make sure format buffer is flushed.  */
       if (u->flags.form == FORM_FORMATTED)
         fbuf_flush (u, u->mode);
@@ -469,5 +530,8 @@ st_flush (st_parameter_filepos *fpp)
     generate_error (&fpp->common, LIBERROR_BAD_OPTION,
                        "Specified UNIT in FLUSH is not connected");

+  if (needs_unlock)
+    UNLOCK (&u->au->io_lock);

I would change the condition to if (ASYNC_IO && needs_unlock) for consistency.

+
   library_end ();
 }
Index: libgfortran/io/inquire.c
===================================================================
--- libgfortran/io/inquire.c    (Revision 259739)
+++ libgfortran/io/inquire.c    (Arbeitskopie)
@@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
 /* Implement the non-IOLENGTH variant of the INQUIRY statement */

 #include "io.h"
+#include "async.h"
 #include "unix.h"
 #include <string.h>

please include async.h *after* unix.h like you do everywhere else.

Index: libgfortran/io/read.c
===================================================================
--- libgfortran/io/read.c       (Revision 259739)
+++ libgfortran/io/read.c       (Arbeitskopie)
@@ -30,6 +30,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
 #include <string.h>
 #include <ctype.h>
 #include <assert.h>
+#include "async.h"

 typedef unsigned char uchar;

@@ -42,6 +43,7 @@ typedef unsigned char uchar;
 void
 set_integer (void *dest, GFC_INTEGER_LARGEST value, int length)
 {
+  NOTE ("set_integer: %lld %p", (long long int) value, dest);
   switch (length)
     {
 #ifdef HAVE_GFC_INTEGER_16

Debugging leftover? Please remove the include and the NOTE.

--- libgfortran/io/transfer.c   (Revision 259739)
+++ libgfortran/io/transfer.c   (Arbeitskopie)

+/* Wrapper function for I/O of scalar types.  If this should be an async I/O
+   request, queue it.  For a synchronous write on an async unit, perform the
+   wait operation and return an error.  For all synchronous writes, call the
+   right transfer function.  */

+static void
+wrap_scalar_transfer (st_parameter_dt *dtp, bt type, void *p, int kind,
+                     size_t size, size_t n_elem)
+{
+  if (dtp->u.p.current_unit && dtp->u.p.current_unit->au)
+    {
+      if (dtp->u.p.async)
+       {

Please move this second nested if into the first one.

+         transfer_args args;
+         args.scalar.transfer = dtp->u.p.transfer;
+         args.scalar.arg_bt = type;
+         args.scalar.data = p;
+         args.scalar.i = kind;
+         args.scalar.s1 = size;
+         args.scalar.s2 = n_elem;
+         enqueue_transfer (dtp->u.p.current_unit->au, &args,
+                           AIO_TRANSFER_SCALAR);
+         return;
+       }
+    }

 void
+transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind,
+               gfc_charlen_type charlen)
+{
+  if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
+    return;
+
+  if (dtp->u.p.current_unit && dtp->u.p.current_unit->au)
+    {
+      if (dtp->u.p.async)
+       {

Likewise:
Please move this second nested if into the first one.

+         transfer_args args;
+         size_t sz = sizeof (gfc_array_char)
+                       + sizeof (descriptor_dimension)
+                               * GFC_DESCRIPTOR_RANK (desc);
+         args.array.desc = xmalloc (sz);
+         NOTE ("desc = %p", (void *) args.array.desc);
+         memcpy (args.array.desc, desc, sz);
+         args.array.kind = kind;
+         args.array.charlen = charlen;
+         enqueue_transfer (dtp->u.p.current_unit->au, &args,
+                           AIO_TRANSFER_ARRAY);
+         return;
+       }
+    }
+  /* Come here if there was no asynchronous I/O to be scheduled.  */
+  transfer_array_inner (dtp, desc, kind, charlen);
+}

@@ -2770,6 +2839,42 @@ data_transfer_init (st_parameter_dt *dtp, int read
   else if (dtp->u.p.current_unit->internal_unit_kind > 0)
     dtp->u.p.unit_is_internal = 1;

+  if ((cf & IOPARM_DT_HAS_ASYNCHRONOUS) != 0)
+    {
+      int f;
+      f = find_option (&dtp->common, dtp->asynchronous, dtp->asynchronous_len,
+                      async_opt, "Bad ASYNCHRONOUS in data transfer "
+                      "statement");
+      if (f == ASYNC_YES && dtp->u.p.current_unit->flags.async != ASYNC_YES)
+       {
+         generate_error (&dtp->common, LIBERROR_OPTION_CONFLICT,
+                         "ASYNCHRONOUS transfer without "
+                         "ASYHCRONOUS='YES' in OPEN");

s/ASYHCRONOUS/ASYNCHRONOUS/

+         return;
+       }
+      dtp->u.p.async = f == ASYNC_YES;
+    }
[]
+void
+data_transfer_init_worker (st_parameter_dt *dtp, int read_flag)

Missing function comment.

+{
+  GFC_INTEGER_4 cf = dtp->common.flags;
+
+  NOTE ("starting worker...");
+
+  if (read_flag && dtp->u.p.current_unit->flags.form != FORM_UNFORMATTED
+      && ((cf & IOPARM_DT_LIST_FORMAT) != 0)

Excess parentheses

+      && dtp->u.p.current_unit->child_dtio  == 0)

Surplus horizontal whitespace before ==

+    dtp->u.p.current_unit->last_char = EOF - 1;
[]
+void
+st_read_done (st_parameter_dt *dtp)
+{
+  if (dtp->u.p.current_unit)
+    {
+      if (dtp->u.p.current_unit->au)
+       {
+         if (dtp->common.flags & IOPARM_DT_HAS_ID)
+           *dtp->id = enqueue_done_id (dtp->u.p.current_unit->au,
AIO_READ_DONE);

Surplus trailing whitespace.

+         else
+           {
+             enqueue_done (dtp->u.p.current_unit->au, AIO_READ_DONE);
+             /* An asynchronous unit without ASYNCHRONOUS="YES" - make this
+                synchronous by performing a wait operation.  */
+             if (!dtp->u.p.async)
+               async_wait (&dtp->common, dtp->u.p.current_unit->au);

Don't we have to honour handled errors from async_wait?
Same for st_write_done ().

+           }
+       }
+      else
+       st_read_done_worker (dtp);

--- libgfortran/io/unit.c       (Revision 259739)
+++ libgfortran/io/unit.c       (Arbeitskopie)

@@ -922,7 +930,7 @@ newunit_alloc (void)
   memset (newunits + old_size, 0, old_size);
   newunits[old_size] = true;
   newunit_lwi = old_size + 1;
-    __gthread_mutex_unlock (&unit_lock);
+    UNLOCK (&unit_lock);

This should be indented by 2 spaces, not 4.

   return -old_size + NEWUNIT_START;
 }


--- libgfortran/libgfortran.h   (Revision 259739)
+++ libgfortran/libgfortran.h   (Arbeitskopie)
@@ -743,6 +743,9 @@ internal_proto(translate_error);
 extern void generate_error (st_parameter_common *, int, const char *);
 iexport_proto(generate_error);

+extern bool generate_error_common (st_parameter_common *, int, const char *);
+iexport_proto(generate_error_common);

why is that exported and not just internal_proto() ?
+
 extern void generate_warning (st_parameter_common *, const char *);
 internal_proto(generate_warning);


@@ -1748,5 +1751,7 @@ void cshift1_16_c16 (gfc_array_c16 * const restric
 internal_proto(cshift1_16_c16);
 #endif

+/* Define this if we support asynchronous I/O on this platform.  This
+   currently requires weak symbols.  */

 #endif  /* LIBGFOR_H  */

obsolete comment, please remove.

Thanks for the async support!
cheers,

Reply via email to