http connections and CLOSE_WAIT
We are using pre11 right now. When we open HTTP connections, they hang around for a long time in CLOSE_WAIT, which results in really spiky performance: when the connections finally close, things are fast, then they build up again. Is there something that needs to be configured with Riak to get it to reuse the sockets rather than leave them sitting in CLOSE_WAIT?

--
Ce n'est pas une signature
oddness when using java client within storm
I'm seeing something very odd trying to scale out part of the code I'm working on.

It runs inside of Storm and looks up entries from a 10-node Riak cluster. I've hit a wall that we can't get past. We are looking up entries (a JSON representation of a job) in batches of 100 from Riak; each batch gets handled by a bolt in Storm, and adding more bolts (an instance of the bolt class with a dedicated thread) results in no increase in performance. I instrumented the code and saw that the time spent waiting for all Riak futures to finish increases as more bolts are added. Thinking that perhaps there was contention around the RiakCluster object that we were sharing per JVM, I tried giving each bolt instance its own cluster object, and there wasn't any change.

Note that neither changing the thread pool size given to withExecutor nor changing the withExecutionAttempts value has any impact.

We're working off of the develop branch for the Java client. We've been using d3cc30d but I also tried with cef7570 and had the same issue.

A simplified version of the Scala code running this:

    // called once upon bolt initialization.
    def prepare(config: JMap[_, _],
                context: TopologyContext,
                collector: OutputCollector): Unit = {
      ...

      val nodes = RiakNode.Builder.buildNodes(new RiakNode.Builder,
        (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
      riak = new RiakCluster.Builder(nodes)
        // varying this has made no difference
        .withExecutionAttempts(1)
        // nor has varying this
        .withExecutor(new ScheduledThreadPoolExecutor(200))
        .build()
      riak.start

      ...
    }

    private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
      val location = new Location("jobseeker-job-view")
        .setBucketType("no-siblings")
        .setKey(jobLocationId)
      val fop = new FetchOperation.Builder(location)
        .withTimeout(75)
        .withR(1)
        .build

      riak.execute(fop)
    }

    def execute(tuple: Tuple): Unit = {
      val indexType = tuple.getStringByField("index_type")
      val indexName = tuple.getStringByField("index_name")
      val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]

      var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty

      // this always returns in a standard time based on batch size
      time("dispatch-calls") {
        lookups = batch.filter(_.key.isDefined).map {
          payload => (payload, get(payload.key.get))
        }
      }

      val futures = lookups.map(_._2)

      // this is what takes longer and longer when more bolts are added.
      // it doesn't matter what the sleep time is.
      time("waiting-on-futures") {
        while (futures.count(!_.isDone) > 0) {
          Thread.sleep(25L)
        }
      }

      // everything from here to the end returns in a fixed amount of time
      // and doesn't change with the number of bolts
      ...
    }

It seems like we are running into contention somewhere in the Riak Java client. My first thought was the LinkedBlockingQueue that serves as the retry queue in RiakCluster, but I've tried running with only a single execution attempt as well as a custom client version where I removed all retries from the codebase, and I still experience the same problem.

I'm still digging through the code looking for possible points of contention.

Any thoughts?
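As a side note, a minimal sketch of an alternative to the polling loop above, assuming the develop-branch RiakFuture exposes a blocking await() (everything else is taken from the snippet above); this only removes the 25 ms polling granularity and would not by itself fix contention inside the client:

    // Sketch only: block on each future directly instead of polling isDone.
    // Assumes RiakFuture provides await(), which blocks until the operation
    // completes or fails (check the client source for the exact signature).
    time("waiting-on-futures") {
      futures.foreach { f =>
        f.await() // may throw InterruptedException; handle as appropriate
      }
    }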
Re: oddness when using java client within storm
We fire off 100 requests for the items in the batch and wait on the futures to complete.

On Mon, Apr 14, 2014 at 11:40 AM, Alexander Sicular wrote:
> I'm not sure what "looking up entries... in batches of 100 from Riak"
> devolves into in the java client, but riak doesn't have a native multiget.
> It either does 100 get ops or a [search>]mapreduce. That might inform some
> of your performance issues.
>
> -Alexander
>
> @siculars
> http://siculars.posthaven.com
>
> Sent from my iRotaryPhone

--
Ce n'est pas une signature
Re: oddness when using java client within storm
Protocol Buffer.

On Mon, Apr 14, 2014 at 11:53 AM, Russell Brown wrote:
> HTTP or PB? Pretty sure the HTTP client defaults to a pool of 50
> connections.
>
> On 14 Apr 2014, at 16:50, Sean Allen wrote:
>
> We fire off 100 requests for the items in the batch and wait on the
> futures to complete.
Re: oddness when using java client within storm
Thanks Brian - playing with min connections doesn't make a difference. I'm cracking open the netty threads and other configuration in the morning.

-Sean-

On Mon, Apr 14, 2014 at 2:35 PM, Brian Roach wrote:
> Sean -
>
> Sadly I've not gotten to tuning anything yet in the new client ... the
> terrors of pre-release :)
>
> One thing is that by default the connection pool only keeps one
> connection around (for each RiakNode in the RiakCluster) and will time
> out any others after one second.
>
> You might try bumping that up to 10 per node with the
> withMinConnections() option in the RiakNode builder so that the client
> isn't creating new connections each time you fire off 100 requests.
>
> Thinking about it, this may be the culprit, as the TCP connect is handled
> synchronously when it's needed; basically, you're not getting a future
> back from RiakCluster.execute() until a connection is returned from
> the pool, and if a new connection needs to be made, there's that
> overhead there.
>
> I'm using all default settings in Netty in terms of threads, etc., so
> it may be something there as well ... but as I said, I haven't gotten
> to trying to tune for performance yet.
>
> Thanks,
> - Roach
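For reference, a minimal sketch of what Brian's withMinConnections() suggestion looks like applied to the prepare() snippet from earlier in the thread. The builder method name comes from Brian's message and the value of 10 is his suggested starting point, so treat this as an illustration rather than a verified configuration; as Sean notes above, it did not change the results in his case:

    // Sketch: keep a warm pool of connections per node so each batch of 100
    // requests is not paying TCP connect cost on demand. Method names follow
    // the thread; verify them against the client version you are building.
    val nodeBuilder = new RiakNode.Builder()
      .withMinConnections(10) // per Brian: the default pool keeps only one connection
    val nodes = RiakNode.Builder.buildNodes(nodeBuilder,
      (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
    riak = new RiakCluster.Builder(nodes)
      .withExecutionAttempts(1)
      .withExecutor(new ScheduledThreadPoolExecutor(200))
      .build()
    riak.start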
configuration problem: riak.conf being ignored
riak-2

We have the following in /etc/riak/riak.conf:

    erlang.max_ports = 65536
    erlang.max_ets_tables = 262144

However, I get the following on startup:

    2014-04-24 19:12:09.697 [warning] <0.799.0> riak_kv_env: Erlang ports limit of 1024 is low, at least 64000 is recommended
    2014-04-24 19:12:09.698 [warning] <0.799.0> riak_kv_env: ETS table count limit of 1400 is low, at least 256000 is recommended.

So it appears that riak.conf isn't actually being used. Pointers?

--
Ce n'est pas une signature
Re: configuration problem: riak.conf being ignored
More on this: looking at the command line that started it, did/does pre-20 Erlang not pick up the VM options correctly?

    /usr/lib64/riak/erts-5.10.3/bin/beam.smp -P 256000 -e 262144 -Q 65536 -A 64 -K true -W w -- -root /usr/lib64/riak -progname riak -- -home /opt/riak -- -boot /usr/lib64/riak/releases/2.0.0pre20/riak -config /var/lib/riak/generated.configs/app.2014.04.24.19.12.05.config -setcookie riak -name * -smp enable -vm_args /var/lib/riak/generated.configs/vm.2014.04.24.19.12.05.args -pa /usr/lib64/riak/lib/basho-patches -- console

(hostname removed in the above)

--
Ce n'est pas une signature
Re: configuration problem: riak.conf being ignored
Setting ERL_MAX_PORTS and ERL_MAX_ETS_TABLES did have an effect.

--
Ce n'est pas une signature