This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nuttx.git

commit 61ef7eb3dc0697ccc9efb4c99e09e024e74b6040
Author: ligd <liguidi...@xiaomi.com>
AuthorDate: Mon Aug 28 19:29:54 2023 +0800

    wqueue: add work_cancel_sync() support
    
    Signed-off-by: ligd <liguidi...@xiaomi.com>
---
 include/nuttx/wqueue.h      | 23 +++++++++++++
 sched/wqueue/kwork_cancel.c | 78 ++++++++++++++++++++++++++++++++++++++++++---
 sched/wqueue/kwork_thread.c | 45 ++++++++++++++++++++------
 sched/wqueue/wqueue.h       |  2 ++
 4 files changed, 134 insertions(+), 14 deletions(-)

diff --git a/include/nuttx/wqueue.h b/include/nuttx/wqueue.h
index 3da6058980..78e72937e9 100644
--- a/include/nuttx/wqueue.h
+++ b/include/nuttx/wqueue.h
@@ -383,6 +383,29 @@ int work_queue(int qid, FAR struct work_s *work, worker_t 
worker,
 
 int work_cancel(int qid, FAR struct work_s *work);
 
+/****************************************************************************
+ * Name: work_cancel_sync
+ *
+ * Description:
+ *   Blocked cancel previously queued user-mode work.  This removes work
+ *   from the user mode work queue.  After work has been cancelled, it may
+ *   be requeued by calling work_queue() again.
+ *
+ * Input Parameters:
+ *   qid    - The work queue ID (must be HPWORK or LPWORK)
+ *   work   - The previously queued work structure to cancel
+ *
+ * Returned Value:
+ *   Zero (OK) on success, a negated errno on failure.  This error may be
+ *   reported:
+ *
+ *   -ENOENT - There is no such work queued.
+ *   -EINVAL - An invalid work queue was specified
+ *
+ ****************************************************************************/
+
+int work_cancel_sync(int qid, FAR struct work_s *work);
+
 /****************************************************************************
  * Name: work_foreach
  *
diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c
index 340d861ce5..332d3fa90a 100644
--- a/sched/wqueue/kwork_cancel.c
+++ b/sched/wqueue/kwork_cancel.c
@@ -49,8 +49,11 @@
  *   work_queue() again.
  *
  * Input Parameters:
- *   qid    - The work queue ID
- *   work   - The previously queued work structure to cancel
+ *   wqueue  - The work queue to use.  Must be HPWORK or LPWORK
+ *   nthread - The number of threads in the work queue
+ *             > 0 unsynchronous cancel
+ *             < 0 synchronous cancel
+ *   work    - The previously queued work structure to cancel
  *
  * Returned Value:
  *   Zero (OK) on success, a negated errno on failure.  This error may be
@@ -61,7 +64,7 @@
  *
  ****************************************************************************/
 
-static int work_qcancel(FAR struct kwork_wqueue_s *wqueue,
+static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread,
                         FAR struct work_s *work)
 {
   irqstate_t flags;
@@ -93,6 +96,21 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue,
       work->worker = NULL;
       ret = OK;
     }
+  else if (nthread > 0)
+    {
+      int wndx;
+
+      for (wndx = 0; wndx < nthread; wndx++)
+        {
+          if (wqueue->worker[wndx].work == work &&
+              wqueue->worker[wndx].pid != nxsched_gettid())
+            {
+              nxsem_wait_uninterruptible(&wqueue->worker[wndx].wait);
+              ret = OK;
+              break;
+            }
+        }
+    }
 
   leave_critical_section(flags);
   return ret;
@@ -130,7 +148,56 @@ int work_cancel(int qid, FAR struct work_s *work)
     {
       /* Cancel high priority work */
 
-      return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork, work);
+      return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork,
+                          -1, work);
+    }
+  else
+#endif
+#ifdef CONFIG_SCHED_LPWORK
+  if (qid == LPWORK)
+    {
+      /* Cancel low priority work */
+
+      return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork,
+                          -1, work);
+    }
+  else
+#endif
+    {
+      return -EINVAL;
+    }
+}
+
+/****************************************************************************
+ * Name: work_cancel_sync
+ *
+ * Description:
+ *   Blocked cancel previously queued user-mode work.  This removes work
+ *   from the user mode work queue.  After work has been cancelled, it may
+ *   be requeued by calling work_queue() again.
+ *
+ * Input Parameters:
+ *   qid    - The work queue ID (must be HPWORK or LPWORK)
+ *   work   - The previously queued work structure to cancel
+ *
+ * Returned Value:
+ *   Zero (OK) on success, a negated errno on failure.  This error may be
+ *   reported:
+ *
+ *   -ENOENT - There is no such work queued.
+ *   -EINVAL - An invalid work queue was specified
+ *
+ ****************************************************************************/
+
+int work_cancel_sync(int qid, FAR struct work_s *work)
+{
+#ifdef CONFIG_SCHED_HPWORK
+  if (qid == HPWORK)
+    {
+      /* Cancel high priority work */
+
+      return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork,
+                          CONFIG_SCHED_HPNTHREADS, work);
     }
   else
 #endif
@@ -139,7 +206,8 @@ int work_cancel(int qid, FAR struct work_s *work)
     {
       /* Cancel low priority work */
 
-      return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork, work);
+      return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork,
+                          CONFIG_SCHED_LPNTHREADS, work);
     }
   else
 #endif
diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c
index 6668c889a6..9e2c8bf62d 100644
--- a/sched/wqueue/kwork_thread.c
+++ b/sched/wqueue/kwork_thread.c
@@ -126,13 +126,19 @@ struct lp_wqueue_s g_lpwork =
 static int work_thread(int argc, FAR char *argv[])
 {
   FAR struct kwork_wqueue_s *wqueue;
+  FAR struct kworker_s *kworker;
   FAR struct work_s *work;
   worker_t worker;
   irqstate_t flags;
   FAR void *arg;
+  int semcount;
 
-  wqueue = (FAR struct kwork_wqueue_s *)
-           ((uintptr_t)strtoul(argv[1], NULL, 16));
+  /* Get the handle from argv */
+
+  wqueue  = (FAR struct kwork_wqueue_s *)
+            ((uintptr_t)strtoul(argv[1], NULL, 0));
+  kworker = (FAR struct kworker_s *)
+            ((uintptr_t)strtoul(argv[2], NULL, 0));
 
   flags = enter_critical_section();
 
@@ -168,6 +174,10 @@ static int work_thread(int argc, FAR char *argv[])
 
           work->worker = NULL;
 
+          /* Mark the thread busy */
+
+          kworker->work = work;
+
           /* Do the work.  Re-enable interrupts while the work is being
            * performed... we don't have any idea how long this will take!
            */
@@ -175,6 +185,18 @@ static int work_thread(int argc, FAR char *argv[])
           leave_critical_section(flags);
           CALL_WORKER(worker, arg);
           flags = enter_critical_section();
+
+          /* Mark the thread un-busy */
+
+          kworker->work = NULL;
+
+          /* Check if someone is waiting, if so, wakeup it */
+
+          nxsem_get_value(&kworker->wait, &semcount);
+          while (semcount++ < 0)
+            {
+              nxsem_post(&kworker->wait);
+            }
         }
 
       /* Then process queued work.  work_process will not return until: (1)
@@ -213,15 +235,12 @@ static int work_thread_create(FAR const char *name, int 
priority,
                               int stack_size, int nthread,
                               FAR struct kwork_wqueue_s *wqueue)
 {
-  FAR char *argv[2];
-  char args[32];
+  FAR char *argv[3];
+  char arg0[32];
+  char arg1[32];
   int wndx;
   int pid;
 
-  snprintf(args, sizeof(args), "%p", wqueue);
-  argv[0] = args;
-  argv[1] = NULL;
-
   /* Don't permit any of the threads to run until we have fully initialized
    * g_hpwork and g_lpwork.
    */
@@ -230,6 +249,14 @@ static int work_thread_create(FAR const char *name, int 
priority,
 
   for (wndx = 0; wndx < nthread; wndx++)
     {
+      nxsem_init(&wqueue->worker[wndx].wait, 0, 0);
+
+      snprintf(arg0, sizeof(arg0), "%p", wqueue);
+      snprintf(arg1, sizeof(arg1), "%p", &wqueue->worker[wndx]);
+      argv[0] = arg0;
+      argv[1] = arg1;
+      argv[2] = NULL;
+
       pid = kthread_create(name, priority, stack_size,
                            work_thread, argv);
 
@@ -241,7 +268,7 @@ static int work_thread_create(FAR const char *name, int 
priority,
           return pid;
         }
 
-      wqueue->worker[wndx].pid  = pid;
+      wqueue->worker[wndx].pid = pid;
     }
 
   sched_unlock();
diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h
index c838adf2cb..66f2ed5a4c 100644
--- a/sched/wqueue/wqueue.h
+++ b/sched/wqueue/wqueue.h
@@ -53,6 +53,8 @@
 struct kworker_s
 {
   pid_t             pid;       /* The task ID of the worker thread */
+  FAR struct work_s *work;     /* The work structure */
+  sem_t             wait;      /* Sync waiting for worker done */
 };
 
 /* This structure defines the state of one kernel-mode work queue */

Reply via email to