On Fri, Aug 16, 2013 at 6:11 PM, Igor Galić <i.ga...@brainsware.org> wrote:
>
>
> ----- Original Message -----
> > Updated Branches:
> >   refs/heads/master e0ec5304d -> 8a1112881
> >
> >
> > TS-2089: introduce configurable collation preproc threads
> >
> > We found that CPU of logging thread could be easy to reach up 100% in
> > collation host, but disk IO was low at the same time.
> >
> > The bottleneck of logging thread is that some preprocessing job, such as
> > convert LogBuffer to ascii text, consume so much CPU time. And more
> > worse, the write() operation will block logging thread.
> >
> > So this patch try to split logging thread into two parts:
> > 1) Configurable preproc threads, which are responsiable for processing all
> >    of prepare work, and then forward the preprocessed buffer to flush thread,
> >    or send them to CollationClient/HostSM.
> >
> > 2) One Flush thread, it will consume preprocessed buffers and write them to
> >    disk. In our testing, one flush thread is enough for us.
> >
> > TODO: This patch supports only one flush thread, we can improve it to
> > "one flush thread per file/disk" in the future.
> >
> > == How to configure ==
> > The number of preproc threads is 1 by default.
> >
> > Please modify "proxy.config.log.collation_preproc_threads" option to
> > change it.
> >
> First off: I *love* your commit messages.
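
A rough way to picture the split described in the commit message above: several CPU-bound preproc workers feed one flush thread through a queue. The sketch below is standalone C++11, not the patch itself. RawBuffer, BlockingQueue and run_pipeline are made-up names for illustration, and the real code uses ink_mutex/ink_cond and an InkAtomicList rather than the standard library primitives shown here.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a binary LogBuffer; not the ATS type.
struct RawBuffer {
  int id;
};

// Minimal blocking queue guarded by a mutex and a condition variable.
template <typename T>
class BlockingQueue {
public:
  void push(T item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push(std::move(item));
    }
    cond_.notify_one();
  }

  // Marks the queue closed; consumers drain what is left and then stop.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cond_.notify_all();
  }

  // Blocks until an item is available; returns false once closed and empty.
  bool pop(T &out) {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) {
      return false;
    }
    out = std::move(items_.front());
    items_.pop();
    return true;
  }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::queue<T> items_;
  bool closed_ = false;
};

// Runs n_preproc workers feeding a single flush thread and returns the
// lines the flush thread "wrote" (collected in memory for this sketch).
std::vector<std::string> run_pipeline(int n_preproc, int n_buffers) {
  BlockingQueue<RawBuffer> raw;
  BlockingQueue<std::string> ready;
  std::vector<std::string> written;

  // Single flush thread: the only place that would call write().
  std::thread flusher([&] {
    std::string line;
    while (ready.pop(line)) {
      written.push_back(line);
    }
  });

  // Preproc threads: convert raw buffers to text, then hand them off.
  std::vector<std::thread> workers;
  for (int i = 0; i < n_preproc; i++) {
    workers.emplace_back([&] {
      RawBuffer buf;
      while (raw.pop(buf)) {
        ready.push("entry " + std::to_string(buf.id));
      }
    });
  }

  for (int id = 0; id < n_buffers; id++) {
    raw.push(RawBuffer{id});
  }
  raw.close();
  for (std::thread &w : workers) {
    w.join();
  }
  ready.close();
  flusher.join();
  return written;
}
```

Calling run_pipeline(3, 10) hands ten buffers to three workers and returns ten lines. Their order may differ from run to run because the workers race to the flush queue.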
>
> > Signed-off-by: Yunkai Zhang <qiushu....@taobao.com>
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> > Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> > Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
> >
> > Branch: refs/heads/master
> > Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> > Parents: e0ec530
> > Author: Yunkai Zhang <qiushu....@taobao.com>
> > Authored: Sun Aug 11 16:42:32 2013 +0800
> > Committer: Yunkai Zhang <qiushu....@taobao.com>
> > Committed: Fri Aug 16 10:41:16 2013 +0800
> >
> > ----------------------------------------------------------------------
> >  CHANGES                               |   3 +
> >  mgmt/RecordsConfig.cc                 |   2 +
> >  mgmt/cli/ShowCmd.cc                   |   3 +
> >  proxy/logging/Log.cc                  | 257 ++++++++++++++++++++++++-----
> >  proxy/logging/Log.h                   |  46 +++++-
> >  proxy/logging/LogBufferSink.h         |   9 +-
> >  proxy/logging/LogCollationClientSM.cc |   6 +-
> >  proxy/logging/LogCollationHostSM.cc   |   4 +-
> >  proxy/logging/LogConfig.cc            |  19 ++-
> >  proxy/logging/LogConfig.h             |   1 +
> >  proxy/logging/LogFile.cc              | 121 ++++++--------
> >  proxy/logging/LogFile.h               |  20 +--
> >  proxy/logging/LogHost.cc              |  66 ++++----
> >  proxy/logging/LogHost.h               |  14 +-
> >  proxy/logging/LogObject.cc            |  68 ++++----
> >  proxy/logging/LogObject.h             |  43 +++--
> >  16 files changed, 450 insertions(+), 232 deletions(-)
> > ----------------------------------------------------------------------
>
> I'm missing a change to doc/reference/configuration/records.config.en.rst
> We should make it a habit of adding documentation in the same commit as
> new records.config changes.
>

Sorry, I hadn't noticed the records.config.en.rst file. I'll update it later.
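
For reference, the setting would look roughly like this in records.config. The value 4 is only an example chosen for illustration; per the RecordsConfig.cc entry in this commit the default is 1 and the accepted range is 1-128.

```
CONFIG proxy.config.log.collation_preproc_threads INT 4
```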

> >
> > http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> > ----------------------------------------------------------------------
> > diff --git a/CHANGES b/CHANGES
> > index 6b9a3bb..c613f00 100644
> > --- a/CHANGES
> > +++ b/CHANGES
> > @@ -1,6 +1,9 @@
> >                                                           -*- coding: utf-8 -*-
> >  Changes with Apache Traffic Server 3.5.0
> >
> > +
> > +  *) TS-2089: introduce configurable collation preproc threads
> > +
> >    *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
> >     needlessly chowned to to ATS' user.
> >     Author: Tomasz Kuzemko <tom...@kuzemko.net>
> >
> >
> > http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> > ----------------------------------------------------------------------
> > diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> > index cfc2267..9402581 100644
> > --- a/mgmt/RecordsConfig.cc
> > +++ b/mgmt/RecordsConfig.cc
> > @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
> >    ,
> >    {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT, "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
> >    ,
> > +  {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT, "1", RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> > +  ,
> >    {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
> >    ,
> >    {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT, "86400", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
> >
> >
> > http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> > ----------------------------------------------------------------------
> > diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> > index 0798c43..ed71560 100644
> > --- a/mgmt/cli/ShowCmd.cc
> > +++ b/mgmt/cli/ShowCmd.cc
> > @@ -1555,6 +1555,7 @@ ShowLogging()
> >    TSInt collation_port = -1;
> >    TSString collation_secret = NULL;
> >    TSInt host_tag = 0;
> > +  TSInt preproc_threads = 0;
> >    TSInt orphan_space = -1;
> >
> >    TSInt squid_log = 0;
> > @@ -1596,6 +1597,7 @@ ShowLogging()
> >    Cli_RecordGetString("proxy.config.log.collation_secret", &collation_secret);
> >    Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
> >    Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs", &orphan_space);
> > +  Cli_RecordGetInt("proxy.config.log.collation_preproc_threads", &preproc_threads);
> >
> >    Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
> >    Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> > @@ -1657,6 +1659,7 @@ ShowLogging()
> >    Cli_Printf(" Port ----------------------------------- %d\n", collation_port);
> >    Cli_Printf(" Secret --------------------------------- %s\n", collation_secret);
> >    Cli_PrintEnable(" Host Tagged ---------------------------- ", host_tag);
> > +  Cli_PrintEnable(" Preproc Threads ------------------------ ", preproc_threads);
> >    Cli_Printf(" Space Limit for Orphan Files ----------- %d MB\n", orphan_space);
> >
> >    Cli_PrintEnable("\nSquid Format ----------------------------- ", squid_log);
> >
> >
> > http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> > index 94d5625..1361978 100644
> > --- a/proxy/logging/Log.cc
> > +++ b/proxy/logging/Log.cc
> > @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
> >  size_t Log::maxInactiveObjects;
> >
> >  // Flush thread stuff
> > -volatile unsigned long Log::flush_counter = 0;
>
> This bit is fascinating: We're replacing the (seemingly)
> unused volatile variable flush_counter with the already
> existing variable m_bytes_written..
>
> We turn that variable into a volatile, and continue to
> use it as if nothing happened. But here's the question:
> why was flush_counter not used?
>

Log::flush_counter was unused in the original code. I don't know why it
exists; it looks like leftover deprecated code.

The reason for changing LogFile::m_bytes_written to volatile is that we now
have multiple preproc threads and one flush thread, and those threads may
update m_bytes_written concurrently.

> > -ink_mutex Log::flush_mutex;
> > -ink_cond Log::flush_cond;
> > -ink_thread Log::flush_thread;
> > +ink_mutex *Log::preproc_mutex;
> > +ink_cond *Log::preproc_cond;
> > +ink_mutex *Log::flush_mutex;
> > +ink_cond *Log::flush_cond;
> > +InkAtomicList *Log::flush_data_list;
> >
> >  // Collate thread stuff
> >  ink_mutex Log::collate_mutex;
> >  ink_cond Log::collate_cond;
> >  ink_thread Log::collate_thread;
> >  int Log::collation_accept_file_descriptor;
> > +int Log::collation_preproc_threads;
> >  int Log::collation_port;
> >
> >  // Log private objects
> > @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
> >
> >  struct PeriodicWakeup;
> >  typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> > -struct PeriodicWakeup : Continuation {
> > +struct PeriodicWakeup : Continuation
> > +{
> > +  int m_preproc_threads;
> > +  int m_flush_threads;
> > +
> >    int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
> >    {
> > -    ink_cond_signal (&Log::flush_cond);
> > -    return EVENT_CONT;
> > +    for (int i = 0; i < m_preproc_threads; i++) {
> > +      ink_cond_signal (&Log::preproc_cond[i]);
> > +    }
> > +    for (int i = 0; i < m_flush_threads; i++) {
> > +      ink_cond_signal (&Log::flush_cond[i]);
> > +    }
> > +    return EVENT_CONT;
> >    }
> >
> > -  PeriodicWakeup () : Continuation (new_ProxyMutex())
> > +  PeriodicWakeup (int preproc_threads, int flush_threads) :
> > +    Continuation (new_ProxyMutex()),
> > +    m_preproc_threads(preproc_threads),
> > +    m_flush_threads(flush_threads)
> >    {
> > -    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> > +    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> >    }
> >  };
> >
> > @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
> >  /*-------------------------------------------------------------------------
> >    MAIN INTERFACE
> >  -------------------------------------------------------------------------*/
> > +struct LoggingPreprocContinuation: public Continuation
> > +{
> > +  int m_idx;
> > +
> > +  int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
> > +  {
> > +    Log::preproc_thread_main((void *)&m_idx);
> > +    return 0;
> > +  }
> > +
> > +  LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> > +  {
> > +    SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> > +  }
> > +};
> > +
> >  struct LoggingFlushContinuation: public Continuation
> >  {
> > +  int m_idx;
> > +
> >    int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
> >    {
> > -    Log::flush_thread_main(NULL);
> > +    Log::flush_thread_main((void *)&m_idx);
> >      return 0;
> >    }
> >
> > -  LoggingFlushContinuation():Continuation(NULL)
> > +  LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
> >    {
> >      SET_HANDLER(&LoggingFlushContinuation::mainEvent);
> >    }
> > @@ -910,6 +942,7 @@ Log::init(int flags)
> >    numInactiveObjects = 0;
> >    inactive_objects = new LogObject*[maxInactiveObjects];
> >
> > +  collation_preproc_threads = 1;
> >    collation_accept_file_descriptor = NO_FD;
> >
> >    // store the configuration flags
> > @@ -931,6 +964,7 @@ Log::init(int flags)
> >
> >    config->read_configuration_variables();
> >    collation_port = config->collation_port;
> > +  collation_preproc_threads = config->collation_preproc_threads;
> >
> >    if (config_flags & STANDALONE_COLLATOR) {
> >      logging_mode = LOG_TRANSACTIONS_ONLY;
> > @@ -959,8 +993,8 @@ Log::init(int flags)
> >    create_threads();
> >
> >  #ifndef INK_SINGLE_THREADED
> > -  eventProcessor.schedule_every(NEW (new PeriodicWakeup()), HRTIME_SECOND,
> > -                                ET_CALL);
> > +  eventProcessor.schedule_every(NEW (new PeriodicWakeup(collation_preproc_threads, 1)),
> > +                                HRTIME_SECOND, ET_CALL);
> >  #endif
> >    init_status |= PERIODIC_WAKEUP_SCHEDULED;
> >
> > @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
> >    // setup global scrap object
> >    //
> >    global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> > -  global_scrap_object = NEW(new LogObject(global_scrap_format, Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> > -                                          NULL, Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> > -                                          Log::config->rolling_offset_hr, Log::config->rolling_size_mb));
> > +  global_scrap_object =
> > +    NEW(new LogObject(global_scrap_format,
> > +                      Log::config->logfile_dir,
> > +                      "scrapfile.log",
> > +                      BINARY_LOG, NULL,
> > +                      Log::config->rolling_enabled,
> > +                      Log::config->collation_preproc_threads,
> > +                      Log::config->rolling_interval_sec,
> > +                      Log::config->rolling_offset_hr,
> > +                      Log::config->rolling_size_mb));
> >
> >    // create the flush thread and the collation thread
> >    //
> > @@ -1030,15 +1071,43 @@ Log::create_threads()
> >
> >    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
> >    if (!(init_status & THREADS_CREATED)) {
> > -    // start the flush thread
> > +
> > +    char desc[64];
> > +    preproc_mutex = new ink_mutex[collation_preproc_threads];
> > +    preproc_cond = new ink_cond[collation_preproc_threads];
> > +
> > +    size_t stacksize;
> > +    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
>
> This ^ seems like an unnecessary step, that could be pushed into ...
>
> > +
> > +    // start the preproc threads
> >      //
> >      // no need for the conditional var since it will be relying on
> >      // on the event system.
> > -    ink_mutex_init(&flush_mutex, "Flush thread mutex");
> > -    ink_cond_init(&flush_cond);
> > -    Continuation *flush_continuation = NEW(new LoggingFlushContinuation);
> > -    Event *flush_event = eventProcessor.spawn_thread(flush_continuation, "[LOGGING]", stacksize);
> > -    flush_thread = flush_event->ethread->tid;
> > +    for (int i = 0; i < collation_preproc_threads; i++) {
> > +      sprintf(desc, "Logging preproc thread mutex[%d]", i);
> > +      ink_mutex_init(&preproc_mutex[i], desc);
> > +      ink_cond_init(&preproc_cond[i]);
> > +      Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
> > +      sprintf(desc, "[LOG_PREPROC %d]", i);
> > +      eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
>
> spawn_thread() as a default.. i.e.:
>
> eventProcessor::spawn_thread(Continuation, const char * desc, int stacksize -1)
> {
>   size_t thread_stacksize;
>   if stacksize -1 {
>     REC_ReadConfigInteger(thread_stacksize, "proxy.config.thread.default.stacksize")
>   } else {
>     thread_stacksize = stacksize
>   }
>
> }
>

Yes, agreed. I think it's better to improve spawn_thread() in a separate
patch.

> >
> >
> > /*-------------------------------------------------------------------------
> > +<<<<<<< HEAD
> >    LogFile::write_and_try_delete
> > +=======
> > +  LogFile::preproc_and_try_delete
> > +
> > +  preprocess the given buffer data before write to target file
> > +  and try to delete it when its reference become zero.
> > +>>>>>>> TS-2089: introduce configurable collation preproc threads
>
>
> To reiterate a point Leif made recently about one of *my* commits:
> We should make sure that every single commit actually compiles..
>

Yes. The conflict markers are inside a comment :D
Thanks for the review.

>
> Anyway, that's all I have for now.
> -- i
> Igor Galić
>
> Tel: +43 (0) 664 886 22 883
> Mail: i.ga...@brainsware.org
> URL: http://brainsware.org/
> GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE
>

--
Yunkai Zhang
Work at Taobao
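
On the spawn_thread() default suggested in the review: a follow-up patch could resolve the stack size along the lines of the sketch below. This is only an illustration of the idea, not the ATS API. USE_DEFAULT_STACKSIZE, read_configured_stacksize() and resolve_stacksize() are hypothetical names, and the value returned by the config stand-in is an arbitrary placeholder for what REC_ReadConfigInteger would read from "proxy.config.thread.default.stacksize".

```cpp
#include <cassert>
#include <cstddef>

// Sentinel meaning "caller did not specify a stack size".
// Hypothetical name; not part of the ATS API.
const long USE_DEFAULT_STACKSIZE = -1;

// Hypothetical stand-in for reading
// "proxy.config.thread.default.stacksize" via REC_ReadConfigInteger.
// The value returned here is an arbitrary placeholder.
long read_configured_stacksize() {
  return 1048576;
}

// Picks the stack size a spawn_thread()-like function would use:
// the caller's explicit value if given, the configured default otherwise.
std::size_t resolve_stacksize(long requested = USE_DEFAULT_STACKSIZE) {
  if (requested == USE_DEFAULT_STACKSIZE) {
    return static_cast<std::size_t>(read_configured_stacksize());
  }
  return static_cast<std::size_t>(requested);
}
```

With something like this inside spawn_thread(), callers such as Log::create_threads() would no longer need their own REC_ReadConfigInteger call and could simply omit the stack size argument.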