This is an automated email from the ASF dual-hosted git repository. pkarashchenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nuttx.git
The following commit(s) were added to refs/heads/master by this push: new c9a38f42f7 sched/wqueue: Do as much work as possible in work_thread c9a38f42f7 is described below commit c9a38f42f7572172dd60d6748a7115b60b64835c Author: Zhe Weng <weng...@xiaomi.com> AuthorDate: Wed Mar 15 21:55:28 2023 +0800 sched/wqueue: Do as much work as possible in work_thread Decouple the semcount and the work queue length. Previous Problem: If a work is queued and cancelled in high priority threads (or queued by timer and cancelled by another high priority thread) before work_thread runs, the queue operation will mark work_thread as ready to run, but the cancel operation decrements the semcount back to -1 and makes wqueue->q empty. Then the work_thread still runs, finds an empty queue, and waits on the sem again, so semcount becomes -2 (decremented by 1). This can happen multiple times, so semcount can become a very small value. Test case to produce an incorrect semcount: high_priority_task() { for (int i = 0; i < 10000; i++) { work_queue(LPWORK, &work, worker, NULL, 0); work_cancel(LPWORK, &work); usleep(1); } /* Now the g_lpwork.sem.semcount is a value near -10000 */ } With an incorrect semcount, any queue operation performed while the work_thread is busy will only increase semcount and push work into the queue, but cannot trigger work_thread (semcount is negative but work_thread is not waiting), so more and more work items will be left in the queue while the work_thread waits on the sem and cannot process them. 
Signed-off-by: Zhe Weng <weng...@xiaomi.com> --- sched/wqueue/kwork_cancel.c | 6 ------ sched/wqueue/kwork_queue.c | 29 +++++++++++++++++++++-------- sched/wqueue/kwork_thread.c | 8 ++++++-- sched/wqueue/wqueue.h | 1 + 4 files changed, 28 insertions(+), 16 deletions(-) diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c index fd8055ea04..340d861ce5 100644 --- a/sched/wqueue/kwork_cancel.c +++ b/sched/wqueue/kwork_cancel.c @@ -88,12 +88,6 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, else { dq_rem((FAR dq_entry_t *)work, &wqueue->q); - - /* Semaphore count should be consistent with the number of - * work entries. - */ - - wqueue->sem.semcount--; } work->worker = NULL; diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c index 493b540c4e..f850cad0a8 100644 --- a/sched/wqueue/kwork_queue.c +++ b/sched/wqueue/kwork_queue.c @@ -38,6 +38,23 @@ #ifdef CONFIG_SCHED_WORKQUEUE +/**************************************************************************** + * Pre-processor Definitions + ****************************************************************************/ + +#define queue_work(wqueue, work) \ + do \ + { \ + int sem_count; \ + dq_addlast((FAR dq_entry_t *)(work), &(wqueue).q); \ + nxsem_get_value(&(wqueue).sem, &sem_count); \ + if (sem_count < 0) /* There are threads waiting for sem. 
*/ \ + { \ + nxsem_post(&(wqueue).sem); \ + } \ + } \ + while (0) + /**************************************************************************** * Private Functions ****************************************************************************/ @@ -50,8 +67,7 @@ static void hp_work_timer_expiry(wdparm_t arg) { irqstate_t flags = enter_critical_section(); - dq_addlast((FAR dq_entry_t *)arg, &g_hpwork.q); - nxsem_post(&g_hpwork.sem); + queue_work(g_hpwork, arg); leave_critical_section(flags); } #endif @@ -64,8 +80,7 @@ static void hp_work_timer_expiry(wdparm_t arg) static void lp_work_timer_expiry(wdparm_t arg) { irqstate_t flags = enter_critical_section(); - dq_addlast((FAR dq_entry_t *)arg, &g_lpwork.q); - nxsem_post(&g_lpwork.sem); + queue_work(g_lpwork, arg); leave_critical_section(flags); } #endif @@ -137,8 +152,7 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, if (!delay) { - dq_addlast((FAR dq_entry_t *)work, &g_hpwork.q); - nxsem_post(&g_hpwork.sem); + queue_work(g_hpwork, work); } else { @@ -155,8 +169,7 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, if (!delay) { - dq_addlast((FAR dq_entry_t *)work, &g_lpwork.q); - nxsem_post(&g_lpwork.sem); + queue_work(g_lpwork, work); } else { diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c index 806024d4bd..7d4f6d4033 100644 --- a/sched/wqueue/kwork_thread.c +++ b/sched/wqueue/kwork_thread.c @@ -153,9 +153,13 @@ static int work_thread(int argc, FAR char *argv[]) /* Remove the ready-to-execute work from the list */ - work = (FAR struct work_s *)dq_remfirst(&wqueue->q); - if (work && work->worker) + while ((work = (FAR struct work_s *)dq_remfirst(&wqueue->q)) != NULL) { + if (work->worker == NULL) + { + continue; + } + /* Extract the work description from the entry (in case the work * instance will be re-used after it has been de-queued). 
*/ diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h index 53323e78b9..c838adf2cb 100644 --- a/sched/wqueue/wqueue.h +++ b/sched/wqueue/wqueue.h @@ -26,6 +26,7 @@ #include <nuttx/config.h> +#include <semaphore.h> #include <sys/types.h> #include <stdbool.h>