On Mon, Oct 31, 2011 at 2:01 PM, Rusty Klophaus <ru...@basho.com> wrote:

> Thanks for your excellent description of the problem. We haven't seen this
> before to my knowledge, and this isn't expected behavior.
> Also, if you can share your code, or if you have a small script that can
> reproduce the failure, that would be extremely helpful.
>

I created a small test script that reliable reproduces the issue, but I
created another version that creates truly independent clients (distinct
processes) and I could not reproduce it.  So there issue must lie somewhere
in my Fiber based client software stack.  Somewhere within em-synchrony or
EventMachine some shared state must be getting clobbered at high processing
rates, or the high rate is causing EventMachine to return a short read
under some circumstances.

Apologies for the false alarm.

In case anyone is using Ruby and would like a Fiber based client for some
parallelism, this is the code:

#!/opt/ruby/bin/ruby

require 'em-synchrony'
require 'riak'
require 'json'

# Riak's PB client uses Thread local storage.  We change it to store the
socket in the client.
module Riak
  class Client
    class ProtobuffsBackend
      def socket
        @riakpbc_socket ||= new_socket
      end
      def reset_socket
        socket.close
        @riakpbc_socket = nil
      end
    end
  end
end

# Riak's PB client mishandles encodings when storing multibyte character
strings.  If tries
# to transcode to binary, which it can do as its multibyte, rather than
force binary encoding.
# It already correctly records the original encoding and forces it back
when loading the
# object.  Here we monkey patch the offending function.  Can't simply
define it within its module,
# as the module is included by the BeefcakeProtobuffsBackend and Ruby won't
include twice a module.
# So we defined it within the class that includes it.
# Fixed on latest beta version of gem.
Riak::Client::BeefcakeProtobuffsBackend.class_exec {
  def maybe_encode(string)
    ENCODING ? string.force_encoding('BINARY') : string
  end
}


# replace default Socket code to use EventMachine Sockets instead
TCPSocket = EventMachine::Synchrony::TCPSocket

hosts = [ "riak1", "riak2", "riak3" ]
num_hosts = hosts.size

key = 'test_key'.encode('US-ASCII')

EM.synchrony do
  concurrency = 12
  fibers = []
  concurrency.times do |i|
    fibers << Fiber.new do
      # set up our client
      puts "Creating client to #{hosts[i%num_hosts]}"
      c = Riak::Client.new :host => hosts[i%num_hosts], :protocol => "pbc"
      puts "Client created"
      raise "Could not connect to Riak." unless c.ping
      puts "Client ready."

      bucket = c['concur_test']
      EM::Synchrony.sleep(1)
      while true
        begin
          s = { rand(100000) => rand(100000) }
          o = bucket.get_or_new(key, :r => 2)
          # Here there be blowups.
          s = JSON.parse(o.raw_data).merge(s) if o.raw_data
          o.content_type = 'application/json'
          o.raw_data = s.to_json.encode('UTF-8')
          o.store(:returnbody => false, :w => 2, :dw => 0)
        rescue StopIteration
          break
        end
      end
    end
  end

  # Ruby has not method to wait for a set of fibers to all finish
  # so we just iterate over them, resuming them until they are all
  # done.
  puts "Starting up clients."
  fibers.each { |fiber| fiber.resume }
  EM::Synchrony.sleep(1)
  puts "Waiting for them to finish"
  while not fibers.empty?
    #puts "Iterating over fibers"
    fibers.each do |fiber|
      if not fiber.alive?
        puts "Found finished fiber."
        fibers.delete fiber
      end
    end
    #puts "Iterated over fibers."
    EM::Synchrony.sleep(1)
  end
  EventMachine.stop
end
_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

Reply via email to