I'm seeing something very odd while trying to scale out part of the code I'm
working on.

It runs inside Storm and looks up entries from a 10-node Riak cluster.
I've hit a wall that we can't get past. We look up entries (JSON
representations of jobs) in batches of 100 from Riak, and each batch is
handled by a bolt in Storm. Adding more bolts (an instance of the bolt class
with a dedicated thread) results in no increase in throughput. I instrumented
the code and saw that the time spent waiting for all Riak futures to finish
increases as more bolts are added. Thinking that perhaps there was contention
around the RiakCluster object that we were sharing per JVM, I tried giving
each bolt instance its own cluster object, and there wasn't any change.
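For reference, the shared-per-JVM setup we started with looked roughly like
the sketch below; I'm paraphrasing from memory, and the SharedRiak object
name is illustrative, not our real one:

  import scala.collection.JavaConverters._
  import com.basho.riak.client.core.{RiakCluster, RiakNode}

  // Rough sketch (not our exact code) of the one-RiakCluster-per-JVM variant:
  // every bolt instance in the worker JVM goes through this lazily built singleton.
  object SharedRiak {
    lazy val cluster: RiakCluster = {
      val nodes = RiakNode.Builder.buildNodes(
        new RiakNode.Builder,
        (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
      val c = new RiakCluster.Builder(nodes).build()
      c.start()
      c
    }
  }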

Note that neither changing the thread pool size given to withExecutor nor
changing the withExecutionAttempts value has any impact.

We're working off the develop branch of the Java client. We've been using
d3cc30d, but I also tried cef7570 and had the same issue.

A simplified version of the Scala code running this:

  // called once upon bolt initialization.
  def prepare(config: JMap[_, _],
              context: TopologyContext,
              collector: OutputCollector): Unit = {
    ...

    val nodes = RiakNode.Builder.buildNodes(
      new RiakNode.Builder,
      (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
    riak = new RiakCluster.Builder(nodes)
      // varying this has made no difference
      .withExecutionAttempts(1)
      // nor has varying this
      .withExecutor(new ScheduledThreadPoolExecutor(200))
      .build()
    riak.start()

    ...
  }

  private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
    val location = new Location("jobseeker-job-view")
      .setBucketType("no-siblings")
      .setKey(jobLocationId)
    val fop = new FetchOperation.Builder(location)
      .withTimeout(75)
      .withR(1)
      .build

    riak.execute(fop)
  }

  def execute(tuple: Tuple): Unit = {
    val indexType = tuple.getStringByField("index_type")
    val indexName = tuple.getStringByField("index_name")
    val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]

    var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty

    // this always returns in a standard time based on batch size
    time("dispatch-calls") {
      lookups = batch.filter(_.key.isDefined).map {
        payload => (payload, get(payload.key.get))
      }
    }

    val futures = lookups.map(_._2)

    // this is what takes longer and longer as more bolts are added.
    // it doesn't matter what the sleep time is.
    time("waiting-on-futures") {
      while (futures.count(!_.isDone) > 0) {
        Thread.sleep(25L)
      }
    }

    // everything from here to the end returns in a fixed amount of time
    // and doesn't change with the number of bolts
    ...

  }
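
For completeness, time is just a simple wall-clock wrapper around a block;
a minimal sketch, where recordMillis stands in for our real metrics sink:

  // Sketch of the time() helper used above: run the block, measure elapsed
  // wall-clock time, and report it under the given label.
  def time[A](label: String)(block: => A): A = {
    val start = System.nanoTime()
    try block
    finally {
      val elapsedMs = (System.nanoTime() - start) / 1000000L
      recordMillis(label, elapsedMs) // illustrative; our real code records this
    }
  }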


It seems like we are running into contention somewhere in the Riak Java
client. My first thought was the LinkedBlockingQueue that serves as the retry
queue in RiakCluster, but I've tried running with only a single execution
attempt, as well as a custom client build with all retries removed from the
codebase, and still see the same problem.
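
For what it's worth, one way I can think of to localize it is to timestamp
each future on completion via a listener rather than polling isDone, so Riak
response time can be told apart from client-side queueing. A rough sketch,
assuming the develop branch's RiakFutureListener callback works the way I
read it:

  import java.util.concurrent.ConcurrentHashMap

  // Sketch: wrap get() so each future records its dispatch-to-completion
  // latency in a concurrent map keyed by the job location id.
  val latencyNanos = new ConcurrentHashMap[String, Long]()

  def timedGet(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
    val dispatched = System.nanoTime()
    val future = get(jobLocationId)
    future.addListener(new RiakFutureListener[FetchOperation.Response] {
      override def handle(f: RiakFuture[FetchOperation.Response]): Unit =
        latencyNanos.put(jobLocationId, System.nanoTime() - dispatched)
    })
    future
  }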

I'm still digging through the code looking for possible points of
contention.

Any thoughts?