I get it now. Thank you.

Do you also happen to know the difference between boltLoad and connectionLoad?

https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/grouping/Load.java

    * @param boltLoad the load as reported by the bolt 0.0 no load 1.0 fully loaded
    * @param connectionLoad the load as reported by the connection to the bolt 0.0 no load 1.0 fully loaded.

boltLoad is measured on the basis of queue length. What about connectionLoad?
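One way to read the two javadoc parameters is as two separate signals, one from the bolt's own queue and one from the connection to the bolt, that get folded into a single number before the grouping uses it. The sketch below is a hypothetical illustration, not Storm code: the classes LoadSketch and LoadSketchDemo and the method effectiveLoad() are made up here, and the combination rule (take the larger of the two values, and treat a target that has not reported metrics yet as fully loaded) is an assumption that should be checked against the actual getLoad() in Load.java.

```java
// Hypothetical sketch, NOT Storm's org.apache.storm.grouping.Load class.
// It mirrors the two documented parameters; the combination rule in
// effectiveLoad() is an assumption to verify against Load.getLoad().
final class LoadSketch {
    private final boolean hasMetrics;
    private final double boltLoad;       // load reported by the bolt: 0.0 none, 1.0 full
    private final double connectionLoad; // load reported by the connection to the bolt: 0.0 none, 1.0 full

    LoadSketch(boolean hasMetrics, double boltLoad, double connectionLoad) {
        this.hasMetrics = hasMetrics;
        this.boltLoad = boltLoad;
        this.connectionLoad = connectionLoad;
    }

    // Assumed rule: the grouping sees the worse (larger) of the two signals.
    double effectiveLoad() {
        if (!hasMetrics) {
            return 1.0; // assumption: a target with no metrics yet counts as fully loaded
        }
        return Math.max(boltLoad, connectionLoad);
    }
}

class LoadSketchDemo {
    public static void main(String[] args) {
        // The bolt's own queue is light, but its connection reports a higher load.
        LoadSketch load = new LoadSketch(true, 0.2, 0.7);
        System.out.println(load.effectiveLoad()); // prints 0.7
    }
}
```

Under a max rule like this, a target counts as busy if either its input queue or the connection feeding it is busy, so a lightly loaded bolt behind a heavily loaded connection would still receive fewer tuples.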
Best regards,
Yamini Joshi

On Tue, Apr 26, 2016 at 1:26 PM, Rodrigo Valladares wrote:

> The balancing is done based on the remaining capacity of the queue. I read
> somewhere in the code (I don't remember where) that the load mapping gives
> us a number between 0 and 1, with 0 meaning no tuples in the queue and 1
> meaning the queue is at full capacity.
>
> So int val = (int)(101 - (load.get(targets[i]) * 100)); gives us the
> remaining capacity normalized between 1 and 101 (I think it adds 1 to avoid
> the case where all executors are fully loaded, in which case total would be
> zero and random.nextInt(total) would fail).
>
> The loads array stores the remaining capacity of each target, and total
> stores the sum of those remaining capacities.
>
> If you generate a random number between 0 and total with a uniform
> distribution, the chance of this number falling between 0 and l1, with l1
> being the remaining capacity of executor 1, would be l1/total. The chance of
> it falling between l1 and l1 + l2 would be l2/total, and so on.
>
> That is what the for loop is doing.
>
> 2016-04-26 12:26 GMT-05:00 Yamini Joshi:
>
>> This is the core function:
>>
>> https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
>>
>> public List<Integer> chooseTasks(int taskId, List<Object> values,
>>         LoadMapping load) {
>>     if ((lastUpdate + 1000) < System.currentTimeMillis()) {
>>         int local_total = 0;
>>         for (int i = 0; i < targets.length; i++) {
>>             int val = (int)(101 - (load.get(targets[i]) * 100));
>>             loads[i] = val;
>>             local_total += val;
>>         }
>>         total = local_total;
>>         lastUpdate = System.currentTimeMillis();
>>     }
>>     int selected = random.nextInt(total);
>>     int sum = 0;
>>     for (int i = 0; i < targets.length; i++) {
>>         sum += loads[i];
>>         if (selected < sum) {
>>             return rets[i];
>>         }
>>     }
>>     return rets[rets.length-1];
>> }
>>
>> My question: If we have to select the target task with min/max load, why
>> are we using a random value from the total load and comparing all target
>> tasks to it?
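The selection step Rodrigo describes is cumulative-sum ("roulette wheel") sampling. The standalone sketch below pulls it out of chooseTasks() so it can be run on its own; the class WeightedPick and the helpers toWeight() and pick() are hypothetical names, not Storm API, and the three example loads are made up. A fixed Random seed is used only to make the demo repeatable.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical standalone version of the selection step in chooseTasks().
final class WeightedPick {
    // Maps a load in [0.0, 1.0] to an integer weight in [1, 101].
    static int toWeight(double load) {
        return (int) (101 - load * 100);
    }

    // Returns the first index whose cumulative weight exceeds `selected`.
    // With `selected` uniform in [0, total), index i is returned with
    // probability weights[i] / total.
    static int pick(int[] weights, int selected) {
        int sum = 0;
        for (int i = 0; i < weights.length; i++) {
            sum += weights[i];
            if (selected < sum) {
                return i;
            }
        }
        return weights.length - 1; // only reached if selected >= total
    }

    public static void main(String[] args) {
        double[] loads = {0.0, 0.5, 1.0}; // three hypothetical targets
        int[] weights = new int[loads.length];
        int total = 0;
        for (int i = 0; i < loads.length; i++) {
            weights[i] = toWeight(loads[i]); // 101, 51, 1
            total += weights[i];             // ends at 153
        }
        Random random = new Random(42);
        int[] counts = new int[loads.length];
        for (int n = 0; n < 153_000; n++) {
            counts[pick(weights, random.nextInt(total))]++;
        }
        // Expect roughly 101000, 51000 and 1000 picks respectively.
        System.out.println(Arrays.toString(counts));
    }
}
```

This also suggests a likely answer to the "why random" question. In the quoted code, loads is only refreshed about once per second, so always choosing the least-loaded target would send every tuple to that one target until the next refresh. Proportional random selection keeps all targets busy while still favouring the ones with more spare capacity.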
>>
>> Best regards,
>> Yamini Joshi
>>
>> On Tue, Apr 26, 2016 at 11:04 AM, Rodrigo Valladares wrote:
>>
>>> I was doing some research on this topic. I checked the code a while back,
>>> and from what I understood this grouping balances the tuples according to
>>> the current queue of each executor.
>>>
>>> For example, Bolt A has 4 executors: 2 with a queue at 50% capacity and 2
>>> at 40% capacity. The distribution would be proportional to the "left
>>> capacity on the queue" (1 - capacity), so for executors 1 and 2 it would be
>>> 0.5/(0.5*2 + 0.6*2).
>>>
>>> I think it was built with heterogeneous clusters in mind. In cases where
>>> some machines are faster than others, shuffle grouping performs very poorly
>>> since it does not take into account that executors have different
>>> latencies. You might expect a slower machine to build up a queue faster on
>>> its executors, so this balancing would address that limitation. I did not
>>> develop or test this, so I can't assure you it will work like this, but I
>>> think this is the idea behind it.
>>>
>>> 2016-04-26 10:46 GMT-05:00 Tom Brown:
>>>
>>>> Hi Yamini,
>>>>
>>>> Your question piqued my interest, so I decided to see what I could
>>>> figure out. After reading this JIRA,
>>>> https://issues.apache.org/jira/browse/STORM-162, I am still confused.
>>>> Hopefully someone else on the list will be able to help us.
>>>>
>>>> Is LoadAwareShuffleGrouping a way of changing a basic shuffle operation
>>>> (e.g. randomized round robin) to include downstream load as an input? Or
>>>> is it an extension of a fields grouping that could allow the same tuple
>>>> to be sent to different downstream tasks (A or B) depending on which has
>>>> the higher load?
>>>>
>>>> --Tom
>>>>
>>>> On Tue, Apr 26, 2016 at 8:53 AM, Yamini Joshi wrote:
>>>>
>>>>> Hello everyone!
>>>>>
>>>>> I am new to Storm, and I was looking into the different stream
>>>>> groupings when I came across LoadAwareShuffleGrouping. Can someone tell
>>>>> me what it is exactly? Has it been included in the latest Storm build?
>>>>> All my Google searches on this point to JIRA tickets.
>>>>>
>>>>> Any help is appreciated.
>>>>>
>>>>> Thank you.
>>>
>>> --
>>> Rodrigo Valladares Cotta
>>> Master's Student, Computer Science
>>> University of Nebraska-Lincoln
>
> --
> Rodrigo Valladares Cotta
> Master's Student, Computer Science
> University of Nebraska-Lincoln
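Rodrigo's Bolt A example (four executors, two with queues at 50% capacity and two at 40%) can be checked numerically. The sketch below computes each executor's share two ways: proportional to the remaining capacity (1 - load), as in his email, and from the integer weights (101 - load * 100) used in the quoted chooseTasks(). The class ShareExample and its method names are illustrative only.

```java
import java.util.Arrays;

// Arithmetic for the "Bolt A" example; names are illustrative, not Storm API.
final class ShareExample {
    // Share proportional to the remaining capacity (1 - load).
    static double[] sharesFromLoads(double[] loads) {
        double total = 0.0;
        for (double load : loads) {
            total += 1.0 - load;
        }
        double[] shares = new double[loads.length];
        for (int i = 0; i < loads.length; i++) {
            shares[i] = (1.0 - loads[i]) / total;
        }
        return shares;
    }

    // Share implied by the integer weights (101 - load * 100) in chooseTasks().
    static double[] sharesFromWeights(double[] loads) {
        int[] weights = new int[loads.length];
        int total = 0;
        for (int i = 0; i < loads.length; i++) {
            weights[i] = (int) (101 - loads[i] * 100);
            total += weights[i];
        }
        double[] shares = new double[loads.length];
        for (int i = 0; i < loads.length; i++) {
            shares[i] = (double) weights[i] / total;
        }
        return shares;
    }

    public static void main(String[] args) {
        double[] loads = {0.5, 0.5, 0.4, 0.4}; // executors 1-4 of Bolt A
        System.out.println(Arrays.toString(sharesFromLoads(loads)));
        System.out.println(Arrays.toString(sharesFromWeights(loads)));
    }
}
```

The two approaches agree closely: 0.5/2.2 ≈ 0.2273 versus 51/224 ≈ 0.2277 for each 50%-loaded executor, and 0.6/2.2 ≈ 0.2727 versus 61/224 ≈ 0.2723 for each 40%-loaded one. The +1 offset in the code only nudges the shares slightly toward uniform.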
