I'm seeing something very odd trying to scale out part of the code I'm working on.
It runs inside of Storm and looks up entries from a 10-node Riak cluster. I've hit a wall that we can't get past. We are looking up entries (JSON representations of a job) in batches of 100 from Riak, and each batch gets handled by a bolt in Storm. Adding more bolts (an instance of the bolt class with a dedicated thread) results in no increase in performance. I instrumented the code and saw that the time spent waiting for all the Riak futures to finish increases as more bolts are added.

Thinking that perhaps there was contention around the RiakCluster object that we were sharing per JVM, I tried giving each bolt instance its own cluster object, and there wasn't any change. Note that neither changing the thread pool size given to withExecutor nor changing the withExecutionAttempts value has had any impact.

We're working off of the develop branch of the Java client. We've been using d3cc30d, but I also tried with cef7570 and had the same issue.

A simplified version of the Scala code running this:

// called once upon bolt initialization.
def prepare(config: JMap[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
  ...
  val nodes = RiakNode.Builder.buildNodes(
    new RiakNode.Builder,
    (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
  riak = new RiakCluster.Builder(nodes)
    // varying this has made no difference
    .withExecutionAttempts(1)
    // nor has varying this
    .withExecutor(new ScheduledThreadPoolExecutor(200))
    .build()
  riak.start
  ...
}

private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
  val location = new Location("jobseeker-job-view")
    .setBucketType("no-siblings")
    .setKey(jobLocationId)
  val fop = new FetchOperation.Builder(location).withTimeout(75).withR(1).build
  riak.execute(fop)
}

def execute(tuple: Tuple): Unit = {
  val indexType = tuple.getStringByField("index_type")
  val indexName = tuple.getStringByField("index_name")
  val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]

  var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty

  // this always returns in a standard time based on batch size
  time("dispatch-calls") {
    lookups = batch.filter(_.key.isDefined).map { payload =>
      (payload, get(payload.key.get))
    }
  }

  val futures = lookups.map(_._2)

  // this is what takes longer and longer when more bolts are added.
  // it doesn't matter what the sleep time is.
  time("waiting-on-futures") {
    while (futures.count(!_.isDone) > 0) {
      Thread.sleep(25L)
    }
  }

  // everything from here to the end returns in a fixed amount of time
  // and doesn't change with the number of bolts
  ...
}

It seems like we are running into contention somewhere in the Riak Java client. My first thought was the LinkedBlockingQueue that serves as the retry queue in RiakCluster, but I've tried running with only a single execution attempt, as well as a custom client version where I removed all retries from the codebase, and I still experience the same problem. I'm still digging through the code looking for possible points of contention. Any thoughts?
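P.S. In case it's relevant: time above is just our wall-clock instrumentation wrapper. It's roughly equivalent to this sketch (simplified; the real one records the timing rather than printing it):

def time[A](label: String)(block: => A): A = {
  val start = System.nanoTime()
  try block
  finally {
    val elapsedMs = (System.nanoTime() - start) / 1000000
    // the real wrapper records this measurement; println is just for illustration
    println(s"$label: ${elapsedMs}ms")
  }
}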
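P.P.S. To rule out the polling itself: assuming RiakFuture implements java.util.concurrent.Future (the isDone call above comes from that interface), the sleep/poll loop could be replaced with a plain blocking wait like the sketch below. Either way, the wall time is governed by the slowest future in the batch.

time("waiting-on-futures") {
  // block on each future in turn instead of polling;
  // this returns once the slowest future in the batch has completed
  futures.foreach(_.get())
}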