I see the following error in consumer; Unable to Receive Message:kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition
On Wed, Apr 15, 2015 at 2:09 PM, Victor L <vlyamt...@gmail.com> wrote: > I set it to 10ms and it did fix cpu consumption problem but i don't see > any messages coming up in consumer queue > > On Wed, Apr 15, 2015 at 1:57 PM, Evan Huus <evan.h...@shopify.com> wrote: > >> The field is a time.Duration (https://golang.org/pkg/time/#Duration) and >> it >> is the maximum duration to buffer messages before triggering a flush to >> the >> broker. The default was 1 millisecond (this value does not even exist in >> the most recent release). Setting it to higher values trades off latency >> for efficiency, whatever your use case dictates. >> >> Evan >> >> On Wed, Apr 15, 2015 at 1:47 PM, Victor L <vlyamt...@gmail.com> wrote: >> >> > I inherited this code, it just set to 1000, what would be reasonable >> time: >> > 1000 * time.Millisecond? >> > >> > On Wed, Apr 15, 2015 at 1:41 PM, Evan Huus <evan.h...@shopify.com> >> wrote: >> > >> > > On Wed, Apr 15, 2015 at 1:33 PM, Victor L <vlyamt...@gmail.com> >> wrote: >> > > >> > > > Hi Evan, >> > > > It's hardcoded: >> > > > MaxBufferTime = 1000 >> > > > >> > > >> > > Without any units this defaults to nanoseconds, which means your >> timer is >> > > spinning every microsecond. You probably mean 1000 * time.Millisecond? >> > > >> > > >> > > > MaxBufferedBytes = 1024 >> > > > >> > > > How's it versioned? Should i just download zip file from project >> > github? >> > > > >> > > >> > > By default go will use whatever version you originally checked out >> with >> > `go >> > > get` or the like. You can use a properly versioned copy via >> > > http://gopkg.in/Shopify/sarama.v1. >> > > >> > > >> > > > On Wed, Apr 15, 2015 at 12:48 PM, Evan Huus <evan.h...@shopify.com >> > >> > > > wrote: >> > > > >> > > > > Hi Victor, two points: >> > > > > >> > > > > - Based on the backtrace, you are using a very old version of >> Sarama. >> > > You >> > > > > might have better luck using a more recent stable version. >> > > > > - Are you setting `MaxBufferTime` in the configuration to 0 or a >> very >> > > > small >> > > > > value? If so the loop will spin on that timer. Try making this >> value >> > > > > larger. >> > > > > >> > > > > Evan >> > > > > >> > > > > On Wed, Apr 15, 2015 at 12:35 PM, Victor L <vlyamt...@gmail.com> >> > > wrote: >> > > > > >> > > > > > I am using sarama "golang" kafka 1.8.1 client ( >> > > > > > https://github.com/Shopify/sarama) to send messages to message >> > > queue >> > > > > once >> > > > > > in 3secs and this task drives cpu consumption to 130% on the >> > quad-cpu >> > > > > > blade; The number stays this high regardless of number of >> > > > > > partitions/consumers.... According to results of profiling with >> > > > 'pprof', >> > > > > > most of the cycles are spent in producer.go/NewBrokerProducer; >> > > > > > Total: 47297 samples >> > > > > > 5947 12.6% 12.6% 15013 31.7% selectgo >> > > > > > 3617 7.6% 20.2% 3617 7.6% runtime.xchg >> > > > > > 2819 6.0% 26.2% 28506 60.3% >> > > > > github.com/Shopify/sarama.funcĀ·008 >> <http://github.com/Shopify/sarama.func%C2%B7008> >> > <http://github.com/Shopify/sarama.func%C2%B7008> >> > > <http://github.com/Shopify/sarama.func%C2%B7008> >> > > > <http://github.com/Shopify/sarama.func%C2%B7008> >> > > > > <http://github.com/Shopify/sarama.func%C2%B7008> >> > > > > > <http://github.com/Shopify/sarama.func%C2%B7008> >> > > > > > <http://github.com/Shopify/sarama.func%C2%B7008> >> > > > > > >> > > ---------------------------------------------------------------------- >> > > > > > NewBrokerProducer >> > > > > > ----------------------------------------------------------- >> > > > > > 229: go func() { >> > > > > > . . 230: timer := >> > > > > time.NewTimer(p.config.MaxBufferTime) >> > > > > > . . 231: var shutdownRequired bool >> > > > > > . . 232: wg.Done() >> > > > > > . . 233: for { >> > > > > > 208 17598 234: select { >> > > > > > 153 749 235: case <-bp.flushNow: >> > > > > > . . 236: if shutdownRequired = >> > > bp.flush(p); >> > > > > > shutdownRequired { >> > > > > > . . 237: goto shutdown >> > > > > > . . 238: } >> > > > > > 2230 3144 239: case <-timer.C: >> > > > > > 144 3606 240: if shutdownRequired = >> > > > > > bp.flushIfAnyMessages(p); shutdownRequired { >> > > > > > . . 241: goto shutdown >> > > > > > >> > > > > > >> > > > > > wonder if there's some known issue with this method or if anyone >> > > > already >> > > > > > seen it before... >> > > > > > Thank you, >> > > > > > >> > > > > >> > > > >> > > >> > >> > >