Hi Deyan,

When running MapReduce jobs, the reduce phase often ends up being the 
bottleneck. This is especially true when all input data has to be gathered on 
the coordinating node before the reduce function can be executed, as is the 
case when the reduce_phase_only_1 flag is set. With this flag set, the 
MapReduce job will not scale very well.

Depending on your exact requirements, it may be worth aggregating the 
histogram data periodically, e.g. per hour and/or per day. These aggregates can 
then be stored in separate buckets under a key that describes the content, e.g. 
<cust>_<setup>_<date>. At query time you can then fetch the limited number of 
objects that cover the period you want statistics for directly by their 
descriptive keys, and process them in the application layer. Even though this 
requires a bit more work at regular intervals, it will most likely be much more 
efficient at query time and scale better.
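
To make the idea a bit more concrete, here is a rough sketch in plain Ruby of 
how the aggregates could be keyed, merged and queried. It is only an 
illustration under my own assumptions: it makes no Riak calls (a Hash stands in 
for the aggregate bucket), and the bin width, the helper names and the "acme" 
customer are all made up. The percentile it returns is an approximation whose 
precision depends on the bin width you choose.

```ruby
require 'date'

# Hypothetical bin width: bps values are grouped into bins of 10,000.
BIN_WIDTH = 10_000

# Build a histogram (bin index => count) from raw bps samples.
def build_histogram(samples, bin_width = BIN_WIDTH)
  samples.each_with_object(Hash.new(0)) do |bps, hist|
    hist[(bps / bin_width).floor] += 1
  end
end

# Descriptive key for one daily aggregate, e.g. "acme_default_2013-07-14".
def aggregate_key(cust, setup, date)
  "#{cust}_#{setup}_#{date.strftime('%Y-%m-%d')}"
end

# Keys of all daily aggregates covering an inclusive date range.
def keys_for_period(cust, setup, from, to)
  (from..to).map { |date| aggregate_key(cust, setup, date) }
end

# Merge several histograms by summing the counts per bin.
def merge_histograms(histograms)
  histograms.each_with_object(Hash.new(0)) do |hist, merged|
    hist.each { |bin, count| merged[bin] += count }
  end
end

# Approximate percentile (nearest-rank): the upper edge of the bin in which
# the cumulative count first reaches the target rank. Returns nil when empty.
def percentile(hist, pct, bin_width = BIN_WIDTH)
  total = hist.values.inject(0, :+)
  return nil if total.zero?
  rank = (total * pct / 100.0).ceil
  seen = 0
  hist.sort.each do |bin, count|
    seen += count
    return (bin + 1) * bin_width if seen >= rank
  end
end

# Usage: a Hash stands in for the bucket that would hold the daily aggregates.
store = {}
day1 = Date.new(2013, 7, 13)
day2 = Date.new(2013, 7, 14)
store[aggregate_key('acme', 'default', day1)] = build_histogram([5_000, 15_000, 25_000])
store[aggregate_key('acme', 'default', day2)] = build_histogram([15_000, 95_000])

keys = keys_for_period('acme', 'default', day1, day2)
merged = merge_histograms(keys.map { |key| store.fetch(key, {}) })
puts percentile(merged, 95)
```

With something like this, a query over 30 days only has to fetch 30 small 
objects by key instead of streaming every raw record through a single reducer. 
Keep in mind that if the histograms are stored as JSON, the bin indexes will 
come back as strings and need to be converted to integers before merging.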

Best regards,

Christian


On 14 Jul 2013, at 12:16, Deyan Dyankov <dyan...@cloudxcel.com> wrote:

> Hi everyone,
> 
> first time here. Thanks in advance.
> 
> I am experiencing issues with MapReduce: it seems to time out once a 
> certain data volume is reached. There is only one reducer, and here is 
> the script that starts the MapReduce job:
> #!/usr/bin/env ruby
> […]
> @client = Riak::Client.new(
>   :nodes => [
>     {:host => 'db1', :pb_port => 8087, :http_port => 8098},
>     {:host => 'db2', :pb_port => 8087, :http_port => 8098},
>     {:host => 'db3', :pb_port => 8087, :http_port => 8098}
>   ],
>   :protocol => 'pbc'
> )
> 
> start_key = "#{cust}:#{setup}:#{start_time}"
> end_key = "#{cust}:#{setup}:#{end_time}"
> 
> result = Riak::MapReduce.new(@client).
>   index(bucket_name, index_name, start_key..end_key).
>   map('map95th').
>   reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
>   run()
> 
> puts result
> 
> The following is the code for the map95th and reduce95th javascript functions:
> function map95th(v, keyData, arg) {
>   var key_elements = v['key'].split(':');
>   var cust = key_elements[0];
>   var setup = key_elements[1];
>   var sid = key_elements[2];
>   var ts = key_elements[3];
> 
>   var result_key = cust + ':' + setup + ':' + ts;
>   var obj_data = Riak.mapValuesJson(v)[0];
> 
>   // average bytes per second, assuming each record covers 60 seconds
>   var bps = (obj_data['rx_bytes'] + obj_data['tx_bytes']) / 60;
>   return [ bps ];
> }
> 
> // if used, this must be a single reducer! Call from Ruby like this:
> //  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
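> function reduce95th(values) {
>   if (values.length === 0) { return []; }
>   // sort a copy in ascending order so the input array is left untouched
>   var sorted = values.slice().sort(function(a, b) { return a - b; });
>   // nearest-rank 95th percentile; rank is 1-based, the array index 0-based
>   var rank = Math.ceil(sorted.length * 95 / 100);
> 
>   return [ sorted[rank - 1] ];
> }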
> 
> 
> 
> Now here is the interesting part. The MR goes through one record per minute. 
> If I run it for a period of less than ~20 days, it executes. Otherwise, it 
> times out:
> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$
> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb 
> yellingtone default $((`date +%s` - 20 * 86400)) `date +%s`
> 125581.51666666666
> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb 
> yellingtone default $((`date +%s` - 30 * 86400)) `date +%s`
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:182:in
>  `decode_response': Expected success from Riak but received 0. 
> {"phase":1,"error":"timeout","input":null,"type":null,"stack":null} 
> (Riak::ProtobuffsFailedRequest)
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:116:in
>  `mapred'
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:325:in
>  `block in mapred'
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:435:in
>  `block in recover_from'
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/innertube-1.0.2/lib/innertube.rb:127:in
>  `take'
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:433:in
>  `recover_from'
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:379:in
>  `protobuffs'
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:133:in
>  `backend'
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:324:in
>  `mapred'
>       from 
> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/map_reduce.rb:217:in
>  `run'
>       from ./95h.rb:29:in `<main>'
> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$
> 
> The records being processed look like this:
> {"rx_bytes":3485395.0,"tx_bytes":1658479.0}
> 
> When running the script with more than 20 days' worth of data (two records per 
> minute are processed, which amounts to more than 2 * 60 * 24 * 20 = 57,600 
> records), the script times out and here are some excerpts from the logs:
> ==> /var/log/riak/erlang.log.1 <==
> Erlang has closed
> 
> ==> /var/log/riak/error.log <==
> 2013-07-14 13:03:51.580 [error] <0.709.0>@riak_pipe_vnode:new_worker:768 Pipe 
> worker startup failed:fitting was gone before startup
> 
> ==> /var/log/riak/console.log <==
> 2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in 
> state wait_for_input terminated with reason: timeout
> 
> ==> /var/log/riak/error.log <==
> 2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in 
> state wait_for_input terminated with reason: timeout
> 
> ==> /var/log/riak/console.log <==
> 2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process 
> <0.22049.4326> with 0 neighbours exited with reason: timeout in 
> gen_fsm:terminate/7 line 611
> 
> ==> /var/log/riak/crash.log <==
> 2013-07-14 13:03:51 =CRASH REPORT====
>   crasher:
>     initial call: riak_pipe_vnode_worker:init/1
>     pid: <0.22049.4326>
>     registered_name: []
>     exception exit: 
> {timeout,[{gen_fsm,terminate,7,[{file,"gen_fsm.erl"},{line,611}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,227}]}]}
>     ancestors: 
> [<0.710.0>,<0.709.0>,riak_core_vnode_sup,riak_core_sup,<0.129.0>]
>     messages: []
>     links: [<0.710.0>,<0.709.0>]
>     dictionary: 
> [{eunit,[{module,riak_pipe_vnode_worker},{partition,388211372416021087647853783690262677096107081728},{<0.709.0>,<0.709.0>},{details,{fitting_details,{fitting,<18125.23420.4566>,#Ref<18125.0.5432.50467>,<<"C�������������������">>,1},1,riak_kv_w_reduce,{rct,#Fun<riak_kv_w_reduce.0.20542221>,{struct,[{<<"reduce_phase_only_1">>,true}]}},{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined},[{log,sink},{trace,[error]},{sink,{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined}},{sink_type,{fsm,10,infinity}}],64}}]}]
>     trap_exit: false
>     status: running
>     heap_size: 832040
>     stack_size: 24
>     reductions: 1456611
>   neighbours:
> 
> ==> /var/log/riak/error.log <==
> 2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process 
> <0.22049.4326> with 0 neighbours exited with reason: timeout in 
> gen_fsm:terminate/7 line 611
> 
> ==> /var/log/riak/crash.log <==
> 2013-07-14 13:03:52 =SUPERVISOR REPORT====
>      Supervisor: {<0.710.0>,riak_pipe_vnode_worker_sup}
>      Context:    child_terminated
>      Reason:     timeout
>      Offender:   
> [{pid,<0.22049.4326>},{name,undefined},{mfargs,{riak_pipe_vnode_worker,start_link,undefined}},{restart_type,temporary},{shutdown,2000},{child_type,worker}]
> 
> 
> ==> /var/log/riak/console.log <==
> 2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor 
> riak_pipe_vnode_worker_sup had child undefined started with 
> {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with 
> reason timeout in context child_terminated
> 
> ==> /var/log/riak/error.log <==
> 2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor 
> riak_pipe_vnode_worker_sup had child undefined started with 
> {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with 
> reason timeout in context child_terminated
> 
> 
> The data is in leveldb and is accessed through secondary indexes. 
> This is a 3 node cluster with 32GB ram, current usage is about 12G per node. 
> n_val=3. The same issue occurs on a similar 2 node cluster with 8GB of ram 
> (usage is ~6G).
> 
> The following is my app.config:
> [
>  {riak_api, [
>             {pb_ip,   "0.0.0.0" },
>             {pb_port, 8087 },
>             {pb_backlog, 100 }
>             ]},
>  {riak_core, [
>               {default_bucket_props, [
>                     {n_val, 3},
>                     {last_write_wins, true}
>                     ]},
>               {ring_state_dir, "/storage/riak/ring"},
>               {ring_creation_size, 256},
>               {http, [ {"0.0.0.0", 8098 } ]},
>               {https, [{ "0.0.0.0", 8069 }]},
>               {ssl, [
>                      {certfile, "/etc/ssl/riak/server.crt"},
>                      {cacertfile, "/etc/ssl/riak/root.crt"},
>                      {keyfile, "/etc/ssl/riak/server.key"}
>                     ]},
>               {handoff_port, 8099 },
>               {dtrace_support, false},
>               {enable_health_checks, true},
>               {platform_bin_dir, "/usr/sbin"},
>               {platform_data_dir, "/storage/riak"},
>               {platform_etc_dir, "/etc/riak"},
>               {platform_lib_dir, "/usr/lib/riak/lib"},
>               {platform_log_dir, "/var/log/riak"}
>              ]},
>  {riak_kv, [
>             {storage_backend, riak_kv_eleveldb_backend},
>             {anti_entropy, {on, []}},
>             {anti_entropy_build_limit, {1, 3600000}},
>             {anti_entropy_expire, 604800000},
>             {anti_entropy_concurrency, 2},
>             {anti_entropy_tick, 15000},
>             {anti_entropy_data_dir, "/storage/riak/anti_entropy"},
>             {anti_entropy_leveldb_opts, [{write_buffer_size, 4194304},
>                                          {max_open_files, 20}]},
> 
>             {mapred_name, "mapred"},
>             {mapred_2i_pipe, true},
>             {map_js_vm_count, 16 },
>             {reduce_js_vm_count, 12 },
>             {hook_js_vm_count, 20 },
>             {js_max_vm_mem, 8},
>             {js_thread_stack, 16},
>             {js_source_dir, "/etc/riak/mapreduce/js_source"},
>             {http_url_encoding, on},
>             {vnode_vclocks, true},
>             {listkeys_backpressure, true},
>             {vnode_mailbox_limit, {1, 5000}}
>            ]},
> 
>  {riak_search, [
>                 {enabled, true}
>                ]},
> 
>  {merge_index, [
>                 {data_root, "/storage/riak/merge_index"},
>                 {buffer_rollover_size, 1048576},
>                 {max_compact_segments, 20}
>                ]},
> 
>  {bitcask, [
>              {data_root, "/storage/riak/bitcask"}
>            ]},
> 
>  {eleveldb, [
>              {cache_size, 1024},
>              {max_open_files, 64},
>              {data_root, "/storage/riak/leveldb"}
>             ]},
> 
>  {lager, [
>             {handlers, [
>                            {lager_file_backend, [
>                                {"/var/log/riak/error.log", error, 10485760, 
> "$D0", 5},
>                                {"/var/log/riak/console.log", info, 10485760, 
> "$D0", 5}
>                            ]}
>                        ] },
> 
>             {crash_log, "/var/log/riak/crash.log"},
>             {crash_log_msg_size, 65536},
>             {crash_log_size, 10485760},
>             {crash_log_date, "$D0"},
>             {crash_log_count, 5},
>             {error_logger_redirect, true}
>         ]},
> 
>  {riak_sysmon, [
>          {process_limit, 30},
>          {port_limit, 2},
>          {gc_ms_limit, 0},
>          {heap_word_limit, 40111000},
>          {busy_port, true},
>          {busy_dist_port, true}
>         ]},
> 
>  {sasl, [
>          {sasl_error_logger, false}
>         ]},
> 
> Sorry to bug you with such a long e-mail but I wanted to be as thorough as 
> possible. I tried raising a few options but it didn't help: map_js_vm_count, 
> reduce_js_vm_count, js_max_vm_mem.
> I also tried adding a timeout argument to the MapReduce caller code, but even 
> if I set it to 60,000 or more (this is in milliseconds), the script 
> terminates with a timeout error after 10-12 seconds. The same behaviour is 
> observed if I use http instead of pbc.
> 
> What seems to be the problem? Is this a matter of configuration? I am 
> surprised about the fact that the job runs with 20-25 days of data and not 
> more.
> 
> thank you for your efforts,
> Deyan
> _______________________________________________
> riak-users mailing list
> riak-users@lists.basho.com
> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
