Protocol Buffer.
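Russell's question in the quoted reply below matters because a fixed-size connection pool caps how many of the 100 fan-out requests can be in flight at once. Every other request queues until a connection frees up.

This sketch is a toy model, not the Riak client. A `Semaphore` stands in for a hypothetical 50-connection pool, and `Thread.sleep` stands in for one round trip. `PoolCapDemo` and its parameters are made-up names for illustration only.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Toy model of a client whose connection pool holds at most `poolSize`
// connections. This is NOT the Riak client's pooling code: a Semaphore
// stands in for the pool and Thread.sleep for one network round trip.
object PoolCapDemo {
  final case class Result(peakInFlight: Int, elapsedMillis: Long)

  def run(requests: Int, poolSize: Int, latencyMillis: Long): Result = {
    // One caller thread per request, so the pool, not the executor, is the limit.
    val executor = Executors.newFixedThreadPool(requests)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    val connections = new Semaphore(poolSize)
    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)

    val start = System.nanoTime()
    val futures = (1 to requests).map { _ =>
      Future {
        connections.acquire() // blocks while every connection is checked out
        val now = inFlight.incrementAndGet()
        // Record the highest number of simultaneous requests seen so far.
        var seen = peak.get()
        while (now > seen && !peak.compareAndSet(seen, now)) seen = peak.get()
        try { Thread.sleep(latencyMillis) } // simulated request/response
        finally {
          inFlight.decrementAndGet()
          connections.release()
        }
      }
    }
    Await.result(Future.sequence(futures), 1.minute)
    val elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    executor.shutdown()
    Result(peak.get(), elapsedMillis)
  }
}
```

Consider `PoolCapDemo.run(requests = 100, poolSize = 50, latencyMillis = 100)`. At most 50 requests are ever in flight, so the batch needs at least two round trips (roughly 200 ms here), however many caller threads exist.

The reply above says PB is in use, so the HTTP default of 50 may not apply. The model only shows how any shared pool limit would make waiting time grow as more requests compete for the same connections.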
On Mon, Apr 14, 2014 at 11:53 AM, Russell Brown <russell.br...@me.com> wrote:

> HTTP or PB? Pretty sure the HTTP client defaults to a pool of 50
> connections.
>
> On 14 Apr 2014, at 16:50, Sean Allen <s...@monkeysnatchbanana.com> wrote:
>
> We fire off 100 requests for the items in the batch and wait on the
> futures to complete.
>
>
> On Mon, Apr 14, 2014 at 11:40 AM, Alexander Sicular <sicul...@gmail.com> wrote:
>
>> I'm not sure what "looking up entries... in batches of 100 from Riak"
>> devolves into in the Java client, but Riak doesn't have a native multiget.
>> It either does 100 get ops or a [search>]mapreduce. That might inform some
>> of your performance issues.
>>
>> -Alexander
>>
>> @siculars
>> http://siculars.posthaven.com
>>
>> Sent from my iRotaryPhone
>>
>> > On Apr 14, 2014, at 8:26, Sean Allen <s...@monkeysnatchbanana.com> wrote:
>> >
>> > I'm seeing something very odd while trying to scale out part of the code
>> > I'm working on.
>> >
>> > It runs inside of Storm and looks up entries from a 10-node Riak cluster.
>> > I've hit a wall that we can't get past. We are looking up entries (a JSON
>> > representation of a job) in batches of 100 from Riak, and each batch gets
>> > handled by a bolt in Storm. Adding more bolts (an instance of the bolt
>> > class with a dedicated thread) results in no increase in performance.
>> > I instrumented the code and saw that the time spent waiting for all Riak
>> > futures to finish increases as more bolts are added. Thinking that
>> > perhaps there was contention around the RiakCluster object that we were
>> > sharing per JVM, I tried giving each bolt instance its own cluster
>> > object, and there wasn't any change.
>> >
>> > Note that neither changing the thread pool size given to withExecutor
>> > nor changing the withExecutionAttempts value has any impact.
>> >
>> > We're working off of the develop branch for the Java client. We've been
>> > using d3cc30d, but I also tried cef7570 and had the same issue.
>> >
>> > A simplified version of the Scala code running this:
>> >
>> > // called once upon bolt initialization.
>> > def prepare(config: JMap[_, _],
>> >             context: TopologyContext,
>> >             collector: OutputCollector): Unit = {
>> >   ...
>> >
>> >   val nodes = RiakNode.Builder.buildNodes(new RiakNode.Builder,
>> >     (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
>> >   riak = new RiakCluster.Builder(nodes)
>> >     // varying this has made no difference
>> >     .withExecutionAttempts(1)
>> >     // nor has varying this
>> >     .withExecutor(new ScheduledThreadPoolExecutor(200))
>> >     .build()
>> >   riak.start
>> >
>> >   ...
>> > }
>> >
>> > private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
>> >   val location = new Location("jobseeker-job-view")
>> >     .setBucketType("no-siblings")
>> >     .setKey(jobLocationId)
>> >   val fop = new FetchOperation.Builder(location).withTimeout(75).withR(1).build
>> >
>> >   riak.execute(fop)
>> > }
>> >
>> > def execute(tuple: Tuple): Unit = {
>> >   val indexType = tuple.getStringByField("index_type")
>> >   val indexName = tuple.getStringByField("index_name")
>> >   val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]
>> >
>> >   var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty
>> >
>> >   // this always returns in a standard time based on batch size
>> >   time("dispatch-calls") {
>> >     lookups = batch.filter(_.key.isDefined).map {
>> >       payload => {(payload, get(payload.key.get))}
>> >     }
>> >   }
>> >
>> >   val futures = lookups.map(_._2)
>> >
>> >   // this is what takes longer and longer when more bolts are added.
>> >   // it doesn't matter what the sleep time is.
>> >   time("waiting-on-futures") {
>> >     while (futures.count(!_.isDone) > 0) {
>> >       Thread.sleep(25L)
>> >     }
>> >   }
>> >
>> >
>> >   // everything from here to the end returns in a fixed amount of time
>> >   // and doesn't change with the number of bolts
>> >   ...
>> >
>> > }
>> >
>> >
>> > It seems like we are running into contention somewhere in the Riak Java
>> > client. My first thought was the LinkedBlockingQueue that serves as the
>> > retry queue in RiakCluster, but I've tried running with only a single
>> > execution attempt, as well as with a custom client version where I removed
>> > all retries from the codebase, and I still experience the same problem.
>> >
>> > I'm still digging through the code looking for possible points of
>> > contention.
>> >
>> > Any thoughts?
>> >
>> > _______________________________________________
>> > riak-users mailing list
>> > riak-users@lists.basho.com
>> > http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>> >
>
>
>
> --
>
> Ce n'est pas une signature
>
>

--
Ce n'est pas une signature
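The quoted `execute` method waits by polling `isDone` every 25 ms. Riak has no native multiget, so each batch is 100 independent gets. A simpler way to wait is to block on each future in turn against one shared deadline, which returns as soon as the slowest get finishes.

This sketch assumes `RiakFuture` behaves like `java.util.concurrent.Future`. The `isDone` call in the quoted code suggests this, but it is not confirmed here for the develop-branch commits mentioned. `BatchWaitDemo`, `fetch`, `fetchAll`, `awaitAll` and this version of `time` are all hypothetical. A thread pool and `Thread.sleep` stand in for Riak gets.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.{Future => JFuture}

// Sketch only: `fetch` is a hypothetical stand-in for one Riak get, modeled as a
// task on a thread pool that sleeps for `latencyMillis`. No Riak client API is used.
object BatchWaitDemo {
  // A possible shape for the `time` helper the quoted code calls; its real
  // definition is not shown in the thread, so this version is an assumption.
  def time[A](label: String)(body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    val elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    println(s"$label: $elapsedMillis ms")
    (result, elapsedMillis)
  }

  def fetch(pool: ExecutorService, key: String, latencyMillis: Long): JFuture[String] =
    pool.submit(new Callable[String] {
      def call(): String = { Thread.sleep(latencyMillis); s"value-for-$key" }
    })

  // No native multiget: a batch of keys becomes one independent get per key.
  def fetchAll(pool: ExecutorService, keys: Seq[String], latencyMillis: Long): Seq[JFuture[String]] =
    keys.map(key => fetch(pool, key, latencyMillis)).toVector

  // Block on each future in turn against one shared deadline instead of
  // polling isDone with Thread.sleep. get(...) throws TimeoutException if the
  // deadline passes before a result arrives.
  def awaitAll(futures: Seq[JFuture[String]], timeoutMillis: Long): Seq[String] = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
    futures.map { f =>
      val remainingNanos = math.max(0L, deadline - System.nanoTime())
      f.get(remainingNanos, TimeUnit.NANOSECONDS)
    }.toVector
  }
}
```

In the bolt, the `while (futures.count(!_.isDone) > 0)` loop would become an equivalent blocking wait under the same `time("waiting-on-futures")` label, assuming `RiakFuture` exposes a timed `get`. The quoted message reports that the sleep interval makes no difference, so this mainly tightens the measurement and gives the batch one deadline. It is not offered as a fix for the contention itself.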