We fire off 100 requests for the items in the batch and wait on the futures
to complete.
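For context, the pattern is "dispatch one future per key, then block until all are done." A minimal, self-contained sketch of that shape, using plain JDK futures as stand-ins for RiakFuture (BatchFetch and fetch are illustrative names, not part of the client):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the dispatch-then-wait batch pattern. fetch() is a hypothetical
// placeholder for the actual Riak get operation.
public class BatchFetch {
    static String fetch(String key) {
        return "value-for-" + key; // stand-in for the store lookup
    }

    static List<String> fetchAll(List<String> keys) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            // dispatch: submit one async lookup per key in the batch
            List<Future<String>> futures = new ArrayList<>();
            for (String k : keys) {
                futures.add(pool.submit(() -> fetch(k)));
            }
            // wait: Future.get blocks until each lookup completes
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```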


On Mon, Apr 14, 2014 at 11:40 AM, Alexander Sicular <sicul...@gmail.com> wrote:

> I'm not sure what "looking up entries... in batches of 100 from Riak"
> devolves into in the java client but riak doesn't have a native multiget.
> It either does 100 get ops or a [search>]mapreduce. That might inform some
> of your performance issues.
>
> -Alexander
>
> @siculars
> http://siculars.posthaven.com
>
> Sent from my iRotaryPhone
>
> > On Apr 14, 2014, at 8:26, Sean Allen <s...@monkeysnatchbanana.com> wrote:
> >
> > I'm seeing something very odd trying to scale out part of the code I'm
> > working on.
> >
> > It runs inside of Storm and looks up entries from a 10-node Riak cluster.
> > I've hit a wall that we can't get past. We are looking up entries (JSON
> > representations of a job) in batches of 100 from Riak. Each batch gets
> > handled by a bolt in Storm, but adding more bolts (an instance of the bolt
> > class with a dedicated thread) results in no increase in performance. I
> > instrumented the code and saw that the time spent waiting for all Riak
> > futures to finish increases as more bolts are added. Thinking that perhaps
> > there was contention around the RiakCluster object that we were sharing
> > per JVM, I tried giving each bolt instance its own cluster object, and
> > there wasn't any change.
> >
> > Note that neither changing the thread pool size given to withExecutor nor
> > changing the withExecutionAttempts value has any impact.
> >
> > We're working off of the develop branch of the Java client. We've been
> > using d3cc30d, but I also tried cef7570 and had the same issue.
> >
> > A simplified version of the Scala code running this:
> >
> >   // called once upon bolt initialization.
> >   def prepare(config: JMap[_, _],
> >               context: TopologyContext,
> >               collector: OutputCollector): Unit = {
> >     ...
> >
> >     val nodes = RiakNode.Builder.buildNodes(new RiakNode.Builder,
> >       (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
> >     riak = new RiakCluster.Builder(nodes)
> >       // varying this has made no difference
> >       .withExecutionAttempts(1)
> >       // nor has varying this
> >       .withExecutor(new ScheduledThreadPoolExecutor(200))
> >       .build()
> >     riak.start
> >
> >     ...
> >   }
> >
> >   private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
> >     val location = new Location("jobseeker-job-view")
> >       .setBucketType("no-siblings")
> >       .setKey(jobLocationId)
> >     val fop = new FetchOperation.Builder(location)
> >       .withTimeout(75)
> >       .withR(1)
> >       .build
> >
> >     riak.execute(fop)
> >   }
> >
> >   def execute(tuple: Tuple): Unit = {
> >     val indexType = tuple.getStringByField("index_type")
> >     val indexName = tuple.getStringByField("index_name")
> >     val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]
> >
> >     var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty
> >
> >     // this always returns in a standard time based on batch size
> >     time("dispatch-calls") {
> >       lookups = batch.filter(_.key.isDefined).map {
> >         payload => (payload, get(payload.key.get))
> >       }
> >     }
> >
> >     val futures = lookups.map(_._2)
> >
> >     // this is what takes longer and longer when more bolts are added.
> >     // it doesn't matter what the sleep time is.
> >     time("waiting-on-futures") {
> >       while (futures.count(!_.isDone) > 0) {
> >         Thread.sleep(25L)
> >       }
> >     }
> >
> >     // everything from here to the end returns in a fixed amount of time
> >     // and doesn't change with the number of bolts
> >     ...
> >
> >   }
> >
> >
> > It seems like we are running into contention somewhere in the Riak Java
> > client. My first thought was the LinkedBlockingQueue that serves as the
> > retry queue in RiakCluster, but I've tried running with only a single
> > execution attempt, as well as a custom client version where I removed all
> > retries from the codebase, and I still experience the same problem.
> >
> > I'm still digging through the code looking for possible points of
> contention.
> >
> > Any thoughts?
> >
> > _______________________________________________
> > riak-users mailing list
> > riak-users@lists.basho.com
> > http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>



-- 

Ce n'est pas une signature