Yeah, I created a JIRA a while back to piggy-back the map status info on top of the task (I honestly think it will be a small change). There isn't a good reason to broadcast the entire array, and it can be an issue during large shuffles.
- Patrick

On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson <ilike...@gmail.com> wrote:

> I don't know of any way to avoid Akka doing a copy, but I would like to
> mention that it's on the priority list to piggy-back only the map statuses
> relevant to a particular map task on the task itself, thus reducing the
> total amount of data sent over the wire by a factor of N for N physical
> machines in your cluster. Ideally we would also avoid Akka entirely when
> sending the tasks, as these can get somewhat large and Akka doesn't work
> well with large messages.
>
> Do note that your solution of using broadcast to send the map tasks is very
> similar to how the executor returns the result of a task when it's too big
> for akka. We were thinking of refactoring this too, as using the block
> manager has much higher latency than a direct TCP send.
>
>
> On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>> Our current hack is to use Broadcast variables when serialized
>> statuses are above some (configurable) size : and have the workers
>> directly pull them from master.
>> This is a workaround : so would be great if there was a
>> better/principled solution.
>>
>> Please note that the responses are going to different workers
>> requesting for the output statuses for shuffle (after map) - so not
>> sure if back pressure buffers, etc would help.
>>
>>
>> Regards,
>> Mridul
>>
>>
>> On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>> > Hi,
>> >
>> > While sending map output tracker result, the same serialized byte
>> > array is sent multiple times - but the akka implementation copies it
>> > to a private byte array within ByteString for each send.
>> > Caching a ByteString instead of Array[Byte] did not help, since akka
>> > does not support special casing ByteString : serializes the
>> > ByteString, and copies the result out to an array before creating
>> > ByteString out of it (in Array[Byte] serializing is thankfully simply
>> > returning same array - so one copy only).
>> >
>> >
>> > Given the need to send immutable data large number of times, is there
>> > any way to do it in akka without copying internally in akka ?
>> >
>> >
>> > To see how expensive it is, for 200 nodes with a large number of
>> > mappers and reducers, the status becomes something like 30 mb for us -
>> > and pulling this about 200 to 300 times results in OOM due to the
>> > large number of copies sent out.
>> >
>> >
>> > Thanks,
>> > Mridul
>>
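
For a rough sense of the numbers in the original report, here is a back-of-the-envelope sketch in Python. It assumes the worst case, where every pending reply holds its own private copy of the serialized statuses at the same time; the real peak depends on how quickly the copies are sent and garbage collected. The function name `bytes_held` is made up for illustration and is not part of Spark or Akka.

```python
# Rough estimate of memory held by outgoing map-status replies.
# Assumption (not from Spark/Akka code): all pending replies are alive at once.

MB = 1024 * 1024


def bytes_held(payload_bytes: int, pending_sends: int, copy_per_send: bool) -> int:
    """Bytes retained while `pending_sends` replies are in flight."""
    if copy_per_send:
        # One private copy of the payload per reply.
        return payload_bytes * pending_sends
    # A single immutable buffer shared by every reply.
    return payload_bytes


status_size = 30 * MB      # status size quoted in the thread
for pulls in (200, 300):   # pull counts quoted in the thread
    copied = bytes_held(status_size, pulls, copy_per_send=True)
    shared = bytes_held(status_size, pulls, copy_per_send=False)
    print(f"{pulls} pulls: copied={copied / MB:.0f} MB, shared={shared / MB:.0f} MB")
```

Under that assumption, 200 to 300 pulls of a 30 MB payload add up to roughly 6000 to 9000 MB of copies, against 30 MB if a single buffer could be shared, which is consistent with the OOM described above.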