On Mon, Oct 31, 2011 at 2:01 PM, Rusty Klophaus <ru...@basho.com> wrote:
> Thanks for your excellent description of the problem. We haven't seen this
> before to my knowledge, and this isn't expected behavior.
>
> Also, if you can share your code, or if you have a small script that can
> reproduce the failure, that would be extremely helpful.
>

I created a small test script that reliably reproduces the issue, but when I created another version that uses truly independent clients (distinct processes), I could not reproduce it. So the issue must lie somewhere in my Fiber-based client software stack. Somewhere within em-synchrony or EventMachine, some shared state must be getting clobbered at high processing rates, or the high rate is causing EventMachine to return a short read under some circumstances.

Apologies for the false alarm.

In case anyone is using Ruby and would like a Fiber-based client for some parallelism, this is the code:

#!/opt/ruby/bin/ruby

require 'em-synchrony'
require 'riak'
require 'json'

# Riak's PB client uses Thread local storage. We change it to store the socket in the client.
module Riak
  class Client
    class ProtobuffsBackend
      def socket
        @riakpbc_socket ||= new_socket
      end

      def reset_socket
        socket.close
        @riakpbc_socket = nil
      end
    end
  end
end

# Riak's PB client mishandles encodings when storing multibyte character strings. It tries
# to transcode to binary, which it can't do as the string is multibyte, rather than force binary encoding.
# It already correctly records the original encoding and forces it back when loading the
# object. Here we monkey patch the offending function. We can't simply define it within its module,
# as the module is included by the BeefcakeProtobuffsBackend and Ruby won't include a module twice.
# So we define it within the class that includes it.
# Fixed in the latest beta version of the gem.
Riak::Client::BeefcakeProtobuffsBackend.class_exec {
  def maybe_encode(string)
    ENCODING ? string.force_encoding('BINARY') : string
  end
}

# Replace the default Socket code to use EventMachine sockets instead.
TCPSocket = EventMachine::Synchrony::TCPSocket

hosts = [ "riak1", "riak2", "riak3" ]
num_hosts = hosts.size
key = 'test_key'.encode('US-ASCII')

EM.synchrony do
  concurrency = 12
  fibers = []

  concurrency.times do |i|
    fibers << Fiber.new do
      # Set up our client.
      puts "Creating client to #{hosts[i%num_hosts]}"
      c = Riak::Client.new :host => hosts[i%num_hosts], :protocol => "pbc"
      puts "Client created"
      raise "Could not connect to Riak." unless c.ping
      puts "Client ready."

      bucket = c['concur_test']
      EM::Synchrony.sleep(1)

      while true
        begin
          s = { rand(100000) => rand(100000) }
          o = bucket.get_or_new(key, :r => 2) # Here there be blowups.
          s = JSON.parse(o.raw_data).merge(s) if o.raw_data
          o.content_type = 'application/json'
          o.raw_data = s.to_json.encode('UTF-8')
          o.store(:returnbody => false, :w => 2, :dw => 0)
        rescue StopIteration
          break
        end
      end
    end
  end

  # Ruby has no method to wait for a set of fibers to all finish,
  # so we just poll them once a second until they are all done.
  puts "Starting up clients."
  fibers.each { |fiber| fiber.resume }
  EM::Synchrony.sleep(1)

  puts "Waiting for them to finish"
  while not fibers.empty?
    #puts "Iterating over fibers"
    # delete_if avoids skipping elements, which can happen when deleting
    # from an array inside each.
    fibers.delete_if do |fiber|
      finished = !fiber.alive?
      puts "Found finished fiber." if finished
      finished
    end
    #puts "Iterated over fibers."
    EM::Synchrony.sleep(1)
  end

  EventMachine.stop
end
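
As an aside on the encoding comment in the script: the difference between transcoding a string to binary and forcing binary encoding can be seen without Riak at all. The following is only a minimal sketch using plain Ruby strings; the sample string and the variable names are made up for illustration and are not part of the riak gem.

```ruby
# Hypothetical sample string; "\u00e9" is a multibyte character in UTF-8.
original = "caf\u00e9"

# Transcoding to BINARY (ASCII-8BIT) fails for multibyte characters,
# because there is no conversion defined for them.
transcode_failed =
  begin
    original.encode('BINARY')
    false
  rescue Encoding::UndefinedConversionError
    true
  end

# force_encoding only relabels the existing bytes. We dup first so the
# original string keeps its own encoding.
binary = original.dup.force_encoding('BINARY')

# Forcing the recorded encoding back recovers the original string.
restored = binary.dup.force_encoding(original.encoding)

puts "transcode failed: #{transcode_failed}"
puts "same bytes:       #{binary.bytes == original.bytes}"
puts "round trip ok:    #{restored == original}"
```

Since force_encoding changes the string it is called on, code that still needs the original encoding afterwards has to dup the string first, as the sketch does.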
_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com