On Wed, Jun 28, 2017 at 02:19:25PM +0200, Petr Mladek wrote:
> On Wed 2017-05-31 16:22:33, Sergey Senozhatsky wrote:
> > so I try to minimize the negative impact of RT prio here. printk_kthread
> > is not special any more. it's an auxiliary kthread that we sometimes
> > wake_up. the thing is that printk_kthread also must offload at some
> > point, basically the same `atomic_print_limit' limit applies to it as
> > well.
>
> You might call cond_resched() outside console_unlock(). But you have
> to keep printk_kthread in runnable state as long as there are pending
> messages. Then scheduler will always prefer this RT task over non-RT
> tasks. Or am I wrong?
Not sure whether I mentioned this advice before, but: I believe we
should strive for a design where cond_resched() etc. is *not* needed -
cond_resched() / sleep() etc. are likely signs of a code smell: one
should aim for handling with a properly *precise*/*strict* handshaked
request/response (producer/consumer) communication protocol, i.e. IPC
mechanism objects (mutex etc.). That way one avoids the polling-type,
imprecise "are we there yet? is it our job now?" handling and instead
uses properly precise (thus *not* needlessly inefficient!) scheduler
wakeup mechanisms.

Thus it's "merely" (hah!) a matter of designing handling where
responsibilities / transitions are clearly spelt out, and thus end up
properly implemented as precise notifications via IPC mechanisms.

For a very simple setup (which quite possibly cannot be done this
easily!), things could be:

printk_work_wakeup_one_worker()
{
        /* a kernel mechanism for this is available, I think */
        reliably_notify_only_one_available_computing_hardware_resource();
}

void printk_work_dump_my_package(work_count_max)
{
        while (++work_count < work_count_max)
                printk_work_dump_element();
}

/* mutex added so that the continuation check is done within the
   *inner* atomic handling section (avoids a race window): */
void printk_work_dump_handler(work_count_max)
{
        printk_work_dump_my_package(work_count_max);
        take_mutex();
        bool all_done = !have_payload_remain;
        bool need_continuation = !all_done;
        if (need_continuation)
                printk_work_wakeup_one_worker();
        release_mutex();
}

worker thread frame function:

WORKER printk_work_worker()
{
        for (;;) {
                switch (select()) {
                case dump_requested:
                        printk_work_dump_handler(printk_work_count_max_kthread);
                        break;
                case termination_requested:
                        return;
                }
        }
}

/* tasked_cpus = all_cpus; */
tasked_cpus = all_cpus / 4;
for (tasked_cpus)
        new_kthread(printk_work_worker());

printk_impl()
{
        printk_queue_element(...);
        printk_work_dump_handler(printk_work_count_max_immediate);
}

Very Q&D thoughts, but might be helpful... (ermm, however launching
#cpus workers is most likely useless, since schedulable decision-making
does *not* depend on #cpus - if any computing resource is available,
this *can* be executed, thus quite likely only one worker is needed
anyway - no, two, since one worker needs to be able to wake up
precisely *one* other to have things precisely continue on *another*
computing resource!)

Oh, and this doesn't implement (and especially not reliably/atomically,
race-window-free) the case of another activity triggering another
printk queuing and thus (potentially - but not necessarily!) another
package dump while some worker activity is already ongoing. I.e. we've
got a race window in the (multi-)use of printk_work_dump_handler()
(which should be solvable, though).
Oh, and rather than doing specific work_count limit comparisons, it
might actually be more elegant to bundle worker work packages in
*advance*, to achieve simple/isolated/separate submission to whichever
worker one would prefer; inner handling would then simply do:

work_handler()
{
        while (!my_package_done)
                dump_package_element();
}

But splitting into separate work packages bears the risk of illegal
reordering of printk payload elements, thus most likely one should
*not* do this and instead keep doing simple work_count checks on a
*global*/*shared* printk queue payload (insertion of further elements
into this queue must of course still be possible at any time without
(excessive) blocking):

while (can_continue_dumping) {
        payload_mutex_begin();
        element = grab_element();
        payload_mutex_end();
        dump_element(element);
}

payload_mutex_begin();
queue_element(element);
payload_mutex_end();

HTH,

Andreas Mohr