HTTP or PB? Pretty sure the HTTP client defaults to a pool of 50 connections.
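If it's PB, the per-node connection pool is worth a look too. A minimal
sketch of raising it, assuming the 2.0 Java client's RiakNode.Builder
(withMinConnections/withMaxConnections are taken from that API; the numbers
are placeholders to tune to the workload):

    import com.basho.riak.client.core.{RiakCluster, RiakNode}
    import scala.collection.JavaConverters._

    // raise the per-node PB connection pool so 100 concurrent fetches per
    // bolt aren't funneled through a handful of sockets
    val builder = new RiakNode.Builder()
      .withMinConnections(10)   // keep a few connections warm
      .withMaxConnections(200)  // allow bursts of concurrent operations

    val nodes = RiakNode.Builder.buildNodes(
      builder, (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)

    val riak = new RiakCluster.Builder(nodes).build()
    riak.start()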

On 14 Apr 2014, at 16:50, Sean Allen <s...@monkeysnatchbanana.com> wrote:

> We fire off 100 requests for the items in the batch and wait on the futures 
> to complete.
> 
> 
> On Mon, Apr 14, 2014 at 11:40 AM, Alexander Sicular <sicul...@gmail.com> 
> wrote:
> I'm not sure what "looking up entries... in batches of 100 from Riak" 
> devolves into in the java client, but Riak doesn't have a native multiget. 
> It either does 100 get ops or a [search>]mapreduce. That might inform some 
> of your performance issues.
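> 
> To spell that out, a client-side "multiget" is just N independent gets 
> dispatched concurrently, roughly like the sketch below (it reuses the 
> FetchOperation/Location usage from your code; the package paths are my 
> assumption from the develop branch, and nothing here is batched on the 
> server side):
> 
>     import com.basho.riak.client.core.{RiakCluster, RiakFuture}
>     import com.basho.riak.client.core.operations.FetchOperation
>     import com.basho.riak.client.query.Location
> 
>     // no native multiget: a "batch" of 100 is 100 separate get ops, each
>     // going through the client's queues and connection pool on its own
>     def multiget(cluster: RiakCluster, keys: Seq[String])
>         : Seq[RiakFuture[FetchOperation.Response]] =
>       keys.map { key =>
>         val location = new Location("jobseeker-job-view")
>           .setBucketType("no-siblings")
>           .setKey(key)
>         cluster.execute(new FetchOperation.Builder(location).withR(1).build)
>       }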
> 
> -Alexander
> 
> @siculars
> http://siculars.posthaven.com
> 
> Sent from my iRotaryPhone
> 
> > On Apr 14, 2014, at 8:26, Sean Allen <s...@monkeysnatchbanana.com> wrote:
> >
> > I'm seeing something very odd trying to scale out part of the code I'm 
> > working on.
> >
> > It runs inside of Storm and looks up entries from a 10-node Riak cluster.
> > I've hit a wall that we can't get past. We are looking up entries (JSON 
> > representations of a job) in batches of 100 from Riak; each batch gets 
> > handled by a bolt in Storm. Adding more bolts (instances of the bolt 
> > class, each with a dedicated thread) results in no increase in 
> > performance. I instrumented the code and saw that the time spent waiting 
> > for all Riak futures to finish increases as more bolts are added. 
> > Thinking that perhaps there was contention around the RiakCluster object 
> > that we were sharing per JVM, I tried giving each bolt instance its own 
> > cluster object, and there was no change.
> >
> > Note that neither changing the thread pool size given to withExecutor 
> > nor changing the withExecutionAttempts value has any impact.
> >
> > We're working off of the develop branch of the Java client. We've been 
> > using d3cc30d, but I also tried cef7570 and had the same issue.
> >
> > A simplified version of the Scala code running this:
> >
> >   // called once upon bolt initialization.
> >   def prepare(config: JMap[_, _],
> >               context: TopologyContext,
> >               collector: OutputCollector): Unit = {
> >     ...
> >
> >     val nodes = RiakNode.Builder.buildNodes(
> >       new RiakNode.Builder,
> >       (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
> >     riak = new RiakCluster.Builder(nodes)
> >       // varying this has made no difference
> >       .withExecutionAttempts(1)
> >       // nor has varying this
> >       .withExecutor(new ScheduledThreadPoolExecutor(200))
> >       .build()
> >     riak.start
> >
> >     ...
> >   }
> >
> >   private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
> >     val location = new Location("jobseeker-job-view")
> >       .setBucketType("no-siblings")
> >       .setKey(jobLocationId)
> >     val fop = new FetchOperation.Builder(location)
> >       .withTimeout(75)
> >       .withR(1)
> >       .build
> >
> >     riak.execute(fop)
> >   }
> >
> >   def execute(tuple: Tuple): Unit = {
> >     val indexType = tuple.getStringByField("index_type")
> >     val indexName = tuple.getStringByField("index_name")
> >     val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]
> >
> >     var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty
> >
> >     // this always returns in a standard time based on batch size
> >     time("dispatch-calls") {
> >       lookups = batch.filter(_.key.isDefined).map { payload =>
> >         (payload, get(payload.key.get))
> >       }
> >     }
> >
> >     val futures = lookups.map(_._2)
> >
> >     // this is what takes longer and longer when more bolts are added.
> >     // it doesn't matter what the sleep time is.
> >     // (an alternative without the sleep is sketched after this method)
> >     time("waiting-on-futures") {
> >       while (futures.count(!_.isDone) > 0) {
> >         Thread.sleep(25L)
> >       }
> >     }
> >
> >
> >     // everything from here to the end returns in a fixed amount of time
> >     // and doesn't change with the number of bolts
> >     ...
> >
> >   }
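> >
> >   Aside: the sleep-poll in waiting-on-futures quantizes every wait to a
> >   multiple of 25ms. A sketch of blocking on the futures directly,
> >   assuming RiakFuture exposes a blocking get() like
> >   java.util.concurrent.Future:
> >
> >     time("waiting-on-futures") {
> >       // block on each future in turn; wall time is then governed by the
> >       // slowest fetch, with no polling quantization
> >       futures.foreach(_.get())
> >     }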
> >
> >
> > It seems like we are running into contention somewhere in the Riak Java 
> > client. My first thought was the LinkedBlockingQueue that serves as the 
> > retry queue in RiakCluster, but I've tried running with only a single 
> > execution attempt, as well as a custom client version where I removed 
> > all retries from the codebase, and I still experience the same problem.
> >
> > I'm still digging through the code looking for possible points of 
> > contention.
> >
> > Any thoughts?
> >
> 
> 
> 
> -- 
> 
> This is not a signature

_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
