This is an automated email from the ASF dual-hosted git repository.

pkarashchenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nuttx.git


The following commit(s) were added to refs/heads/master by this push:
     new c9a38f42f7 sched/wqueue: Do as much work as possible in work_thread
c9a38f42f7 is described below

commit c9a38f42f7572172dd60d6748a7115b60b64835c
Author: Zhe Weng <weng...@xiaomi.com>
AuthorDate: Wed Mar 15 21:55:28 2023 +0800

    sched/wqueue: Do as much work as possible in work_thread
    
    Decouple the semcount and the work queue length.
    
    Previous Problem:
    
    If a work item is queued and cancelled by high priority threads (or
    queued by a timer and cancelled by another high priority thread)
    before work_thread runs, the queue operation marks work_thread as
    ready to run, but the cancel operation decrements the semcount back
    to -1 and leaves wqueue->q empty. work_thread then still runs, finds
    the queue empty, and waits on the semaphore again, decrementing the
    semcount once more to -2.
    
    This can happen many times, so the semcount can end up as a large
    negative value. Test case to produce the incorrect semcount:
    
    high_priority_task()
    {
      for (int i = 0; i < 10000; i++)
        {
          work_queue(LPWORK, &work, worker, NULL, 0);
          work_cancel(LPWORK, &work);
          usleep(1);
        }
    
      /* Now the g_lpwork.sem.semcount is a value near -10000 */
    }
    
    With the incorrect semcount, any queue operation performed while
    work_thread is busy only increments the semcount and pushes the work
    into the queue, but cannot wake work_thread (the semcount is negative
    yet work_thread is not waiting). More and more work items are then
    left in the queue while work_thread waits on the semaphore and never
    calls them.
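    
    After this change the queue side only posts the semaphore when a
    waiter exists, and work_thread drains the whole queue on each wakeup,
    so the semcount no longer has to track the queue length. A simplified
    sketch of that idea (not the literal patch code; the
    nxsem_wait_uninterruptible() call and the work->arg field are assumed
    from the surrounding worker code):
    
    /* Producer: enqueue, then wake the worker only if it is waiting */
    
    int count;
    
    dq_addlast((FAR dq_entry_t *)work, &wqueue->q);
    nxsem_get_value(&wqueue->sem, &count);
    if (count < 0)
      {
        nxsem_post(&wqueue->sem);
      }
    
    /* Consumer: one semaphore wait, then drain every queued entry */
    
    nxsem_wait_uninterruptible(&wqueue->sem);
    while ((work = (FAR struct work_s *)dq_remfirst(&wqueue->q)) != NULL)
      {
        if (work->worker != NULL)
          {
            work->worker(work->arg);
          }
      }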
    
    Signed-off-by: Zhe Weng <weng...@xiaomi.com>
---
 sched/wqueue/kwork_cancel.c |  6 ------
 sched/wqueue/kwork_queue.c  | 29 +++++++++++++++++++++--------
 sched/wqueue/kwork_thread.c |  8 ++++++--
 sched/wqueue/wqueue.h       |  1 +
 4 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c
index fd8055ea04..340d861ce5 100644
--- a/sched/wqueue/kwork_cancel.c
+++ b/sched/wqueue/kwork_cancel.c
@@ -88,12 +88,6 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue,
       else
         {
           dq_rem((FAR dq_entry_t *)work, &wqueue->q);
-
-          /* Semaphore count should be consistent with the number of
-           * work entries.
-           */
-
-          wqueue->sem.semcount--;
         }
 
       work->worker = NULL;
diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c
index 493b540c4e..f850cad0a8 100644
--- a/sched/wqueue/kwork_queue.c
+++ b/sched/wqueue/kwork_queue.c
@@ -38,6 +38,23 @@
 
 #ifdef CONFIG_SCHED_WORKQUEUE
 
+/****************************************************************************
+ * Pre-processor Definitions
+ ****************************************************************************/
+
+#define queue_work(wqueue, work) \
+  do \
+    { \
+      int sem_count; \
+      dq_addlast((FAR dq_entry_t *)(work), &(wqueue).q); \
+      nxsem_get_value(&(wqueue).sem, &sem_count); \
+      if (sem_count < 0) /* There are threads waiting for sem. */ \
+        { \
+          nxsem_post(&(wqueue).sem); \
+        } \
+    } \
+  while (0)
+
 /****************************************************************************
  * Private Functions
  ****************************************************************************/
@@ -50,8 +67,7 @@
 static void hp_work_timer_expiry(wdparm_t arg)
 {
   irqstate_t flags = enter_critical_section();
-  dq_addlast((FAR dq_entry_t *)arg, &g_hpwork.q);
-  nxsem_post(&g_hpwork.sem);
+  queue_work(g_hpwork, arg);
   leave_critical_section(flags);
 }
 #endif
@@ -64,8 +80,7 @@ static void hp_work_timer_expiry(wdparm_t arg)
 static void lp_work_timer_expiry(wdparm_t arg)
 {
   irqstate_t flags = enter_critical_section();
-  dq_addlast((FAR dq_entry_t *)arg, &g_lpwork.q);
-  nxsem_post(&g_lpwork.sem);
+  queue_work(g_lpwork, arg);
   leave_critical_section(flags);
 }
 #endif
@@ -137,8 +152,7 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
 
       if (!delay)
         {
-          dq_addlast((FAR dq_entry_t *)work, &g_hpwork.q);
-          nxsem_post(&g_hpwork.sem);
+          queue_work(g_hpwork, work);
         }
       else
         {
@@ -155,8 +169,7 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
 
       if (!delay)
         {
-          dq_addlast((FAR dq_entry_t *)work, &g_lpwork.q);
-          nxsem_post(&g_lpwork.sem);
+          queue_work(g_lpwork, work);
         }
       else
         {
diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c
index 806024d4bd..7d4f6d4033 100644
--- a/sched/wqueue/kwork_thread.c
+++ b/sched/wqueue/kwork_thread.c
@@ -153,9 +153,13 @@ static int work_thread(int argc, FAR char *argv[])
 
       /* Remove the ready-to-execute work from the list */
 
-      work = (FAR struct work_s *)dq_remfirst(&wqueue->q);
-      if (work && work->worker)
+      while ((work = (FAR struct work_s *)dq_remfirst(&wqueue->q)) != NULL)
         {
+          if (work->worker == NULL)
+            {
+              continue;
+            }
+
           /* Extract the work description from the entry (in case the work
            * instance will be re-used after it has been de-queued).
            */
diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h
index 53323e78b9..c838adf2cb 100644
--- a/sched/wqueue/wqueue.h
+++ b/sched/wqueue/wqueue.h
@@ -26,6 +26,7 @@
 
 #include <nuttx/config.h>
 
+#include <semaphore.h>
 #include <sys/types.h>
 #include <stdbool.h>
 
