Protocol Buffer.
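
If pool sizing turns out to matter for PB too: the protocol-buffers connection pool is configured per node on RiakNode.Builder. This is a configuration sketch only, against the 2.x Java client API (withMinConnections/withMaxConnections exist there; the values are illustrative, not recommendations):

```scala
import com.basho.riak.client.core.RiakNode

// Configuration sketch: sizes the protocol-buffers connection pool per node.
// 10 and 50 are illustrative values only.
val nodeBuilder = new RiakNode.Builder()
  .withMinConnections(10)
  .withMaxConnections(50)
```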

On Mon, Apr 14, 2014 at 11:53 AM, Russell Brown <russell.br...@me.com> wrote:

> HTTP or PB? Pretty sure the HTTP client defaults to a pool of 50
> connections.
>
> On 14 Apr 2014, at 16:50, Sean Allen <s...@monkeysnatchbanana.com> wrote:
>
> We fire off 100 requests for the items in the batch and wait on the
> futures to complete.
>
>
> On Mon, Apr 14, 2014 at 11:40 AM, Alexander Sicular <sicul...@gmail.com> wrote:
>
>> I'm not sure what "looking up entries... in batches of 100 from Riak"
>> devolves into in the Java client, but Riak doesn't have a native multiget:
>> it either does 100 get ops or a [search>]mapreduce. That might explain some
>> of your performance issues.
>>
>> -Alexander
>>
>> @siculars
>> http://siculars.posthaven.com
>>
>> Sent from my iRotaryPhone
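
To make the "no native multiget" point concrete: a batch fetch devolves into one request per key, each returning its own future. A minimal stand-alone sketch with generic futures; fetchOne is a hypothetical stand-in for a single FetchOperation round-trip:

```scala
import java.util.concurrent.CompletableFuture

object MultiGetSketch {
  // No native multiget: a "batch of 100" is just 100 independent single-key
  // fetches fired concurrently, one future per key.
  def multiGet[K, V](keys: Seq[K])(fetchOne: K => CompletableFuture[V]): Seq[CompletableFuture[V]] =
    keys.map(fetchOne)
}
```

So batch latency is governed by the slowest of the 100 gets, and every extra batch in flight multiplies the number of concurrent requests hitting the cluster.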
>>
>> > On Apr 14, 2014, at 8:26, Sean Allen <s...@monkeysnatchbanana.com> wrote:
>> >
>> > I'm seeing something very odd while trying to scale out part of the code
>> I'm working on.
>> >
>> > It runs inside of Storm and looks up entries from a 10-node Riak
>> cluster.
>> > I've hit a wall that we can't get past. We are looking up entries (JSON
>> representations of a job)
>> > in batches of 100 from Riak; each batch gets handled by a bolt in
>> Storm. Adding more
>> > bolts (an instance of the bolt class with a dedicated thread) results
>> in no increase
>> > in performance. I instrumented the code and saw that the time spent
>> waiting for all Riak futures to finish
>> > increases as more bolts are added. Thinking that perhaps there was
>> contention around the
>> > RiakCluster object that we were sharing per JVM, I tried giving each
>> bolt instance its own
>> > cluster object, and there wasn't any change.
>> >
>> > Note that neither changing the thread pool size given to withExecutor
>> nor the withExecutionAttempts value
>> > has had any impact.
>> >
>> > We're working off of the develop branch of the Java client. We've been
>> using d3cc30d, but I also tried cef7570 and had the same issue.
>> >
>> > A simplified version of the Scala code running this:
>> >
>> >   // called once upon bolt initialization.
>> >   def prepare(config: JMap[_, _],
>> >               context: TopologyContext,
>> >               collector: OutputCollector): Unit = {
>> >     ...
>> >
>> >     val nodes = RiakNode.Builder.buildNodes(new RiakNode.Builder, (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
>> >     riak = new RiakCluster.Builder(nodes)
>> >       // varying this has made no difference
>> >       .withExecutionAttempts(1)
>> >      // nor has varying this
>> >       .withExecutor(new ScheduledThreadPoolExecutor(200))
>> >       .build()
>> >     riak.start
>> >
>> >     ...
>> >   }
>> >
>> >   private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
>> >     val location = new Location("jobseeker-job-view").setBucketType("no-siblings").setKey(jobLocationId)
>> >     val fop = new FetchOperation.Builder(location).withTimeout(75).withR(1).build
>> >
>> >     riak.execute(fop)
>> >   }
>> >
>> >   def execute(tuple: Tuple): Unit = {
>> >     val indexType = tuple.getStringByField("index_type")
>> >     val indexName = tuple.getStringByField("index_name")
>> >     val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]
>> >
>> >     var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty
>> >
>> >     // this always returns in a standard time based on batch size
>> >     time("dispatch-calls") {
>> >       lookups = batch.filter(_.key.isDefined).map {
>> >         payload => {(payload, get(payload.key.get))}
>> >       }
>> >     }
>> >
>> >     val futures = lookups.map(_._2)
>> >
>> >     // this is what takes longer and longer when more bolts are added.
>> >     // it doesn't matter what the sleep time is.
>> >     time("waiting-on-futures") {
>> >       while (futures.count(!_.isDone) > 0) {
>> >         Thread.sleep(25L)
>> >       }
>> >     }
>> >
>> >
>> >     // everything from here to the end returns in a fixed amount of time
>> >     // and doesn't change with the number of bolts
>> >     ...
>> >
>> >   }
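
The sleep/poll loop in execute also adds up to 25 ms of latency per batch on top of the actual wait, and it keeps the bolt thread busy re-scanning the set. The client's RiakFuture supports completion listeners, so the wait can be event-driven instead. A self-contained sketch of the latch pattern, with plain CompletableFutures standing in for RiakFutures so it runs without a cluster; with the real client, a RiakFutureListener would call countDown the same way:

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch}

object LatchWait {
  // One latch per batch; every completion (success or failure) counts down.
  // latch.await() wakes as soon as the last future finishes: no polling,
  // no fixed sleep quantum.
  def awaitAll[V](futures: Seq[CompletableFuture[V]]): Seq[V] = {
    val latch = new CountDownLatch(futures.size)
    futures.foreach(f => f.whenComplete((_: V, _: Throwable) => latch.countDown()))
    latch.await()
    futures.map(_.join())
  }
}
```

Compared with the 25 ms poll, this returns the moment the batch completes. It won't fix contention upstream, but it removes the polling overhead from the waiting-on-futures measurement.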
>> >
>> >
>> > It seems like we are running into contention somewhere in the Riak Java
>> client.
>> > My first thought was the LinkedBlockingQueue that serves as the retry
>> queue in RiakCluster,
>> > but I've tried running with only a single execution attempt, as well as
>> a custom client
>> > version where I removed all retries from the codebase, and I still
>> experience the same problem.
>> >
>> > I'm still digging through the code looking for possible points of
>> contention.
>> >
>> > Any thoughts?
>> >
>> > _______________________________________________
>> > riak-users mailing list
>> > riak-users@lists.basho.com
>> > http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>>
>
>
>
> --
>
> Ce n'est pas une signature


-- 

Ce n'est pas une signature
