Good idea -- I've added this to the wiki:
https://cwiki.apache.org/confluence/display/SPARK/Shuffle+Internals.  Happy
to stick it elsewhere if folks think there's a more convenient place.

On Thu, Jun 11, 2015 at 4:46 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Kay,
>
> Excellent write-up. This should be preserved for reference somewhere
> searchable.
>
> -Gerard.
>
>
>
> On Fri, Jun 12, 2015 at 1:19 AM, Kay Ousterhout <k...@eecs.berkeley.edu>
> wrote:
>
>> Here’s how the shuffle works.  This explains what happens for a single
>> task; this will happen in parallel for each task running on the machine,
>> and as Imran said, Spark runs up to “numCores” tasks concurrently on each
>> machine.  There's also an answer to the original question about why CPU use
>> is low at the very bottom.
>>
>> The key data structure used in fetching shuffle data is the “results”
>> queue in ShuffleBlockFetcherIterator, which buffers data that we have in
>> serialized (and maybe compressed) form, but haven’t yet deserialized /
>> processed.  The results queue is filled by many threads fetching data over
>> the network (the number of concurrent threads fetching data is equal to the
>> number of remote executors we’re currently fetching data from) [0], and is
>> consumed by a single thread that deserializes the data and computes some
>> function over it (e.g., if you’re doing rdd.count(), the thread
>> deserializes the data and counts the number of items).  As we fetch data
>> over the network, we track bytesInFlight, which is data that has been
>> requested (and possibly received) from a remote executor, but that hasn’t
>> yet been deserialized / processed by the consumer thread.  So, this
>> includes all of the data in the “results” queue, and possibly more data
>> that’s currently outstanding over the network.  We always issue as many
>> requests as we can, with the constraint that bytesInFlight remains less
>> than a specified maximum [1].
>>
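>> (To make the shape of that concrete, here is a tiny, made-up sketch -- not
>> the real ShuffleBlockFetcherIterator code -- of the producer/consumer pattern
>> just described: fetch callbacks put serialized blocks on a blocking queue,
>> and the single consumer thread takes them off and does the bytesInFlight
>> accounting.)
>>
>> import java.util.concurrent.LinkedBlockingQueue
>>
>> case class FetchResult(blockId: String, size: Long, bytes: Array[Byte])
>>
>> class ResultsQueueSketch {
>>   private val results = new LinkedBlockingQueue[FetchResult]()
>>   private var bytesInFlight = 0L  // requested but not yet processed; only
>>                                   // touched from the consumer/request side
>>
>>   // Called when a remote request is issued, with the total size of its blocks.
>>   def onRequestIssued(requestSize: Long): Unit = bytesInFlight += requestSize
>>
>>   // Network callback threads land here: no deserialization, just enqueue.
>>   def onBlockFetched(result: FetchResult): Unit = results.put(result)
>>
>>   // The single consumer thread: blocks until a result is available.
>>   def next(): FetchResult = {
>>     val r = results.take()   // time spent blocked here is the "fetch wait time"
>>     bytesInFlight -= r.size  // about to be processed, so no longer in flight
>>     // ...then issue more requests while bytesInFlight stays under the max...
>>     r
>>   }
>> }
>>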
>> In a little more detail, here’s exactly what happens when a task begins
>> reading shuffled data:
>>
>> (1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data
>> [1] over the network (this happens here
>> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260>
>> ).
>>
>> These requests are all executed asynchronously using a ShuffleClient [2]
>> via the shuffleClient.fetchBlocks call
>> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149>
>>  [3].  We pass in a callback that, once a block has been successfully
>> fetched, sticks it on the “results” queue.
>>
>> (2) Begin processing the local data.  One by one, we request the local
>> data from the local block manager (which memory maps the file) and then stick
>> the result onto the results queue
>> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230>.
>> Because we memory map the files, which is speedy, the local data typically
>> all ends up in the results queue ahead of the remote data.
>>
>> (3) Once the async network requests have been issued (note — issued, but
>> not finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1)
>> and (2) have happened), ShuffleBlockFetcherIterator returns an iterator
>> that gets wrapped too many times to count [4] and eventually gets unrolled
>> [5].  Each time next() is called on the iterator, it blocks waiting for an
>> item from the results queue.  This may return right away, or if the queue
>> is empty, will block waiting on new data from the network [6].  Before
>> returning from next(), we update our accounting for the bytes in flight:
>> the chunk of data we return is no longer considered in-flight, because it’s
>> about to be processed, so we update the current bytesInFlight, and if it
>> won’t result in > maxBytesInFlight outstanding, send some more requests for
>> data.
>>
>> ————————————————
>>
>> Notes:
>>
>> [0] Note that these threads consume almost no CPU resources, because they
>> just receive data from the OS and then execute a callback that sticks the
>> data on the results queue.
>>
>> [1] We limit the data outstanding on the network to avoid using too much
>> memory to hold the data we’ve fetched over the network but haven’t yet
>> processed.
>>
>> [1.5] Each request may include multiple shuffle blocks, where a "block" is
>> the data output for this reduce task by a particular map task.
>> All of the reduce tasks for a shuffle read a total of # map tasks * #
>> reduce tasks shuffle blocks; each reduce task reads # map tasks blocks.  We
>> do some hacks
>> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177>
>> to try to size these requests in a "good" way: we limit each request to
>> about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines
>> concurrently without exceeding maxBytesInFlight.  5 is completely a magic
>> number here that was probably guessed by someone long long ago, and it
>> seems to work ok.
>>
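>> (A simplified sketch of that request-sizing idea -- not the real
>> splitLocalRemoteBlocks code, and the names are invented: group one remote
>> executor's blocks into requests of roughly maxBytesInFlight / 5 bytes each.)
>>
>> case class FetchRequest(address: String, blocks: Seq[(String, Long)]) // (blockId, size)
>>
>> def makeRequests(address: String,
>>                  blocks: Seq[(String, Long)],
>>                  maxBytesInFlight: Long): Seq[FetchRequest] = {
>>   val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
>>   val requests = Seq.newBuilder[FetchRequest]
>>   var current = List.empty[(String, Long)]
>>   var currentSize = 0L
>>   for ((blockId, size) <- blocks) {
>>     current ::= (blockId -> size)
>>     currentSize += size
>>     if (currentSize >= targetRequestSize) {   // close out this request
>>       requests += FetchRequest(address, current.reverse)
>>       current = Nil
>>       currentSize = 0L
>>     }
>>   }
>>   if (current.nonEmpty) requests += FetchRequest(address, current.reverse)
>>   requests.result()
>> }
>>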
>> [2] The default configuration uses NettyBlockTransferService as the
>> ShuffleClient implementation (note that this extends BlockTransferService,
>> which extends ShuffleClient).
>>
>> [3] If you’re curious how the shuffle client fetches data, the default
>> Spark configuration results in exactly one TCP connection from an executor
>> to each other executor.  If executor A is getting shuffle data from
>> executor B, we start by sending an OpenBlocks message from A to B.  The
>> OpenBlocks message includes the list of blocks that A wants to fetch, and
>> causes the remote executor, B, to start to pull the corresponding data into
>> memory from disk (we typically memory map the files, so this may not
>> actually result in the data being read yet), and also to store some state
>> associated with this “stream” of data.  The remote executor, B, responds
>> with a stream ID that helps it to identify the connection.  Next, A
>> requests blocks one at a time from B using a ChunkFetchRequest message
>> (this happens here
>> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L101>
>>  in
>> OneForOneBlockFetcher, which calls this code
>> <https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java#L97>
>> in TransportClient; currently, we have a one-to-one mapping from a chunk to
>> a particular block).  It’s possible that there are many sets of shuffle
>> data being fetched concurrently between A and B (e.g., because many tasks
>> are run concurrently).  These requests are serialized, so one block is sent
>> at a time from B, and they’re sent in the order that the requests were
>> issued on A.
>>
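>> (A rough sketch of that exchange, with simplified stand-in types -- the real
>> messages live in Spark's network/shuffle and network/common modules:)
>>
>> case class OpenBlocks(appId: String, execId: String, blockIds: Seq[String])      // A -> B
>> case class StreamHandle(streamId: Long, numChunks: Int)                          // B -> A
>> case class ChunkFetchRequest(streamId: Long, chunkIndex: Int)                    // A -> B, one per block
>> case class ChunkFetchSuccess(streamId: Long, chunkIndex: Int, data: Array[Byte]) // B -> A
>>
>> // A single fetch from executor B then looks roughly like:
>> //   A -> B : OpenBlocks(blockIds)            B maps the blocks and registers a "stream"
>> //   B -> A : StreamHandle(streamId, n)       n == blockIds.size (chunk i <-> block i)
>> //   A -> B : ChunkFetchRequest(streamId, i)  for i = 0 until n, over the one TCP connection
>> //   B -> A : ChunkFetchSuccess(...)          one block at a time, in request order
>>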
>> [4] In BlockStoreShuffleFetcher, which handles failures; then in
>> HashShuffleReader, which helps aggregate some of the data; etc.
>>
>> [5] This happens in BlockManager.putIterator, if the RDD is going to be
>> cached; in the function passed in to ResultTask, if this is the last stage
>> in a job; or via the writer.write() call in ShuffleMapTask, if this is a
>> stage that generates intermediate shuffle data.
>>
>> [6] We time how long we spend blocking on data from the network; this is
>> what’s shown as “fetch wait time” in Spark’s UI.
>>
>> ——————————————————
>>
>> To answer the original question about why CPU use is low, this means that
>> the main thread (that pulls data off of the results queue) is blocking on
>> I/O.  It could be blocking to receive data over the network (which will be
>> included in the fetch wait time shown in the UI, described above), or it
>> could be blocking to read data from the local disk (because the shuffle
>> data that is read locally is memory-mapped, so the thread may block on disk
>> if the data hasn’t been read into memory yet).  It could also be blocking
>> because it’s writing new shuffle output to disk (also shown in the UI, as
>> shuffle write time), or because it’s spilling intermediate data.
>> Everything except the spilling is shown in the handy-dandy new
>> visualization that Kousuke recently added
>> <https://github.com/apache/spark/commit/a5f7b3b9c7f05598a1cc8e582e5facee1029cd5e> to
>> the UI (available in 1.4 onwards): if you look at the stage view, and click
>> on “Event Timeline”, you’ll see a visualization of the tasks and how long
>> they spent blocked on various things (we should add the spilling to
>> this…there is a long-outstanding JIRA for this).
>>
>> Hope this helps!
>>
>> -Kay
>>
>> On Thu, Jun 11, 2015 at 7:42 AM, Imran Rashid <iras...@cloudera.com>
>> wrote:
>>
>>> That is not exactly correct -- that being said, I'm not 100% on these
>>> details either, so I'd appreciate you double checking and/or another dev
>>> confirming my description.
>>>
>>>
>>> Spark actually has more threads going than the "numCores" you specify.
>>> "numCores" is really used for how many threads are actively executing
>>> tasks.  There are more threads for doing the fetching (the details of which
>>> I'm not that familiar with) -- that never cuts into the number of actively
>>> executing tasks for "numCores".  There isn't a 1-to-1 correspondence
>>> between one shuffle block and one task -- a shuffle-read task most likely
>>> needs to fetch many shuffle blocks, with some local and some remote (in
>>> most cases).  So, from your lingo above, c is numCores, but c_1 is just an
>>> independent pool of threads.
>>>
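>>> (Purely illustrative, with made-up names, just to picture the two
>>> independent pools:)
>>>
>>> import java.util.concurrent.Executors
>>>
>>> val numCores = 8                                          // max concurrently running tasks (your c)
>>> val taskThreads  = Executors.newFixedThreadPool(numCores) // executes the tasks themselves
>>> val fetchThreads = Executors.newCachedThreadPool()        // your c_1: handles fetch callbacks; it is
>>>                                                           // separate from the task pool and mostly
>>>                                                           // blocked on IO, so it neither takes up task
>>>                                                           // slots nor burns much CPU
>>>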
>>> The obvious follow-up question is: if you actually have more than
>>> "numCores" threads going at the same time, how come CPU usage is low?  You
>>> could still have your cpus stuck waiting on i/o, from disk or network, so
>>> they aren't getting fully utilized.  And the cpu can also be idle waiting
>>> for memory, if there are a lot of cache misses (I'm not sure how that will
>>> show up in cpu monitoring).  If that were the case, that could even be from
>>> too many threads, as a lot of time is spent context switching ... but I'm
>>> just guessing now.
>>>
>>> hope this helps,
>>> Imran
>>>
>>>
>>>
>>> On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes <91m...@gmail.com> wrote:
>>>
>>>> Hi Imran,
>>>>
>>>> Thank you again for your email.
>>>>
>>>> I just want to ask one further question to clarify the implementation
>>>> of the shuffle block fetches. When you say that rather than sitting
>>>> idle, [the executor] will immediately start reading the local block, I
>>>> would guess that, in implementation, the executor is going to launch
>>>> concurrent threads to read both local and remote blocks, which it
>>>> seems to do in the initialize() method of
>>>> core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case
>>>> or would the Executor run all local fetch threads first?
>>>>
>>>> The reason I ask is that if the slave machine on which the Executor is
>>>> running has some number of cores, c, then I would have thought that
>>>> some of the threads launched would occupy some number, c_1, of the
>>>> cores and conduct the local reads (where c_1 <= c), while the other
>>>> threads would occupy the remaining (c - c_1) cores' cycles until *all*
>>>> necessary blocks have been read, so that---depending on c and the number
>>>> of blocks to fetch---none of the cores sits idle when there are many
>>>> blocks to fetch. (I monitor the CPU utilization of our nodes throughout
>>>> a job, and generally find them under-utilized statistically speaking;
>>>> that is, their usage over the whole job is lower than expected, with
>>>> short bursts of high usage, so I ask this question in a specific way for
>>>> this reason, since I can see trends in the probability density functions
>>>> of CPU utilization as the #partitions of our RDDs are increased.)
>>>>
>>>> ShuffleBlockFetcherIterator.scala:
>>>>
>>>>   private[this] def initialize(): Unit = {
>>>>                 ...
>>>>     // Send out initial requests for blocks, up to our maxBytesInFlight
>>>>     while (fetchRequests.nonEmpty &&
>>>>       (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
>>>>       sendRequest(fetchRequests.dequeue())
>>>>     }
>>>>     val numFetches = remoteRequests.size - fetchRequests.size
>>>>     logInfo("Started " + numFetches + " remote fetches in" +
>>>> Utils.getUsedTimeMs(startTime))
>>>>
>>>>     // Get Local Blocks
>>>>     fetchLocalBlocks()
>>>>     logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
>>>>   }
>>>>   private[this] def fetchLocalBlocks() {
>>>>     val iter = localBlocks.iterator
>>>>     while (iter.hasNext) {
>>>>       val blockId = iter.next()
>>>>       try {
>>>>         val buf = blockManager.getBlockData(blockId)
>>>>         shuffleMetrics.incLocalBlocksFetched(1)
>>>>         shuffleMetrics.incLocalBytesRead(buf.size)
>>>>         buf.retain()
>>>>         results.put(new SuccessFetchResult(blockId, 0, buf))
>>>>       } catch {
>>>>                                 ...
>>>>       }
>>>>     }
>>>>   }
>>>>
>>>> Obviously, I will have to sit down with core/.../network/nio/* and
>>>> core/.../shuffle/* and do my own homework on this, but from what I can
>>>> tell, the BlockDataManager relies on either
>>>> NioBlockTransferService.scala or the NettyBlockTransferService.scala
>>>> (which are set in SparkEnv.scala), both of which do the grunt work of
>>>> actually buffering and transferring the blocks' bytes. Finally, the
>>>> tasks in the new stage for which the shuffle outputs have been fetched
>>>> will not commence until all of the block fetching threads (both local
>>>> and remote) have terminated.
>>>>
>>>> Does the above paint an accurate picture? I would really appreciate
>>>> clarification on the concurrency, since I would like to determine why
>>>> our jobs have under-utilization and poor weak scaling efficiency.
>>>>
>>>> I will cc this thread over to the dev list. I did not cc them in case
>>>> my previous question was trivial---I didn't want to spam the list
>>>> unnecessarily, since I do not see these kinds of questions posed there
>>>> frequently.
>>>>
>>>> Thanks a bunch,
>>>> Mike
>>>>
>>>>
>>>> On 6/10/15, Imran Rashid <iras...@cloudera.com> wrote:
>>>> > Hi Mike,
>>>> >
>>>> > no, this is a good question, I can see how my response could be
>>>> interpreted
>>>> > both ways.
>>>> >
>>>> > To be more precise:
>>>> > *nothing* is fetched until the shuffle-read stage starts.  So it is
>>>> > normal to see a spike in cluster bandwidth when that stage starts.
>>>> > There is a hard boundary between stages -- that is, spark never starts
>>>> > any tasks in one stage until *all* tasks in the stages it depends on
>>>> > have been completed.  (There has been on-and-off discussion about
>>>> > relaxing this, but IMO this is unlikely to change in the near future.)
>>>> > So spark will wait for all of the tasks in the previous shuffle-write
>>>> > stage to finish, and then kick off a bunch of shuffle-read tasks in the
>>>> > next stage, leading to the spike you see.
>>>> >
>>>> > I was referring to the way blocks are fetched within one of those
>>>> > shuffle-read tasks.  One of those tasks will probably need a bunch of
>>>> > different blocks, from many executors.  But some of the blocks it needs
>>>> > will probably exist locally.  So the task first sends out a request to
>>>> > fetch blocks remotely (leading to the spike), but rather than sitting
>>>> > idle, it will immediately start reading the local blocks.  Ideally, by
>>>> > the time it's done reading the local blocks, some of the remote blocks
>>>> > have already been fetched, so no time is spent *waiting* for the remote
>>>> > reads.  As the remote blocks get read, spark sends out more requests,
>>>> > trying to balance how much data needs to be buffered vs. preventing any
>>>> > waiting on remote reads (which can be controlled by
>>>> > spark.reducer.maxSizeInFlight).
>>>> >
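>>>> > (For what it's worth, a hedged example of turning that knob -- in 1.4 the
>>>> > property is spark.reducer.maxSizeInFlight (default 48m); older releases
>>>> > called it spark.reducer.maxMbInFlight:)
>>>> >
>>>> > import org.apache.spark.SparkConf
>>>> >
>>>> > val conf = new SparkConf()
>>>> >   .set("spark.reducer.maxSizeInFlight", "96m")  // allow more outstanding fetch data per task
>>>> > // or: spark-submit --conf spark.reducer.maxSizeInFlight=96m ...
>>>> >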
>>>> > Hope that clarifies things!
>>>> >
>>>> > btw, you sent this last question to just me -- I think it's a good
>>>> > question, do you mind sending it to the list?  I figured that was
>>>> > accidental but wanted to check.
>>>> >
>>>> > Imran
>>>> >
>>>> > On Wed, Jun 10, 2015 at 12:20 AM, Mike Hynes <91m...@gmail.com>
>>>> wrote:
>>>> >
>>>> >> Hi Imran,
>>>> >> One additional quick question---I just want to confirm that I fully
>>>> >> understand your comment that "blocks are fetched before they are
>>>> >> needed." Typically on our system, we see spikes in cluster bandwidth
>>>> >> (with ganglia) at stage boundaries, so I previously assumed that all
>>>> >> shuffle read occurred there. Do you mean that the blocks are fetched
>>>> >> by the shuffle read iterator, and hence when tasks occur afterwards
>>>> >> the necessary blocks have already been fetched?
>>>> >> Thanks---I am sorry if this is an obvious question, but I'd like to
>>>> >> understand this as precisely as possible.
>>>> >> Mike
>>>> >>
>>>> >> On 6/10/15, Mike Hynes <91m...@gmail.com> wrote:
>>>> >> > Ahhh---forgive my typo: what I mean is,
>>>> >> > (t2 - t1) >= (t_ser + t_deser + t_exec)
>>>> >> > is satisfied, empirically.
>>>> >> >
>>>> >> > On 6/10/15, Mike Hynes <91m...@gmail.com> wrote:
>>>> >> >> Hi Imran,
>>>> >> >>
>>>> >> >> Thank you for your email.
>>>> >> >>
>>>> >> >> In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I
>>>> >> >> have found it to be true, although I have not included
>>>> >> >> t_{wait_for_read} in this, since it has been---so far as I can
>>>> >> >> tell---either zero or negligible compared to the task time.
>>>> >> >>
>>>> >> >> Thanks,
>>>> >> >> Mike
>>>> >> >>
>>>> >> >> On 6/8/15, Imran Rashid <iras...@cloudera.com> wrote:
>>>> >> >>> Hi Mike,
>>>> >> >>>
>>>> >> >>> all good questions, let me take a stab at answering them:
>>>> >> >>>
>>>> >> >>> 1. Event Logs + Stages:
>>>> >> >>>
>>>> >> >>> It's normal for stages to get skipped if they are shuffle map
>>>> >> >>> stages, which get read multiple times.  Eg., here's a little
>>>> >> >>> example program I wrote earlier to demonstrate this: "d3" doesn't
>>>> >> >>> need to be re-shuffled since each time it's read w/ the same
>>>> >> >>> partitioner.  So skipping stages in this way is a good thing:
>>>> >> >>>
>>>> >> >>> val partitioner = new org.apache.spark.HashPartitioner(10)
>>>> >> >>> val d3 = sc.parallelize(1 to 100)
>>>> >> >>>   .map { x => (x % 10) -> x }
>>>> >> >>>   .partitionBy(partitioner)
>>>> >> >>> (0 until 5).foreach { idx =>
>>>> >> >>>   val otherData = sc.parallelize(1 to (idx * 100))
>>>> >> >>>     .map { x => (x % 10) -> x }
>>>> >> >>>     .partitionBy(partitioner)
>>>> >> >>>   println(idx + " ---> " + otherData.join(d3).count())
>>>> >> >>> }
>>>> >> >>>
>>>> >> >>> If you run this and look in the UI, you'd see that all jobs except
>>>> >> >>> for the first one have one stage that is skipped.  You will also
>>>> >> >>> see this in the log:
>>>> >> >>>
>>>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12, Stage 13)
>>>> >> >>>
>>>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)
>>>> >> >>>
>>>> >> >>> Admittedly that is not very clear, but it is sort of indicating to
>>>> >> >>> you that the DAGScheduler first created stage 12 as a necessary
>>>> >> >>> step, and then later on changed its mind by realizing that
>>>> >> >>> everything it needed for stage 12 already existed, so there was
>>>> >> >>> nothing to do.
>>>> >> >>>
>>>> >> >>>
>>>> >> >>> 2. Extracting Event Log Information
>>>> >> >>>
>>>> >> >>> Maybe you are interested in SparkListener?  Though unfortunately, I
>>>> >> >>> don't know of a good blog post describing it; hopefully the docs
>>>> >> >>> are clear ...
>>>> >> >>>
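>>>> >> >>> (Untested sketch of what that could look like, roughly matching the
>>>> >> >>> columnar format you asked for -- field names are from memory, so
>>>> >> >>> double-check them against your Spark version:)
>>>> >> >>>
>>>> >> >>> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>>>> >> >>>
>>>> >> >>> class CsvTaskListener extends SparkListener {
>>>> >> >>>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>>>> >> >>>     val info = taskEnd.taskInfo
>>>> >> >>>     val m = Option(taskEnd.taskMetrics)  // can be null for failed tasks
>>>> >> >>>     println(Seq(
>>>> >> >>>       info.finishTime,                                  // timestamp (ms)
>>>> >> >>>       info.taskId,
>>>> >> >>>       info.host,
>>>> >> >>>       m.map(_.executorRunTime).getOrElse(0L),           // execution time (ms)
>>>> >> >>>       m.map(_.jvmGCTime).getOrElse(0L),                 // GC time (ms)
>>>> >> >>>       m.map(_.executorDeserializeTime).getOrElse(0L),   // t_deser (ms)
>>>> >> >>>       m.map(_.resultSerializationTime).getOrElse(0L)    // t_ser (ms)
>>>> >> >>>     ).mkString(","))                                    // or append to a file instead
>>>> >> >>>   }
>>>> >> >>> }
>>>> >> >>>
>>>> >> >>> sc.addSparkListener(new CsvTaskListener)
>>>> >> >>>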
>>>> >> >>> 3. Time Metrics in Spark Event Log
>>>> >> >>>
>>>> >> >>> This is a great question.  I *think* the only exception is that
>>>> >> >>> t_gc is really overlapped with t_exec.  So I think you should
>>>> >> >>> really expect
>>>> >> >>>
>>>> >> >>> (t2 - t1) < (t_ser + t_deser + t_exec)
>>>> >> >>>
>>>> >> >>> I am not 100% sure about this, though.  I'd be curious if that
>>>> >> >>> constraint was ever violated.
>>>> >> >>>
>>>> >> >>>
>>>> >> >>> As for your question on shuffle read vs. shuffle write time -- I
>>>> >> >>> wouldn't necessarily expect the same stage to have times for both
>>>> >> >>> shuffle read & shuffle write -- in the simplest case, you'll have
>>>> >> >>> shuffle write times in one, and shuffle read times in the next one.
>>>> >> >>> But even taking that into account, there is a difference in the way
>>>> >> >>> they work & are measured.  Shuffle read operations are pipelined,
>>>> >> >>> and the way we measure shuffle read, it's just how much time is
>>>> >> >>> spent *waiting* for network transfer.  It could be that there is no
>>>> >> >>> (measurable) wait time b/c the next blocks are fetched before they
>>>> >> >>> are needed.  Shuffle writes occur in the normal task execution
>>>> >> >>> thread, though, so we (try to) measure all of it.
>>>> >> >>>
>>>> >> >>>
>>>> >> >>> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com>
>>>> wrote:
>>>> >> >>>
>>>> >> >>>> Hi Patrick and Akhil,
>>>> >> >>>>
>>>> >> >>>> Thank you both for your responses. This is a bit of an extended
>>>> >> >>>> email,
>>>> >> >>>> but I'd like to:
>>>> >> >>>> 1. Answer your (Patrick) note about the "missing" stages since
>>>> the
>>>> >> >>>> IDs
>>>> >> >>>> do (briefly) appear in the event logs
>>>> >> >>>> 2. Ask for advice/experience with extracting information from
>>>> the
>>>> >> >>>> event logs in a columnar, delimiter-separated format.
>>>> >> >>>> 3. Ask about the time metrics reported in the event logs;
>>>> currently,
>>>> >> >>>> the elapsed time for a task does not equal the sum of the times
>>>> for
>>>> >> >>>> its components
>>>> >> >>>>
>>>> >> >>>> 1. Event Logs + Stages:
>>>> >> >>>> =========================
>>>> >> >>>>
>>>> >> >>>> As I said before, In the spark logs (the log4j configurable ones
>>>> >> >>>> from
>>>> >> >>>> the driver), I only see references to some stages, where the
>>>> stage
>>>> >> >>>> IDs
>>>> >> >>>> are not arithmetically increasing. In the event logs, however, I
>>>> >> >>>> will
>>>> >> >>>> see reference to *every* stage, although not all stages will
>>>> have
>>>> >> >>>> tasks associated with them.
>>>> >> >>>>
>>>> >> >>>> For instance, to examine the actual stages that have tasks, you
>>>> can
>>>> >> >>>> see missing stages:
>>>> >> >>>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \
>>>> >> >>>> #               | grep -Eo '"Stage ID":[[:digit:]]+'  \
>>>> >> >>>> #               | sort -n|uniq | head -n 5
>>>> >> >>>> "Stage ID":0
>>>> >> >>>> "Stage ID":1
>>>> >> >>>> "Stage ID":10
>>>> >> >>>> "Stage ID":11
>>>> >> >>>> "Stage ID":110
>>>> >> >>>>
>>>> >> >>>> However, these "missing" stages *do* appear in the event logs as
>>>> >> >>>> Stage IDs in the jobs submitted, e.g.:
>>>> >> >>>> # grep -E '"Event":"SparkListenerJobStart"' app.log \
>>>> >> >>>> #               | grep -Eo 'Stage IDs":\[.*\]' | head -n 5
>>>> >> >>>> Stage IDs":[0,1,2]
>>>> >> >>>> Stage IDs":[5,3,4]
>>>> >> >>>> Stage IDs":[6,7,8]
>>>> >> >>>> Stage IDs":[9,10,11]
>>>> >> >>>> Stage IDs":[12,13,14]
>>>> >> >>>>
>>>> >> >>>> I do not know if this amounts to a bug, since I am not familiar
>>>> with
>>>> >> >>>> the scheduler in detail. The stages have seemingly been created
>>>> >> >>>> somewhere in the DAG, but then have no associated tasks and
>>>> never
>>>> >> >>>> appear again.
>>>> >> >>>>
>>>> >> >>>> 2. Extracting Event Log Information
>>>> >> >>>> ====================================
>>>> >> >>>> Currently we are running scalability tests, and are finding very
>>>> >> >>>> poor
>>>> >> >>>> scalability for certain block matrix algorithms. I would like to
>>>> >> >>>> have
>>>> >> >>>> finer detail about the communication time and bandwidth when
>>>> data is
>>>> >> >>>> transferred between nodes.
>>>> >> >>>>
>>>> >> >>>> I would really just like to have a file with nothing but task
>>>> info
>>>> >> >>>> in
>>>> >> >>>> a format such as:
>>>> >> >>>> timestamp (ms), task ID, hostname, execution time (ms), GC time
>>>> >> >>>> (ms),
>>>> >> >>>> ...
>>>> >> >>>> 0010294, 1, slave-1, 503, 34, ...
>>>> >> >>>> 0010392, 2, slave-2, 543, 32, ...
>>>> >> >>>> and similarly for jobs/stages/rdd_memory/shuffle output/etc.
>>>> >> >>>>
>>>> >> >>>> I have extracted the relevant time fields from the spark event
>>>> logs
>>>> >> >>>> with a sed script, but I wonder if there is an even more
>>>> expedient
>>>> >> >>>> way. Unfortunately, I do not immediately see how to do this
>>>> using
>>>> >> >>>> the
>>>> >> >>>> $SPARK_HOME/conf/metrics.properties file and haven't come
>>>> across a
>>>> >> >>>> blog/etc that describes this. Could anyone please comment on
>>>> whether
>>>> >> >>>> or not a metrics configuration for this already exists?
>>>> >> >>>>
>>>> >> >>>> 3. Time Metrics in Spark Event Log
>>>> >> >>>> ==================================
>>>> >> >>>> I am confused about the times reported for tasks in the event
>>>> log.
>>>> >> >>>> There are launch and finish timestamps given for each task (call
>>>> >> >>>> them
>>>> >> >>>> t1 and t2, respectively), as well as GC time (t_gc), execution
>>>> time
>>>> >> >>>> (t_exec), and serialization times (t_ser, t_deser). However the
>>>> >> >>>> times
>>>> >> >>>> do not add up as I would have expected. I would imagine that the
>>>> >> >>>> elapsed time t2 - t1 would be slightly larger than the sum of
>>>> the
>>>> >> >>>> component times. However, I can find many instances in the event
>>>> >> >>>> logs
>>>> >> >>>> where:
>>>> >> >>>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec)
>>>> >> >>>> The difference can be 500 ms or more, which is not negligible
>>>> for my
>>>> >> >>>> current execution times of ~5000 ms. I have attached a plot that
>>>> >> >>>> illustrates this.
>>>> >> >>>>
>>>> >> >>>> Regarding this, I'd like to ask:
>>>> >> >>>> 1. How exactly are these times being measured?
>>>> >> >>>> 2. Should the sum of the component times equal the elapsed
>>>> (clock)
>>>> >> >>>> time for the task?
>>>> >> >>>> 3. If not, which component(s) is(are) being excluded, and when
>>>> do
>>>> >> >>>> they
>>>> >> >>>> occur?
>>>> >> >>>> 4. There are occasionally reported measurements for Shuffle
>>>> Write
>>>> >> >>>> time, but not shuffle read time. Is there a method to determine
>>>> the
>>>> >> >>>> time required to shuffle data? Could this be done by looking at
>>>> >> >>>> delays
>>>> >> >>>> between the first task in a new stage and the last task in the
>>>> >> >>>> previous stage?
>>>> >> >>>>
>>>> >> >>>> Thank you very much for your time,
>>>> >> >>>> Mike
>>>> >> >>>>
>>>> >> >>>>
>>>> >> >>>> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote:
>>>> >> >>>> > Hey Mike,
>>>> >> >>>> >
>>>> >> >>>> > Stage ID's are not guaranteed to be sequential because of the
>>>> >> >>>> > way the DAG scheduler works (only increasing). In some cases
>>>> >> >>>> > stage ID numbers are skipped when stages are generated.
>>>> >> >>>> >
>>>> >> >>>> > Any stage/ID that appears in the Spark UI is an actual stage, so
>>>> >> >>>> > if you see ID's in there, but they are not in the logs, then let
>>>> >> >>>> > us know (that would be a bug).
>>>> >> >>>> >
>>>> >> >>>> > - Patrick
>>>> >> >>>> >
>>>> >> >>>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das
>>>> >> >>>> > <ak...@sigmoidanalytics.com>
>>>> >> >>>> > wrote:
>>>> >> >>>> >> Are you seeing the same behavior on the driver UI (the one
>>>> >> >>>> >> running on port 4040)?  If you click on the stage id header you
>>>> >> >>>> >> can sort the stages based on IDs.
>>>> >> >>>> >>
>>>> >> >>>> >> Thanks
>>>> >> >>>> >> Best Regards
>>>> >> >>>> >>
>>>> >> >>>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <
>>>> 91m...@gmail.com>
>>>> >> >>>> >> wrote:
>>>> >> >>>> >>>
>>>> >> >>>> >>> Hi folks,
>>>> >> >>>> >>>
>>>> >> >>>> >>> When I look at the output logs for an iterative Spark program,
>>>> >> >>>> >>> I see that the stage IDs are not arithmetically numbered---that
>>>> >> >>>> >>> is, there are gaps between stages and I might find log
>>>> >> >>>> >>> information about Stage 0, 1, 2, 5, but not 3 or 4.
>>>> >> >>>> >>>
>>>> >> >>>> >>> As an example, the output from the Spark logs below shows what
>>>> >> >>>> >>> I mean:
>>>> >> >>>> >>>
>>>> >> >>>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr | grep finished
>>>> >> >>>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444) finished in 7.820 s:
>>>> >> >>>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished in 3.874 s:
>>>> >> >>>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179) finished in 2.237 s:
>>>> >> >>>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished in 1.749 s:
>>>> >> >>>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180) finished in 1.082 s:
>>>> >> >>>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished in 2.078 s:
>>>> >> >>>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188) finished in 1.317 s:
>>>> >> >>>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished in 1.638 s:
>>>> >> >>>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189) finished in 0.732 s:
>>>> >> >>>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302) finished in 0.192 s:
>>>> >> >>>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302) finished in 0.170 s:
>>>> >> >>>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201) finished in 0.270 s:
>>>> >> >>>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished in 0.455 s:
>>>> >> >>>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274) finished in 0.928 s:
>>>> >> >>>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished in 0.305 s:
>>>> >> >>>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275) finished in 0.391 s:
>>>> >> >>>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at MatrixFactorizationModel.scala:60) finished in 0.028 s:
>>>> >> >>>> >>> 30506:INFO:DAGScheduler:Stage 36 (first at MatrixFactorizationModel.scala:60) finished in 0.023 s:
>>>> >> >>>> >>>
>>>> >> >>>> >>> Can anyone comment on this being normal behavior? Is it
>>>> >> >>>> >>> indicative of faults causing stages to be resubmitted? I also
>>>> >> >>>> >>> cannot find the missing stages in any stage's parent List(Stage
>>>> >> >>>> >>> x, Stage y, ...)
>>>> >> >>>> >>>
>>>> >> >>>> >>> Thanks,
>>>> >> >>>> >>> Mike
>>>> >> >>>> >>>
>>>> >> >>>> >>>
>>>> >> >>>> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote:
>>>> >> >>>> >>> > Thanks, René. I actually added a warning to the new JDBC
>>>> >> >>>> >>> > reader/writer interface for 1.4.0.
>>>> >> >>>> >>> >
>>>> >> >>>> >>> > Even with that, I think we should support throttling JDBC;
>>>> >> >>>> >>> > otherwise it's too convenient for our users to DOS their
>>>> >> >>>> >>> > production database servers!
>>>> >> >>>> >>> >
>>>> >> >>>> >>> >
>>>> >> >>>> >>> >   /**
>>>> >> >>>> >>> >    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
>>>> >> >>>> >>> >    * url named table. Partitions of the table will be retrieved in parallel based on the
>>>> >> >>>> >>> >    * parameters passed to this function.
>>>> >> >>>> >>> >    *
>>>> >> >>>> >>> >    * Don't create too many partitions in parallel on a large cluster; otherwise Spark
>>>> >> >>>> >>> >    * might crash your external database systems.
>>>> >> >>>> >>> >    *
>>>> >> >>>> >>> >    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>>>> >> >>>> >>> >    * @param table Name of the table in the external database.
>>>> >> >>>> >>> >    * @param columnName the name of a column of integral type that will be used for partitioning.
>>>> >> >>>> >>> >    * @param lowerBound the minimum value of `columnName` used to decide partition stride
>>>> >> >>>> >>> >    * @param upperBound the maximum value of `columnName` used to decide partition stride
>>>> >> >>>> >>> >    * @param numPartitions the number of partitions.  the range `minValue`-`maxValue` will be
>>>> >> >>>> >>> >    *                      split evenly into this many partitions
>>>> >> >>>> >>> >    * @param connectionProperties JDBC database connection arguments, a list of arbitrary
>>>> >> >>>> >>> >    *                             string tag/value. Normally at least a "user" and
>>>> >> >>>> >>> >    *                             "password" property should be included.
>>>> >> >>>> >>> >    *
>>>> >> >>>> >>> >    * @since 1.4.0
>>>> >> >>>> >>> >    */
>>>> >> >>>> >>> >
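>>>> >> >>>> >>> > (A rough usage sketch of that partitioned read via the 1.4
>>>> >> >>>> >>> > DataFrame API -- the URL, table and column here are made up,
>>>> >> >>>> >>> > and numPartitions is deliberately modest for the reason the
>>>> >> >>>> >>> > warning gives:)
>>>> >> >>>> >>> >
>>>> >> >>>> >>> > import java.util.Properties
>>>> >> >>>> >>> >
>>>> >> >>>> >>> > val props = new Properties()
>>>> >> >>>> >>> > props.setProperty("user", "spark")
>>>> >> >>>> >>> > props.setProperty("password", "secret")
>>>> >> >>>> >>> >
>>>> >> >>>> >>> > val df = sqlContext.read.jdbc(
>>>> >> >>>> >>> >   "jdbc:postgresql://db-host:5432/mydb",  // url
>>>> >> >>>> >>> >   "events",                               // table
>>>> >> >>>> >>> >   "id",                                   // columnName: integral partition column
>>>> >> >>>> >>> >   0L,                                     // lowerBound
>>>> >> >>>> >>> >   1000000L,                               // upperBound
>>>> >> >>>> >>> >   16,                                     // numPartitions: keep it modest
>>>> >> >>>> >>> >   props)                                  // connectionProperties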
>>>> >> >>>> >>> >
>>>> >> >>>> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <
>>>> >> rtref...@gmail.com>
>>>> >> >>>> >>> > wrote:
>>>> >> >>>> >>> >
>>>> >> >>>> >>> >> Hi,
>>>> >> >>>> >>> >>
>>>> >> >>>> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ =>
>>>> >> >>>> >>> >> 1).aggregate(0)(_+_,_+_) on an interactive shell (where
>>>> >> >>>> >>> >> "where" is an Array[String] of 32 to 48 elements).  (The code
>>>> >> >>>> >>> >> is tailored to your db, specifically through the where
>>>> >> >>>> >>> >> conditions; I'd have otherwise posted it.)
>>>> >> >>>> >>> >> That should be the DataFrame API, but I'm just trying to load
>>>> >> >>>> >>> >> everything and discard it as soon as possible :-)
>>>> >> >>>> >>> >>
>>>> >> >>>> >>> >> (1) Never do a silent drop of the values by default: it
>>>> >> >>>> >>> >> kills confidence.  An option sounds reasonable.  Some sort of
>>>> >> >>>> >>> >> insight / log would be great.  (How many columns of what type
>>>> >> >>>> >>> >> were truncated? why?)
>>>> >> >>>> >>> >> Note that I could declare the field as string via
>>>> >> >>>> >>> >> JdbcDialects (thank you guys for merging that :-) ).
>>>> >> >>>> >>> >> I have quite bad experiences with silent drops / truncates of
>>>> >> >>>> >>> >> columns and thus _like_ the strict way of spark. It causes
>>>> >> >>>> >>> >> trouble, but noticing later that your data was corrupted
>>>> >> >>>> >>> >> during conversion is even worse.
>>>> >> >>>> >>> >>
>>>> >> >>>> >>> >> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
>>>> >> >>>> >>> >>
>>>> >> >>>> >>> >> (3) One option would be to make it safe to use; the other
>>>> >> >>>> >>> >> option would be to document the behavior (s.th. like
>>>> >> >>>> >>> >> "WARNING: this method tries to load as many partitions as
>>>> >> >>>> >>> >> possible, make sure your database can handle the load, or
>>>> >> >>>> >>> >> load them in chunks and use union"). SPARK-8008
>>>> >> >>>> >>> >> https://issues.apache.org/jira/browse/SPARK-8008
>>>> >> >>>> >>> >>
>>>> >> >>>> >>> >> Regards,
>>>> >> >>>> >>> >>   Rene Treffer
>>>> >> >>>> >>> >>
>>>> >> >>>> >>> >
>>>> >> >>>> >>>
>>>> >> >>>> >>>
>>>> >> >>>> >>> --
>>>> >> >>>> >>> Thanks,
>>>> >> >>>> >>> Mike
>>>> >> >>>> >>>
>>>> >> >>>> >>>
>>>> >> ---------------------------------------------------------------------
>>>> >> >>>> >>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>> >> >>>> >>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>> >> >>>> >>>
>>>> >> >>>> >>
>>>> >> >>>> >
>>>> >> >>>>
>>>> >> >>>>
>>>> >> >>>> --
>>>> >> >>>> Thanks,
>>>> >> >>>> Mike
>>>> >> >>>>
>>>> >> >>>>
>>>> >> >>>>
>>>> ---------------------------------------------------------------------
>>>> >> >>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>> >> >>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>> >> >>>>
>>>> >> >>>
>>>> >> >>
>>>> >> >>
>>>> >> >> --
>>>> >> >> Thanks,
>>>> >> >> Mike
>>>> >> >>
>>>> >> >
>>>> >> >
>>>> >> > --
>>>> >> > Thanks,
>>>> >> > Mike
>>>> >> >
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Thanks,
>>>> >> Mike
>>>> >>
>>>> >
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Mike
>>>>
>>>
>>>
>>
>
