This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nuttx.git
commit 6f72f5481d52f6d5ce5cb1d38d60b443d01b8c11
Author:     ouyangxiangzhen <ouyangxiangz...@xiaomi.com>
AuthorDate: Thu Apr 17 11:41:22 2025 +0800

    sched/wqueue: Refactor delayed and periodic workqueue.

    This commit refactors how the workqueue processes delayed and periodic
    work, and moves the timer handling into `work_thread`. The improvements
    of this change are as follows:
    - Fixed the memory reuse problem of the original periodic workqueue
      implementation.
    - By removing the `wdog_s` structure from the `work_s` structure, the
      memory overhead of each `work_s` structure is reduced by about
      30 bytes.
    - The timer is now set per workqueue instead of per work item, which
      improves system performance.
    - Simplified the workqueue cancellation logic.

    Signed-off-by: ouyangxiangzhen <ouyangxiangz...@xiaomi.com>
---
 include/nuttx/wqueue.h            |  15 +--
 libs/libc/wqueue/work_queue.c     |  10 ++
 libs/libc/wqueue/work_usrthread.c |   8 +-
 sched/wqueue/kwork_cancel.c       |  34 +++++--
 sched/wqueue/kwork_queue.c        | 135 ++++++++-------------------
 sched/wqueue/kwork_thread.c       | 187 ++++++++++++++++++++++++++++----------
 sched/wqueue/wqueue.h             |  96 ++++++++++++++-----
 7 files changed, 296 insertions(+), 189 deletions(-)

diff --git a/include/nuttx/wqueue.h b/include/nuttx/wqueue.h
index 59803faf6c..6eaefff597 100644
--- a/include/nuttx/wqueue.h
+++ b/include/nuttx/wqueue.h
@@ -249,16 +249,11 @@ typedef CODE void (*worker_t)(FAR void *arg);

 struct work_s
 {
-  struct list_node node;           /* Implements a double linked list */
-  clock_t qtime;                   /* Time work queued */
-  union
-    {
-      struct wdog_s timer;         /* Delay expiry timer */
-      struct wdog_period_s ptimer; /* Period expiry timer */
-    } u;
-  worker_t worker;                 /* Work callback */
-  FAR void *arg;                   /* Callback argument */
-  FAR struct kwork_wqueue_s *wq;   /* Work queue */
+  struct list_node node;           /* Implements a double linked list */
+  clock_t qtime;                   /* Time work queued */
+  clock_t period;                  /* Periodical delay ticks */
+  worker_t worker;                 /* Work callback */
+  FAR void *arg;                   /* Callback argument */
 };

 /* This is an enumeration of the various events that may be
diff --git a/libs/libc/wqueue/work_queue.c b/libs/libc/wqueue/work_queue.c
index 23dd1535b0..f3d94cb894 100644
--- a/libs/libc/wqueue/work_queue.c
+++ b/libs/libc/wqueue/work_queue.c
@@ -89,6 +89,16 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue,
   work->arg    = arg;             /* Callback argument */
   work->qtime  = clock() + delay; /* Delay until work performed */

+  /* delay+1 is to prevent the insufficient sleep time if we are
+   * currently near the boundary to the next tick.
+   * | current_tick | current_tick + 1 | current_tick + 2 | .... |
+   * |           ^ Here we get the current tick
+   * In this case we delay 1 tick, timer will be triggered at
+   * current_tick + 1, which is not enough for at least 1 tick.
+   */
+
+  work->qtime += 1;
+
   /* Insert the work into the wait queue sorted by the expired time. */

   list_for_every_entry(&wqueue->q, curr, struct work_s, node)
diff --git a/libs/libc/wqueue/work_usrthread.c b/libs/libc/wqueue/work_usrthread.c
index 35aa739582..e52529c245 100644
--- a/libs/libc/wqueue/work_usrthread.c
+++ b/libs/libc/wqueue/work_usrthread.c
@@ -94,7 +94,7 @@ static void work_process(FAR struct usr_wqueue_s *wqueue)
   FAR struct work_s *work;
   worker_t worker;
   FAR void *arg;
-  sclock_t elapsed;
+  clock_t tick;
   clock_t next;
   int ret;

@@ -126,18 +126,18 @@ static void work_process(FAR struct usr_wqueue_s *wqueue)
        * zero will always execute immediately.
        */

-      elapsed = clock() - work->qtime;
+      tick = clock();

      /* Is this delay work ready? */

-      if (elapsed >= 0)
+      if (clock_compare(work->qtime, tick))
        {
          /* Remove the ready-to-execute work from the list */

          list_delete(&work->node);

          /* Extract the work description from the entry (in case the work
-           * instance by the re-used after it has been de-queued).
+           * instance by the reused after it has been de-queued).
           */

          worker = work->worker;
diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c
index 98d84b8065..369730d6a0 100644
--- a/sched/wqueue/kwork_cancel.c
+++ b/sched/wqueue/kwork_cancel.c
@@ -46,7 +46,7 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
                         FAR struct work_s *work)
 {
   irqstate_t flags;
-  int ret = -ENOENT;
+  int ret = OK;

   if (wqueue == NULL || work == NULL)
     {
@@ -59,17 +59,37 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
    */

   flags = spin_lock_irqsave(&wqueue->lock);
-  if (work->worker != NULL)
+
+  /* Check whether we own the work structure. */
+
+  if (!work_available(work))
     {
-      /* Remove the entry from the work queue and make sure that it is
-       * marked as available (i.e., the worker field is nullified).
-       */
+      bool is_head = list_is_head(&wqueue->pending, &work->node);
+
+      /* Seize the ownership from the work thread. */

       work->worker = NULL;
-      wd_cancel(&work->u.timer);
+      list_delete(&work->node);

-      ret = OK;
+      /* If the head of the pending queue has changed, we should reset
+       * the wqueue timer.
+       */
+
+      if (is_head)
+        {
+          if (!list_is_empty(&wqueue->pending))
+            {
+              work = list_first_entry(&wqueue->pending, struct work_s, node);
+
+              ret = wd_start_abstick(&wqueue->timer, work->qtime,
+                                     work_timer_expired, (wdparm_t)wqueue);
+            }
+          else
+            {
+              wd_cancel(&wqueue->timer);
+            }
+        }
     }
   else if (!up_interrupt_context() && !sched_idletask() && sync)
     {
diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c
index 5c1f192c8a..57f893cb67 100644
--- a/sched/wqueue/kwork_queue.c
+++ b/sched/wqueue/kwork_queue.c
@@ -40,67 +40,6 @@

 #ifdef CONFIG_SCHED_WORKQUEUE

-/****************************************************************************
- * Pre-processor Definitions
- ****************************************************************************/
-
-#define queue_work(wqueue, work) \
-  do \
-    { \
-      list_add_tail(&(wqueue)->q, &(work)->node); \
-      if ((wqueue)->wait_count > 0) /* There are threads waiting for sem. */ \
-        { \
-          (wqueue)->wait_count--; \
-          nxsem_post(&(wqueue)->sem); \
-        } \
-    } \
-  while (0)
-
-/****************************************************************************
- * Private Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_timer_expiry
- ****************************************************************************/
-
-static void work_timer_expiry(wdparm_t arg)
-{
-  FAR struct work_s *work = (FAR struct work_s *)arg;
-
-  irqstate_t flags = spin_lock_irqsave(&work->wq->lock);
-  sched_lock();
-
-  /* We have being canceled */
-
-  if (work->worker != NULL)
-    {
-      queue_work(work->wq, work);
-    }
-
-  spin_unlock_irqrestore(&work->wq->lock, flags);
-  sched_unlock();
-}
-
-static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads,
-                              FAR struct work_s *work)
-{
-  int wndx;
-
-  for (wndx = 0; wndx < nthreads; wndx++)
-    {
-      if (kworkers[wndx].work == work)
-        {
-          if (kworkers[wndx].wait_count > 0)
-            {
-              return true;
-            }
-        }
-    }
-
-  return false;
-}
-
 /****************************************************************************
  * Public Functions
  ****************************************************************************/
@@ -141,65 +80,67 @@ int work_queue_period_wq(FAR struct kwork_wqueue_s *wqueue,
                          FAR void *arg, clock_t delay, clock_t period)
 {
   irqstate_t flags;
-  int ret = OK;
+  clock_t expected;
+  bool wake = false;
+  int ret = OK;

   if (wqueue == NULL || work == NULL || worker == NULL)
     {
       return -EINVAL;
     }

-  /* Interrupts are disabled so that this logic can be called from with
-   * task logic or from interrupt handling logic.
-   */
-
-  flags = spin_lock_irqsave(&wqueue->lock);
-  sched_lock();
+  /* Ensure the work has been canceled. */

-  /* Remove the entry from the timer and work queue. */
+  work_cancel_wq(wqueue, work);

-  if (work->worker != NULL)
-    {
-      /* Remove the entry from the work queue and make sure that it is
-       * marked as available (i.e., the worker field is nullified).
-       */
+  /* delay+1 is to prevent the insufficient sleep time if we are
+   * currently near the boundary to the next tick.
+   * | current_tick | current_tick + 1 | current_tick + 2 | .... |
+   * |           ^ Here we get the current tick
+   * In this case we delay 1 tick, timer will be triggered at
+   * current_tick + 1, which is not enough for at least 1 tick.
+   */

-      work->worker = NULL;
-      wd_cancel(&work->u.timer);
+  expected = clock_systime_ticks() + delay + 1;

-      list_delete(&work->node);
-    }
+  /* Interrupts are disabled so that this logic can be called from with
+   * task logic or from interrupt handling logic.
+   */

-  if (work_is_canceling(wqueue->worker, wqueue->nthreads, work))
-    {
-      goto out;
-    }
+  flags = spin_lock_irqsave(&wqueue->lock);

   /* Initialize the work structure. */

-  work->worker = worker; /* Work callback. non-NULL means queued */
-  work->arg    = arg;    /* Callback argument */
-  work->wq     = wqueue; /* Work queue */
+  work->worker = worker;   /* Work callback. non-NULL means queued */
+  work->arg    = arg;      /* Callback argument */
+  work->qtime  = expected; /* Expected time */
+  work->period = period;   /* Periodical delay */

-  /* Queue the new work */
+  /* Insert to the pending list of the wqueue. */

-  if (!delay)
+  if (delay)
     {
-      queue_work(wqueue, work);
-    }
-  else if (period == 0)
-    {
-      ret = wd_start(&work->u.timer, delay,
-                     work_timer_expiry, (wdparm_t)work);
+      if (work_insert_pending(wqueue, work))
+        {
+          /* Start the timer if the work is the earliest expired work. */
+
+          ret = wd_start_abstick(&wqueue->timer, work->qtime,
+                                 work_timer_expired, (wdparm_t)wqueue);
+        }
     }
   else
     {
-      ret = wd_start_period(&work->u.ptimer, delay, period,
-                            work_timer_expiry, (wdparm_t)work);
+      list_add_tail(&wqueue->expired, &work->node);
+      wake = true;
     }

-out:
   spin_unlock_irqrestore(&wqueue->lock, flags);
-  sched_unlock();
+
+  if (wake)
+    {
+      nxsem_post(&wqueue->sem);
+    }
+
   return ret;
 }
diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c
index 1320537fde..be1a77185e 100644
--- a/sched/wqueue/kwork_thread.c
+++ b/sched/wqueue/kwork_thread.c
@@ -83,11 +83,14 @@
 struct hp_wqueue_s g_hpwork =
 {
-  LIST_INITIAL_VALUE(g_hpwork.q),
-  SEM_INITIALIZER(0),
-  SEM_INITIALIZER(0),
-  SP_UNLOCKED,
-  CONFIG_SCHED_HPNTHREADS,
+  {
+    LIST_INITIAL_VALUE(g_hpwork.wq.expired),
+    LIST_INITIAL_VALUE(g_hpwork.wq.pending),
+    SEM_INITIALIZER(0),
+    SEM_INITIALIZER(0),
+    SP_UNLOCKED,
+    CONFIG_SCHED_HPNTHREADS,
+  }
 };

 #endif /* CONFIG_SCHED_HPWORK */

@@ -97,11 +100,14 @@
 struct lp_wqueue_s g_lpwork =
 {
-  LIST_INITIAL_VALUE(g_lpwork.q),
-  SEM_INITIALIZER(0),
-  SEM_INITIALIZER(0),
-  SP_UNLOCKED,
-  CONFIG_SCHED_LPNTHREADS,
+  {
+    LIST_INITIAL_VALUE(g_lpwork.wq.expired),
+    LIST_INITIAL_VALUE(g_lpwork.wq.pending),
+    SEM_INITIALIZER(0),
+    SEM_INITIALIZER(0),
+    SP_UNLOCKED,
+    CONFIG_SCHED_LPNTHREADS,
+  }
 };

 #endif /* CONFIG_SCHED_LPWORK */

@@ -110,6 +116,51 @@ struct lp_wqueue_s g_lpwork =
  * Private Functions
  ****************************************************************************/

+static inline_function
+void work_dispatch(FAR struct kwork_wqueue_s *wq)
+{
+  FAR struct work_s *work;
+  FAR struct work_s *next;
+  unsigned int count = 0;
+  clock_t ticks = clock_systime_ticks();
+
+  /* Wake up the worker thread once there is expired work.
+   * If some worker threads are busy, here the callback will
+   * wake up another waiting work thread.
+   *
+   * Becareful of the special case that the pending work
+   * has been canceled but the timer is expired.
+   * In this case we should not wake up any worker thread.
+   */
+
+  list_for_every_entry_safe(&wq->pending, work, next, struct work_s, node)
+    {
+      /* Check whether the work has expired. */
+
+      if (!clock_compare(work->qtime, ticks))
+        {
+          wd_start_abstick(&wq->timer, work->qtime,
+                           work_timer_expired, (wdparm_t)wq);
+          break;
+        }
+
+      /* Expired work will be moved to tail of the expired queue. */
+
+      list_delete(&work->node);
+      list_add_tail(&wq->expired, &work->node);
+
+      /* Note that the thread execution this function is also
+       * a worker thread, which has already been woken up by the timer.
+       * So only `count - 1` semaphore will be posted.
+       */
+
+      if (count++ > 0)
+        {
+          nxsem_post(&wq->sem);
+        }
+    }
+}
+
 /****************************************************************************
  * Name: work_thread
  *
@@ -135,11 +186,11 @@
 static int work_thread(int argc, FAR char *argv[])
 {
   FAR struct kwork_wqueue_s *wqueue;
-  FAR struct kworker_s *kworker;
-  FAR struct work_s *work;
-  worker_t worker;
-  irqstate_t flags;
-  FAR void *arg;
+  FAR struct kworker_s  *kworker;
+  FAR struct work_s     *work;
+  worker_t               worker;
+  irqstate_t             flags;
+  FAR void              *arg;

   /* Get the handle from argv */

@@ -148,33 +199,38 @@ static int work_thread(int argc, FAR char *argv[])
   kworker = (FAR struct kworker_s *)
             ((uintptr_t)strtoul(argv[2], NULL, 16));

-  flags = spin_lock_irqsave(&wqueue->lock);
-  sched_lock();
-
-  /* Loop forever */
+  /* Loop until wqueue->exit != 0.
+   * Since the only way to set wqueue->exit is to call work_queue_free(),
+   * there is no need for entering the critical section.
+   */

   while (!wqueue->exit)
     {
-      /* And check each entry in the work queue. Since we have disabled
+      /* And check first entry in the work queue. Since we have disabled
       * interrupts we know: (1) we will not be suspended unless we do
       * so ourselves, and (2) there will be no changes to the work queue
       */

-      /* Remove the ready-to-execute work from the list */
+      flags = spin_lock_irqsave(&wqueue->lock);
+      sched_lock();
+
+      /* If the wqueue timer is expired and non-active, it indicates that
+       * there might be expired work in the pending queue.
+       */

-      while (!list_is_empty(&wqueue->q))
+      if (!WDOG_ISACTIVE(&wqueue->timer))
        {
-          work = list_first_entry(&wqueue->q, struct work_s, node);
+          work_dispatch(wqueue);
+        }

-          list_delete(&work->node);
+      if (!list_is_empty(&wqueue->expired))
+        {
+          work = list_first_entry(&wqueue->expired, struct work_s, node);

-          if (work->worker == NULL)
-            {
-              continue;
-            }
+          list_delete(&work->node);

-          /* Extract the work description from the entry (in case the work
-           * instance will be re-used after it has been de-queued).
+          /* Extract the work description from the entry (in case the
+           * work instance will be reused after it has been de-queued).
           */

          worker = work->worker;
@@ -183,21 +239,42 @@ static int work_thread(int argc, FAR char *argv[])

          arg = work->arg;

-          /* Mark the work as no longer being queued */
+          /* Check whether the work is periodic. */
+
+          if (work->period != 0)
+            {
+              /* Calculate next expiration qtime. */
+
+              work->qtime += work->period;
+
+              /* Enqueue to the waiting queue */
+
+              if (work_insert_pending(wqueue, work))
+                {
+                  /* We should reset timer if the work is the earliest. */

-          work->worker = NULL;
+                  wd_start_abstick(&wqueue->timer, work->qtime,
+                                   work_timer_expired, (wdparm_t)wqueue);
+                }
+            }
+          else
+            {
+              /* Return the work structure ownership to the work owner. */
+
+              work->worker = NULL;
+            }

          /* Mark the thread busy */

          kworker->work = work;

+          spin_unlock_irqrestore(&wqueue->lock, flags);
+          sched_unlock();
+
          /* Do the work. Re-enable interrupts while the work is being
           * performed... we don't have any idea how long this will take!
           */

-          spin_unlock_irqrestore(&wqueue->lock, flags);
-          sched_unlock();
-
          CALL_WORKER(worker, arg);
          flags = spin_lock_irqsave(&wqueue->lock);
          sched_lock();
@@ -215,23 +292,14 @@ static int work_thread(int argc, FAR char *argv[])
            }
        }

-      /* Then process queued work. work_process will not return until: (1)
-       * there is no further work in the work queue, and (2) semaphore is
-       * posted.
-       */
-
-      wqueue->wait_count++;
      spin_unlock_irqrestore(&wqueue->lock, flags);
      sched_unlock();
+      /* Wait for the semaphore to be posted by the wqueue timer. */
+
      nxsem_wait_uninterruptible(&wqueue->sem);
-      flags = spin_lock_irqsave(&wqueue->lock);
-      sched_lock();
    }

-  spin_unlock_irqrestore(&wqueue->lock, flags);
-  sched_unlock();
-
  nxsem_post(&wqueue->exsem);
  return OK;
 }
@@ -303,6 +371,27 @@ static int work_thread_create(FAR const char *name, int priority,
  * Public Functions
  ****************************************************************************/

+/****************************************************************************
+ * Name: work_timer_expired
+ *
+ * Description:
+ *   The wqueue timer callback.
+ *
+ * Input Parameters:
+ *   arg - The work queue.
+ * + ****************************************************************************/ + +void work_timer_expired(wdparm_t arg) +{ + /* The work time expired callback will wake up at least one worker thread + * to dispatch the expired work. + */ + + FAR struct kwork_wqueue_s *wq = (FAR struct kwork_wqueue_s *)arg; + nxsem_post(&wq->sem); +} + /**************************************************************************** * Name: work_queue_create * @@ -349,7 +438,9 @@ FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name, /* Initialize the work queue structure */ - list_initialize(&wqueue->q); + list_initialize(&wqueue->expired); + list_initialize(&wqueue->pending); + wqueue->timer.func = NULL; nxsem_init(&wqueue->sem, 0, 0); nxsem_init(&wqueue->exsem, 0, 0); wqueue->nthreads = nthreads; @@ -392,6 +483,8 @@ int work_queue_free(FAR struct kwork_wqueue_s *wqueue) return -EINVAL; } + wd_cancel(&wqueue->timer); + /* Mark the work queue as exiting */ wqueue->exit = true; diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h index b9a6f1ceb2..7628670b03 100644 --- a/sched/wqueue/wqueue.h +++ b/sched/wqueue/wqueue.h @@ -66,14 +66,15 @@ struct kworker_s struct kwork_wqueue_s { - struct list_node q; /* The queue of pending work */ - sem_t sem; /* The counting semaphore of the wqueue */ - sem_t exsem; /* Sync waiting for thread exit */ - spinlock_t lock; /* Spinlock */ - uint8_t nthreads; /* Number of worker threads */ - bool exit; /* A flag to request the thread to exit */ - int16_t wait_count; - struct kworker_s worker[0]; /* Describes a worker thread */ + struct list_node expired; /* The queue of expired work. */ + struct list_node pending; /* The queue of pending work. */ + sem_t sem; /* The counting semaphore of the wqueue */ + sem_t exsem; /* Sync waiting for thread exit */ + spinlock_t lock; /* Spinlock */ + uint8_t nthreads; /* Number of worker threads */ + bool exit; /* A flag to request the thread to exit */ + struct wdog_s timer; /* Timer to pending. */ + struct kworker_s worker[0]; /* Describes a worker thread */ }; /* This structure defines the state of one high-priority work queue. 
@@ -83,17 +84,11 @@ struct kwork_wqueue_s

 #ifdef CONFIG_SCHED_HPWORK
 struct hp_wqueue_s
 {
-  struct list_node q;         /* The queue of pending work */
-  sem_t            sem;       /* The counting semaphore of the wqueue */
-  sem_t            exsem;     /* Sync waiting for thread exit */
-  spinlock_t       lock;      /* Spinlock */
-  uint8_t          nthreads;  /* Number of worker threads */
-  bool             exit;      /* A flag to request the thread to exit */
-  int16_t          wait_count;
+  struct kwork_wqueue_s wq;

   /* Describes each thread in the high priority queue's thread pool */

-  struct kworker_s worker[CONFIG_SCHED_HPNTHREADS];
+  struct kworker_s      worker[CONFIG_SCHED_HPNTHREADS];
 };
 #endif

@@ -104,17 +99,11 @@ struct hp_wqueue_s
 struct lp_wqueue_s
 {
-  struct list_node q;         /* The queue of pending work */
-  sem_t            sem;       /* The counting semaphore of the wqueue */
-  sem_t            exsem;     /* Sync waiting for thread exit */
-  spinlock_t       lock;      /* Spinlock */
-  uint8_t          nthreads;  /* Number of worker threads */
-  bool             exit;      /* A flag to request the thread to exit */
-  int16_t          wait_count;
+  struct kwork_wqueue_s wq;

   /* Describes each thread in the low priority queue's thread pool */

-  struct kworker_s worker[CONFIG_SCHED_LPNTHREADS];
+  struct kworker_s      worker[CONFIG_SCHED_LPNTHREADS];
 };
 #endif

@@ -159,6 +148,65 @@ static inline_function FAR struct kwork_wqueue_s *work_qid2wq(int qid)
     }
 }

+/****************************************************************************
+ * Name: work_insert_pending
+ *
+ * Description:
+ *   Internal public function to insert the work to the workqueue.
+ *   Require wqueue != NULL and work != NULL.
+ *
+ * Input Parameters:
+ *   wqueue - The work queue.
+ *   work   - The work to be inserted.
+ *
+ * Returned Value:
+ *   Return whether the work is inserted at the head of the pending queue.
+ *
+ ****************************************************************************/
+
+static inline_function
+bool work_insert_pending(FAR struct kwork_wqueue_s *wqueue,
+                         FAR struct work_s *work)
+{
+  struct work_s *curr;
+
+  DEBUGASSERT(wqueue != NULL && work != NULL);
+
+  /* Insert the work into the wait queue sorted by the expired time. */
+
+  list_for_every_entry(&wqueue->pending, curr, struct work_s, node)
+    {
+      if (!clock_compare(curr->qtime, work->qtime))
+        {
+          break;
+        }
+    }
+
+  /* After the insertion, we do not violate the invariant that
+   * the wait queue is sorted by the expired time. Because
+   * curr->qtime > work->qtime.
+   * In the case of the wqueue is empty, we insert
+   * the work at the head of the wait queue.
+   */
+
+  list_add_before(&curr->node, &work->node);
+
+  return list_is_head(&wqueue->pending, &work->node);
+}
+
+/****************************************************************************
+ * Name: work_timer_expired
+ *
+ * Description:
+ *   The wqueue timer callback.
+ *
+ * Input Parameters:
+ *   arg - The work queue.
+ *
+ ****************************************************************************/
+
+void work_timer_expired(wdparm_t arg);
+
 /****************************************************************************
  * Name: work_start_highpri
 *
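As an orientation for reviewers, here is a minimal kernel-side sketch of how a caller could use the refactored periodic interface. The parameter order of work_queue_period_wq() (wqueue, work, worker, arg, delay, period) is inferred from the hunk above, and sampler_worker()/sampler_start() are made-up names; treat this as an illustrative sketch, not code from the commit.

/* Illustrative sketch only.  Assumes a kernel context where the *_wq
 * interfaces and the kwork_wqueue_s handle type are visible; the parameter
 * order of work_queue_period_wq() is inferred from the diff above.
 */

#include <nuttx/wqueue.h>

static struct work_s g_sampler_work;  /* No embedded wdog_s after this change */

static void sampler_worker(FAR void *arg)
{
  /* Runs on a worker thread once per period.  No re-queuing is needed here:
   * work_thread() re-inserts periodic work itself using work->period.
   */
}

int sampler_start(FAR struct kwork_wqueue_s *wqueue)
{
  /* First run after about 10 ticks, then every 100 ticks.  The queue's
   * single wdog timer is (re)armed internally when this work becomes the
   * earliest pending entry.
   */

  return work_queue_period_wq(wqueue, &g_sampler_work, sampler_worker,
                              NULL, 10, 100);
}

Compared with the old implementation, the same call no longer carries a wdog_s inside each work item, which is where the roughly 30 bytes saved per work_s quoted in the commit message come from.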
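The "delay + 1" comment that appears twice in the diff can be checked with a few lines of standalone C. sketch_clock_compare() below mirrors the wrap-safe "deadline is due" test that clock_compare() performs; the helper name and the 32-bit tick width are assumptions made for this self-contained example.

/* A self-contained model of the tick-boundary argument made in the diff. */

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t tick_t;

static bool sketch_clock_compare(tick_t deadline, tick_t now)
{
  /* True when the deadline is at or before "now", even across wrap-around:
   * the unsigned difference stays below 2^31 only if deadline <= now.
   */

  return (tick_t)(now - deadline) < ((tick_t)1 << 31);
}

int main(void)
{
  tick_t now     = 1000;             /* Tick counter read here...             */
  tick_t delay   = 1;                /* ...while real time is almost at 1001. */
  tick_t naive   = now + delay;      /* May fire after less than one tick.    */
  tick_t rounded = now + delay + 1;  /* Guarantees at least one full tick.    */

  printf("naive:   due at 1001? %d\n",
         sketch_clock_compare(naive, 1001));    /* 1: could fire too early */
  printf("rounded: due at 1001? %d, due at 1002? %d\n",
         sketch_clock_compare(rounded, 1001),   /* 0: not yet              */
         sketch_clock_compare(rounded, 1002));  /* 1: a full tick later    */
  return 0;
}

The same rounding is applied in both work_qqueue() in libs/libc and work_queue_period_wq() before the absolute expiry tick is stored in work->qtime.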
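The reason work_insert_pending() reports whether the new entry became the head is the single-timer-per-queue policy: only the earliest deadline needs a watchdog. The standalone sketch below models that policy with a plain singly linked list and a simulated timer; the types and helper names are simplified stand-ins, not the NuttX list_node/wdog_s API.

/* Standalone model of the "single timer per queue" policy used above. */

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct item
{
  struct item *next;
  unsigned deadline;
};

static struct item *g_pending;        /* Sorted by ascending deadline */
static unsigned g_timer_deadline;     /* Simulated per-queue timer    */

/* Insert sorted by deadline; report whether the new item became the head. */

static bool insert_pending(struct item *it)
{
  struct item **pp = &g_pending;

  while (*pp != NULL && (*pp)->deadline <= it->deadline)
    {
      pp = &(*pp)->next;
    }

  it->next = *pp;
  *pp = it;
  return pp == &g_pending;
}

static void queue_delayed(struct item *it, unsigned deadline)
{
  it->deadline = deadline;
  if (insert_pending(it))
    {
      /* Earliest deadline changed: this is where wd_start_abstick() would
       * re-arm the queue's one and only timer.
       */

      g_timer_deadline = deadline;
    }
}

int main(void)
{
  struct item a;
  struct item b;
  struct item c;

  queue_delayed(&a, 200);   /* Head changes: timer -> 200   */
  queue_delayed(&b, 100);   /* Head changes: timer -> 100   */
  queue_delayed(&c, 300);   /* Not the head: timer untouched */

  printf("timer armed for tick %u\n", g_timer_deadline);   /* Prints 100 */
  return 0;
}

Cancellation is the mirror image, as the kwork_cancel.c hunk shows: removing the head either re-arms the timer for the next pending entry or cancels it when the list becomes empty.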