Kay,

Excellent write-up. This should be preserved for reference somewhere
searchable.

-Gerard.



On Fri, Jun 12, 2015 at 1:19 AM, Kay Ousterhout <k...@eecs.berkeley.edu>
wrote:

> Here’s how the shuffle works.  This explains what happens for a single
> task; this will happen in parallel for each task running on the machine,
> and as Imran said, Spark runs up to “numCores” tasks concurrently on each
> machine.  At the very bottom, there's also an answer to the original
> question about why CPU use is low.
>
> The key data structure used in fetching shuffle data is the “results”
> queue in ShuffleBlockFetcherIterator, which buffers data that we have in
> serialized (and maybe compressed) form, but haven’t yet deserialized /
> processed.  The results queue is filled by many threads fetching data over
> the network (the number of concurrent threads fetching data is equal to the
> number of remote executors we’re currently fetching data from) [0], and is
> consumed by a single thread that deserializes the data and computes some
> function over it (e.g., if you’re doing rdd.count(), the thread
> deserializes the data and counts the number of items).  As we fetch data
> over the network, we track bytesInFlight, which is data that has been
> requested (and possibly received) from a remote executor, but that hasn’t
> yet been deserialized / processed by the consumer thread.  So, this
> includes all of the data in the “results” queue, and possibly more data
> that’s currently outstanding over the network.  We always issue as many
> requests as we can, with the constraint that bytesInFlight remains less
> than a specified maximum [1].
>
> In a little more detail, here’s exactly what happens when a task begins
> reading shuffled data:
>
> (1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data [1]
> over the network (this happens here
> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260>
> ).
>
> These requests are all executed asynchronously using a ShuffleClient [2]
> via the shuffleClient.fetchBlocks call
> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149>
>  [3].  We pass in a callback that, once a block has been successfully
> fetched, sticks it on the “results” queue.
>
> (2) Begin processing the local data.  One by one, we request the local
> data from the local block manager (which memory maps the file) and then stick
> the result onto the results queue
> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230>.
> Because we memory map the files, which is speedy, the local data typically
> all ends up on the results queue in front of the remote data.
>
> (3) Once the async network requests have been issued (note: issued, but
> not finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1)
> and (2) have happened), ShuffleBlockFetcherIterator returns an iterator
> that gets wrapped too many times to count [4] and eventually gets unrolled
> [5].  Each time next() is called on the iterator, it blocks waiting for an
> item from the results queue.  This may return right away, or if the queue
> is empty, will block waiting on new data from the network [6].  Before
> returning from next(), we update our accounting for the bytes in flight:
> the chunk of data we return is no longer considered in-flight, because it’s
> about to be processed, so we update the current bytesInFlight, and if it
> won’t result in > maxBytesInFlight outstanding, send some more requests for
> data.
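
To make the accounting in (1)-(3) concrete, here is a minimal Scala sketch of
the results queue and the bytesInFlight bookkeeping. It is not Spark's code:
FetchSketch, FetchRequest, FetchResult, peakBytesInFlight and fetchWaitNanos
are hypothetical names, and the "network" is just a thread that immediately
puts a result on the queue. Only the send guard mirrors the initialize()
snippet quoted further down the thread.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's internal types.
final case class FetchRequest(id: Int, size: Long)
final case class FetchResult(id: Int, size: Long)

class FetchSketch(requests: Seq[FetchRequest], maxBytesInFlight: Long) {
  private val pending = mutable.Queue(requests: _*)
  private val results = new LinkedBlockingQueue[FetchResult]()
  private var bytesInFlight = 0L
  private var returned = 0
  var peakBytesInFlight = 0L // recorded only so the sketch can be checked
  var fetchWaitNanos = 0L    // time the consumer spent blocked in next()

  sendWhilePossible()        // step (1): issue the initial requests

  // Stand-in for an asynchronous network fetch. A real client would run the
  // callback on a network thread once the bytes have arrived.
  private def sendRequest(req: FetchRequest): Unit = {
    bytesInFlight += req.size
    peakBytesInFlight = math.max(peakBytesInFlight, bytesInFlight)
    val t = new Thread(new Runnable {
      def run(): Unit = results.put(FetchResult(req.id, req.size))
    })
    t.setDaemon(true)
    t.start()
  }

  // Always allow one request when nothing is outstanding; otherwise keep
  // the outstanding bytes within maxBytesInFlight.
  private def sendWhilePossible(): Unit =
    while (pending.nonEmpty &&
      (bytesInFlight == 0 ||
        bytesInFlight + pending.front.size <= maxBytesInFlight)) {
      sendRequest(pending.dequeue())
    }

  def hasNext: Boolean = returned < requests.size

  // Step (3): block on the results queue, release the bytes, then refill.
  def next(): FetchResult = {
    val start = System.nanoTime()
    val result = results.take() // blocks if no fetch has completed yet
    fetchWaitNanos += System.nanoTime() - start
    bytesInFlight -= result.size
    returned += 1
    sendWhilePossible()
    result
  }
}
```

Draining it with `while (it.hasNext) it.next()` plays the role of the single
consumer thread. bytesInFlight is only touched from that thread, so the
blocking queue is the only shared state in this sketch.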
>
> ————————————————
>
> Notes:
>
> [0] Note that these threads consume almost no CPU resources, because they
> just receive data from the OS and then execute a callback that sticks the
> data on the results queue.
>
> [1] We limit the data outstanding on the network to avoid using too much
> memory to hold the data we’ve fetched over the network but haven’t yet
> processed.
>
> [1.5] Each request may include multiple shuffle blocks, where a "block"
> is the data output for this reduce task by a particular map task.  All of
> the reduce tasks for a shuffle read a total of # map tasks * # reduce tasks
> shuffle blocks; each reduce task reads # map tasks blocks.  We do some
> hacks
> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177>
> to try to size these requests in a "good" way: we limit each request to
> about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines
> concurrently without exceeding maxBytesInFlight.  5 is completely a magic
> number here that was probably guessed by someone long long ago, and it
> seems to work ok.
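
The request sizing in [1.5] can be sketched roughly as follows. This is a
simplified illustration, not the linked Spark code: RequestSizing, Request and
splitIntoRequests are hypothetical names, a block is reduced to an
(id, sizeInBytes) pair, and the addresses are plain strings.

```scala
import scala.collection.mutable.ArrayBuffer

object RequestSizing {
  // Hypothetical simplified request: one remote executor plus its blocks.
  final case class Request(address: String, blocks: List[(String, Long)]) {
    def size: Long = blocks.map(_._2).sum
  }

  // Group each remote executor's blocks into requests of roughly
  // maxBytesInFlight / 5 bytes. A request is closed as soon as it reaches
  // the target, so it can overshoot the target by up to one block.
  def splitIntoRequests(
      blocksByAddress: Seq[(String, Seq[(String, Long)])],
      maxBytesInFlight: Long): List[Request] = {
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    val requests = ArrayBuffer.empty[Request]
    for ((address, blocks) <- blocksByAddress) {
      var current = ArrayBuffer.empty[(String, Long)]
      var currentSize = 0L
      for (block <- blocks) {
        current += block
        currentSize += block._2
        if (currentSize >= targetRequestSize) {
          requests += Request(address, current.toList)
          current = ArrayBuffer.empty[(String, Long)]
          currentSize = 0L
        }
      }
      // Whatever is left for this executor becomes one final, smaller request.
      if (current.nonEmpty) requests += Request(address, current.toList)
    }
    requests.toList
  }
}
```

With maxBytesInFlight = 200 the target is 40 bytes, so three 30-byte blocks
on one executor become one 60-byte request plus one 30-byte request. Requests
never mix blocks from different executors.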
>
> [2] The default configuration uses NettyBlockTransferService as the
> ShuffleClient implementation (note that this extends BlockTransferService,
> which extends ShuffleClient).
>
> [3] If you’re curious how the shuffle client fetches data, the default
> Spark configuration results in exactly one TCP connection from an executor
> to each other executor.  If executor A is getting shuffle data from
> executor B, we start by sending an OpenBlocks message from A to B.  The
> OpenBlocks message includes the list of blocks that A wants to fetch, and
> causes the remote executor, B, to start to pull the corresponding data into
> memory from disk (we typically memory map the files, so this may not
> actually result in the data being read yet), and also to store some state
> associated with this “stream” of data.  The remote executor, B, responds
> with a stream ID that helps it to identify the connection.  Next, A
> requests blocks one at a time from B using a ChunkFetchRequest message
> (this happens here
> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L101>
>  in
> OneForOneBlockFetcher, which calls this code
> <https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java#L97>
> in TransportClient; currently, we have a one-to-one mapping from a chunk to
> a particular block).  It’s possible that there are many sets of shuffle
> data being fetched concurrently between A and B (e.g., because many tasks
> are run concurrently).  These requests are serialized, so one block is sent
> at a time from B, and they’re sent in the order that the requests were
> issued on A.
>
> [4] In BlockStoreShuffleFetcher, which handles failures; then in
> HashShuffleReader, which helps aggregate some of the data; etc.
>
> [5] This happens in BlockManager.putIterator, if the RDD is going to be
> cached; in the function passed in to ResultTask, if this is the last stage
> in a job; or via the writer.write() call in ShuffleMapTask, if this is a
> stage that generates intermediate shuffle data.
>
> [6] We time how long we spend blocking on data from the network; this is
> what’s shown as “fetch wait time” in Spark’s UI.
>
> ——————————————————
>
> To answer the original question about why CPU use is low, this means that
> the main thread (that pulls data off of the results queue) is blocking on
> I/O.  It could be blocking to receive data over the network (which will be
> included in the fetch wait time shown in the UI, described above), or it
> could be blocking to read data from the local disk (because the shuffle
> data that is read locally is memory-mapped, so the thread may block on disk
> if the data hasn’t been read into memory yet).  It could also be blocking
> because it’s writing new shuffle output to disk (also shown in the UI, as
> shuffle write time), or because it’s spilling intermediate data.
> Everything except the spilling is shown in the handy-dandy new
> visualization that Kousuke recently added
> <https://github.com/apache/spark/commit/a5f7b3b9c7f05598a1cc8e582e5facee1029cd5e> to
> the UI (available in 1.4 onwards): if you look at the stage view, and click
> on “Event Timeline”, you’ll see a visualization of the tasks and how long
> they spent blocked on various things (we should add the spilling to
> this…there is a long-outstanding JIRA for this).
>
> Hope this helps!
>
> -Kay
>
> On Thu, Jun 11, 2015 at 7:42 AM, Imran Rashid <iras...@cloudera.com>
> wrote:
>
>> That is not exactly correct -- that being said I'm not 100% on these
>> details either so I'd appreciate you double checking  and / or another dev
>> confirming my description.
>>
>>
>> Spark actually has more threads going than the "numCores" you specify.
>> "numCores" is really used for how many threads are actively executing
>> tasks.  There are more threads for doing the fetching (the details of which
>> I'm not that familiar with) -- that never cuts into the number of actively
>> executing tasks for "numCores".  There isn't a 1-to-1 correspondence
>> between one shuffle block and one task -- a shuffle-read task most likely
>> needs to fetch many shuffle blocks, with some local and some remote (in
>> most cases).  So, from your lingo above, c is numCores, but c_1 is just an
>> independent pool of threads.
>>
>> The obvious follow-up question is, if you've actually more than
>> "numCores" threads going at the same time, how come cpu usage is low?  You
>> could still have your cpus stuck waiting on i/o, from disk or network, so
>> they aren't getting fully utilized.  And the cpu can also be idle waiting
>> for memory, if there are a lot of cache misses (I'm not sure how that will
>> show up in cpu monitoring).  If that were the case, that could even be from
>> too many threads, as a lot of time is spent context switching ... but I'm
>> just guessing now.
>>
>> hope this helps,
>> Imran
>>
>>
>>
>> On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> Hi Imran,
>>>
>>> Thank you again for your email.
>>>
>>> I just want to ask one further question to clarify the implementation
>>> of the shuffle block fetches. When you say that rather than sitting
>>> idle, [the executor] will immediately start reading the local block, I
>>> would guess that, in implementation, the executor is going to launch
>>> concurrent threads to read both local and remote blocks, which it
>>> seems to do in the initialize() method of
>>> core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case
>>> or would the Executor run all local fetch threads first?
>>>
>>> The reason I ask is that if the slave machine on which the Executor is
>>> running has some number of cores, c, then I  would have thought that
>>> some of the threads launched would occupy some number, c_1, of the
>>> cores and conduct the local reads (where c_1 <= c). The other threads
>>> would occupy the other (c - c_1) cores' cycles until *all* necessary
>>> blocks have been read, and depending on c and the number of blocks to
>>> fetch so that none of the cores are idle if there are many blocks to
>>> fetch. (I monitor the CPU utilization of our nodes throughout a job,
>>> and generally find them under-utilized statistically speaking; that
>>> is, their usage over the whole job is lower than expected, with short
>>> burst of high usage, so I ask this question in a specific way for this
>>> reason, since I can see trends in the probability density functions of
>>> CPU utilization as the #partitions of our RDDs are increased).
>>>
>>> ShuffleBlockFetcherIterator.scala:
>>>
>>>   private[this] def initialize(): Unit = {
>>>                 ...
>>>     // Send out initial requests for blocks, up to our maxBytesInFlight
>>>     while (fetchRequests.nonEmpty &&
>>>       (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size
>>> <= maxBytesInFlight)) {
>>>       sendRequest(fetchRequests.dequeue())
>>>     }
>>>     val numFetches = remoteRequests.size - fetchRequests.size
>>>     logInfo("Started " + numFetches + " remote fetches in" +
>>> Utils.getUsedTimeMs(startTime))
>>>
>>>     // Get Local Blocks
>>>     fetchLocalBlocks()
>>>     logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
>>>   }
>>>   private[this] def fetchLocalBlocks() {
>>>     val iter = localBlocks.iterator
>>>     while (iter.hasNext) {
>>>       val blockId = iter.next()
>>>       try {
>>>         val buf = blockManager.getBlockData(blockId)
>>>         shuffleMetrics.incLocalBlocksFetched(1)
>>>         shuffleMetrics.incLocalBytesRead(buf.size)
>>>         buf.retain()
>>>         results.put(new SuccessFetchResult(blockId, 0, buf))
>>>       } catch {
>>>                                 ...
>>>       }
>>>     }
>>>   }
>>>
>>> Obviously, I will have to sit down with core/.../network/nio/* and
>>> core/.../shuffle/* and do my own homework on this, but from what I can
>>> tell, the BlockDataManager relies on either
>>> NioBlockTransferService.scala or the NettyBlockTransferService.scala
>>> (which are set in SparkEnv.scala), both of which do the grunt work of
>>> actually buffering and transferring the blocks' bytes. Finally, the
>>> tasks in the new stage for which the shuffle outputs have been fetched
>>> will not commence until all of the block fetching threads (both local
>>> and remote) have terminated.
>>>
>>> Does the above paint an accurate picture? I would really appreciate
>>> clarification on the concurrency, since I would like to determine why
>>> our jobs have under-utilization and poor weak scaling efficiency.
>>>
>>> I will cc this thread over to the dev list. I did not cc them in case
>>> my previous question was trivial---I didn't want to spam the list
>>> unnecessarily, since I do not see these kinds of questions posed there
>>> frequently.
>>>
>>> Thanks a bunch,
>>> Mike
>>>
>>>
>>> On 6/10/15, Imran Rashid <iras...@cloudera.com> wrote:
>>> > Hi Mike,
>>> >
>>> > no, this is a good question, I can see how my response could be
>>> > interpreted both ways.
>>> >
>>> > To be more precise:
>>> > *nothing* is fetched until the shuffle-read stage starts.  So it is
>>> > normal to see a spike in cluster bandwidth when that stage starts.  There
>>> > is a hard-boundary between stages -- that is, spark never starts any tasks
>>> > in one stage until *all* tasks in the dependent stages have been
>>> > completed.  (There has been on-and-off discussion about relaxing this, but
>>> > IMO this is unlikely to change in the near future.)  So spark will wait
>>> > for all of the tasks in the previous shuffle-write stage to finish, and
>>> > then kick off a bunch of shuffle-read tasks in the next stage, leading to
>>> > the spike you see.
>>> >
>>> > I was referring to the way blocks are fetched within one of those
>>> > shuffle-read tasks.  One of those tasks will probably need a bunch of
>>> > different blocks, from many executors.  But some of the blocks it needs
>>> > will probably exist locally.  So the task first sends out a request to
>>> > fetch blocks remotely (leading to the spike), but rather than sitting
>>> > idle, it will immediately start reading the local blocks.  Ideally, by
>>> > the time it's done reading the local blocks, some of the remote blocks
>>> > have already been fetched, so no time is spent *waiting* for the remote
>>> > reads.  As the remote blocks get read, spark sends out more requests,
>>> > trying to balance how much data needs to be buffered vs. preventing any
>>> > waiting on remote reads (which can be controlled by
>>> > spark.reducer.maxSizeInFlight).
>>> >
>>> > Hope that clarifies things!
>>> >
>>> > btw, you sent this last question to just me -- I think it's a good
>>> > question, do you mind sending it to the list?  I figured that was
>>> > accidental but wanted to check.
>>> >
>>> > Imran
>>> >
>>> > On Wed, Jun 10, 2015 at 12:20 AM, Mike Hynes <91m...@gmail.com> wrote:
>>> >
>>> >> Hi Imran,
>>> >> One additional quick question---I just want to confirm that I fully
>>> >> understand your comment that "blocks are fetched before they are
>>> >> needed." Typically on our system, we see spikes in cluster bandwidth
>>> >> (with ganglia) at stage boundaries, so I previously assumed that all
>>> >> shuffle read occurred there. Do you mean that the blocks are fetched
>>> >> by the shuffle read iterator, and hence when tasks occur afterwards
>>> >> the necessary blocks have already been fetched?
>>> >> Thanks---I am sorry if this is an obvious question, but I'd like to
>>> >> understand this as precisely as possible.
>>> >> Mike
>>> >>
>>> >> On 6/10/15, Mike Hynes <91m...@gmail.com> wrote:
>>> >> > Ahhh---forgive my typo: what I mean is,
>>> >> > (t2 - t1) >= (t_ser + t_deser + t_exec)
>>> >> > is satisfied, empirically.
>>> >> >
>>> >> > On 6/10/15, Mike Hynes <91m...@gmail.com> wrote:
>>> >> >> Hi Imran,
>>> >> >>
>>> >> >> Thank you for your email.
>>> >> >>
>>> >> >> In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I
>>> >> >> have found it to be true, although I have not included the
>>> >> >> t_{wait_for_read} in this, since it has---so far as I can tell---been
>>> >> >> either zero or negligible compared to the task time.
>>> >> >>
>>> >> >> Thanks,
>>> >> >> Mike
>>> >> >>
>>> >> >> On 6/8/15, Imran Rashid <iras...@cloudera.com> wrote:
>>> >> >>> Hi Mike,
>>> >> >>>
>>> >> >>> all good questions, let me take a stab at answering them:
>>> >> >>>
>>> >> >>> 1. Event Logs + Stages:
>>> >> >>>
>>> >> >>> It's normal for stages to get skipped if they are shuffle map stages,
>>> >> >>> which get read multiple times.  E.g., here's a little example program I
>>> >> >>> wrote earlier to demonstrate this: "d3" doesn't need to be re-shuffled
>>> >> >>> since each time it's read w/ the same partitioner.  So skipping stages
>>> >> >>> in this way is a good thing:
>>> >> >>>
>>> >> >>> val partitioner = new org.apache.spark.HashPartitioner(10)
>>> >> >>> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }.partitionBy(partitioner)
>>> >> >>> (0 until 5).foreach { idx =>
>>> >> >>>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }.partitionBy(partitioner)
>>> >> >>>   println(idx + " ---> " + otherData.join(d3).count())
>>> >> >>> }
>>> >> >>>
>>> >> >>> If you run this and look in the UI, you'd see that all jobs except for
>>> >> >>> the first one have one stage that is skipped.  You will also see this in
>>> >> >>> the log:
>>> >> >>>
>>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12, Stage 13)
>>> >> >>>
>>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)
>>> >> >>>
>>> >> >>> Admittedly that is not very clear, but that is sort of indicating to you
>>> >> >>> that the DAGScheduler first created stage 12 as a necessary step, and
>>> >> >>> then later on changed its mind by realizing that everything it needed
>>> >> >>> for stage 12 already existed, so there was nothing to do.
>>> >> >>>
>>> >> >>>
>>> >> >>> 2. Extracting Event Log Information
>>> >> >>>
>>> >> >>> Maybe you are interested in SparkListener?  Unfortunately, I don't
>>> >> >>> know of a good blog post describing it; hopefully the docs are clear ...
>>> >> >>>
>>> >> >>> 3. Time Metrics in Spark Event Log
>>> >> >>>
>>> >> >>> This is a great question.  I *think* the only exception is that t_gc is
>>> >> >>> really overlapped with t_exec.  So I think you should really expect
>>> >> >>>
>>> >> >>> (t2 - t1) < (t_ser + t_deser + t_exec)
>>> >> >>>
>>> >> >>> I am not 100% sure about this, though.  I'd be curious if that
>>> >> >>> constraint was ever violated.
>>> >> >>>
>>> >> >>>
>>> >> >>> As for your question on shuffle read vs. shuffle write time -- I wouldn't
>>> >> >>> necessarily expect the same stage to have times for both shuffle read &
>>> >> >>> shuffle write -- in the simplest case, you'll have shuffle write times in
>>> >> >>> one, and shuffle read times in the next one.  But even taking that into
>>> >> >>> account, there is a difference in the way they work & are measured.
>>> >> >>> Shuffle read operations are pipelined, and the way we measure shuffle
>>> >> >>> read, it's just how much time is spent *waiting* for network transfer.
>>> >> >>> It could be that there is no (measurable) wait time b/c the next blocks
>>> >> >>> are fetched before they are needed.  Shuffle writes occur in the normal
>>> >> >>> task execution thread, though, so we (try to) measure all of it.
>>> >> >>>
>>> >> >>>
>>> >> >>> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com>
>>> wrote:
>>> >> >>>
>>> >> >>>> Hi Patrick and Akhil,
>>> >> >>>>
>>> >> >>>> Thank you both for your responses. This is a bit of an extended
>>> >> >>>> email,
>>> >> >>>> but I'd like to:
>>> >> >>>> 1. Answer your (Patrick) note about the "missing" stages since
>>> the
>>> >> >>>> IDs
>>> >> >>>> do (briefly) appear in the event logs
>>> >> >>>> 2. Ask for advice/experience with extracting information from the
>>> >> >>>> event logs in a columnar, delimiter-separated format.
>>> >> >>>> 3. Ask about the time metrics reported in the event logs;
>>> currently,
>>> >> >>>> the elapsed time for a task does not equal the sum of the times
>>> for
>>> >> >>>> its components
>>> >> >>>>
>>> >> >>>> 1. Event Logs + Stages:
>>> >> >>>> =========================
>>> >> >>>>
>>> >> >>>> As I said before, In the spark logs (the log4j configurable ones
>>> >> >>>> from
>>> >> >>>> the driver), I only see references to some stages, where the
>>> stage
>>> >> >>>> IDs
>>> >> >>>> are not arithmetically increasing. In the event logs, however, I
>>> >> >>>> will
>>> >> >>>> see reference to *every* stage, although not all stages will have
>>> >> >>>> tasks associated with them.
>>> >> >>>>
>>> >> >>>> For instance, to examine the actual stages that have tasks, you
>>> can
>>> >> >>>> see missing stages:
>>> >> >>>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \
>>> >> >>>> #               | grep -Eo '"Stage ID":[[:digit:]]+'  \
>>> >> >>>> #               | sort -n|uniq | head -n 5
>>> >> >>>> "Stage ID":0
>>> >> >>>> "Stage ID":1
>>> >> >>>> "Stage ID":10
>>> >> >>>> "Stage ID":11
>>> >> >>>> "Stage ID":110
>>> >> >>>>
>>> >> >>>> However, these "missing" stages *do* appear in the event logs as
>>> >> >>>> Stage
>>> >> >>>> IDs in the jobs submitted, i.e: for
>>> >> >>>> # grep -E '"Event":"SparkListenerJobStart"' app.log \
>>> >> >>>> #               | grep -Eo 'Stage IDs":\[.*\]' | head -n 5
>>> >> >>>> Stage IDs":[0,1,2]
>>> >> >>>> Stage IDs":[5,3,4]
>>> >> >>>> Stage IDs":[6,7,8]
>>> >> >>>> Stage IDs":[9,10,11]
>>> >> >>>> Stage IDs":[12,13,14]
>>> >> >>>>
>>> >> >>>> I do not know if this amounts to a bug, since I am not familiar
>>> with
>>> >> >>>> the scheduler in detail. The stages have seemingly been created
>>> >> >>>> somewhere in the DAG, but then have no associated tasks and never
>>> >> >>>> appear again.
>>> >> >>>>
>>> >> >>>> 2. Extracting Event Log Information
>>> >> >>>> ====================================
>>> >> >>>> Currently we are running scalability tests, and are finding very
>>> >> >>>> poor
>>> >> >>>> scalability for certain block matrix algorithms. I would like to
>>> >> >>>> have
>>> >> >>>> finer detail about the communication time and bandwidth when
>>> data is
>>> >> >>>> transferred between nodes.
>>> >> >>>>
>>> >> >>>> I would really just like to have a file with nothing but task
>>> info
>>> >> >>>> in
>>> >> >>>> a format such as:
>>> >> >>>> timestamp (ms), task ID, hostname, execution time (ms), GC time
>>> >> >>>> (ms),
>>> >> >>>> ...
>>> >> >>>> 0010294, 1, slave-1, 503, 34, ...
>>> >> >>>> 0010392, 2, slave-2, 543, 32, ...
>>> >> >>>> and similarly for jobs/stages/rdd_memory/shuffle output/etc.
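
One possible way to produce the flat task table described just above, as a
rough Scala sketch. It assumes the event log holds one compact JSON object per
line (as the grep commands in this message suggest) and that the keys
"Task ID", "Launch Time", "Host", "Executor Run Time" and "JVM GC Time" appear
in each SparkListenerTaskEnd event; those key names are assumptions to check
against your own log, and TaskEndToCsv is a hypothetical helper, not part of
Spark.

```scala
import java.util.regex.Pattern

object TaskEndToCsv {
  // Pull a numeric field out of a single-line JSON event by key.
  // A regex keeps the sketch dependency-free; a real JSON parser would be
  // more robust against nesting and escaping.
  private def numField(line: String, key: String): Option[Long] =
    ("\"" + Pattern.quote(key) + "\":(\\d+)").r
      .findFirstMatchIn(line).map(_.group(1).toLong)

  // Same idea for a string-valued field.
  private def strField(line: String, key: String): Option[String] =
    ("\"" + Pattern.quote(key) + "\":\"([^\"]*)\"").r
      .findFirstMatchIn(line).map(_.group(1))

  // Returns "launch time, task ID, host, run time, GC time" for task-end
  // events, and None for every other event or when a field is missing.
  def toCsvRow(line: String): Option[String] =
    if (!line.contains("\"Event\":\"SparkListenerTaskEnd\"")) None
    else for {
      launch <- numField(line, "Launch Time")
      taskId <- numField(line, "Task ID")
      host   <- strField(line, "Host")
      run    <- numField(line, "Executor Run Time")
      gc     <- numField(line, "JVM GC Time")
    } yield Seq(launch, taskId, host, run, gc).mkString(", ")
}
```

Something like
scala.io.Source.fromFile("app.log").getLines().flatMap(l => TaskEndToCsv.toCsvRow(l))
would then yield one row per finished task.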
>>> >> >>>>
>>> >> >>>> I have extracted the relevant time fields from the spark event
>>> logs
>>> >> >>>> with a sed script, but I wonder if there is an even more
>>> expedient
>>> >> >>>> way. Unfortunately, I do not immediately see how to do this using
>>> >> >>>> the
>>> >> >>>> $SPARK_HOME/conf/metrics.properties file and haven't come across
>>> a
>>> >> >>>> blog/etc that describes this. Could anyone please comment on whether
>>> >> >>>> or not a metrics configuration for this already exists?
>>> >> >>>>
>>> >> >>>> 3. Time Metrics in Spark Event Log
>>> >> >>>> ==================================
>>> >> >>>> I am confused about the times reported for tasks in the event
>>> log.
>>> >> >>>> There are launch and finish timestamps given for each task (call
>>> >> >>>> them
>>> >> >>>> t1 and t2, respectively), as well as GC time (t_gc), execution
>>> time
>>> >> >>>> (t_exec), and serialization times (t_ser, t_deser). However the
>>> >> >>>> times
>>> >> >>>> do not add up as I would have expected. I would imagine that the
>>> >> >>>> elapsed time t2 - t1 would be slightly larger than the sum of the
>>> >> >>>> component times. However, I can find many instances in the event
>>> >> >>>> logs
>>> >> >>>> where:
>>> >> >>>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec)
>>> >> >>>> The difference can be 500 ms or more, which is not negligible
>>> for my
>>> >> >>>> current execution times of ~5000 ms. I have attached a plot that
>>> >> >>>> illustrates this.
>>> >> >>>>
>>> >> >>>> Regarding this, I'd like to ask:
>>> >> >>>> 1. How exactly are these times being measured?
>>> >> >>>> 2. Should the sum of the component times equal the elapsed
>>> (clock)
>>> >> >>>> time for the task?
>>> >> >>>> 3. If not, which component(s) is(are) being excluded, and when do
>>> >> >>>> they
>>> >> >>>> occur?
>>> >> >>>> 4. There are occasionally reported measurements for Shuffle Write
>>> >> >>>> time, but not shuffle read time. Is there a method to determine
>>> the
>>> >> >>>> time required to shuffle data? Could this be done by looking at delays
>>> >> >>>> between the first task in a new stage and the last task in the
>>> >> >>>> previous stage?
>>> >> >>>>
>>> >> >>>> Thank you very much for your time,
>>> >> >>>> Mike
>>> >> >>>>
>>> >> >>>>
>>> >> >>>> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote:
>>> >> >>>> > Hey Mike,
>>> >> >>>> >
>>> >> >>>> > Stage ID's are not guaranteed to be sequential because of the
>>> way
>>> >> the
>>> >> >>>> > DAG scheduler works (only increasing). In some cases stage ID
>>> >> numbers
>>> >> >>>> > are skipped when stages are generated.
>>> >> >>>> >
>>> >> >>>> > Any stage/ID that appears in the Spark UI is an actual stage,
>>> so
>>> >> >>>> > if
>>> >> >>>> > you see ID's in there, but they are not in the logs, then let
>>> us
>>> >> know
>>> >> >>>> > (that would be a bug).
>>> >> >>>> >
>>> >> >>>> > - Patrick
>>> >> >>>> >
>>> >> >>>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das
>>> >> >>>> > <ak...@sigmoidanalytics.com>
>>> >> >>>> > wrote:
>>> >> >>>> >> Are you seeing the same behavior on the driver UI? (that
>>> running
>>> >> >>>> >> on
>>> >> >>>> >> port
>>> >> >>>> >> 4040), If you click on the stage id header you can sort the
>>> >> >>>> >> stages
>>> >> >>>> >> based
>>> >> >>>> >> on
>>> >> >>>> >> IDs.
>>> >> >>>> >>
>>> >> >>>> >> Thanks
>>> >> >>>> >> Best Regards
>>> >> >>>> >>
>>> >> >>>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91m...@gmail.com
>>> >
>>> >> >>>> >> wrote:
>>> >> >>>> >>>
>>> >> >>>> >>> Hi folks,
>>> >> >>>> >>>
>>> >> >>>> >>> When I look at the output logs for an iterative Spark
>>> program, I
>>> >> >>>> >>> see
>>> >> >>>> >>> that the stage IDs are not arithmetically numbered---that is,
>>> >> there
>>> >> >>>> >>> are gaps between stages and I might find log information
>>> about
>>> >> >>>> >>> Stage
>>> >> >>>> >>> 0, 1,2, 5, but not 3 or 4.
>>> >> >>>> >>>
>>> >> >>>> >>> As an example, the output from the Spark logs below shows
>>> what I
>>> >> >>>> >>> mean:
>>> >> >>>> >>>
>>> >> >>>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr  | grep finished
>>> >> >>>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at
>>> >> >>>> >>> blockMap.scala:1444)
>>> >> >>>> >>> finished in 7.820 s:
>>> >> >>>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810)
>>> >> >>>> >>> finished
>>> >> >>>> >>> in 3.874 s:
>>> >> >>>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at
>>> blockMap.scala:1179)
>>> >> >>>> >>> finished in 2.237 s:
>>> >> >>>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817)
>>> >> >>>> >>> finished
>>> >> >>>> >>> in 1.749 s:
>>> >> >>>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at
>>> blockMap.scala:1180)
>>> >> >>>> >>> finished in 1.082 s:
>>> >> >>>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810)
>>> >> >>>> >>> finished
>>> >> >>>> >>> in 2.078 s:
>>> >> >>>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at
>>> blockMap.scala:1188)
>>> >> >>>> >>> finished in 1.317 s:
>>> >> >>>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817)
>>> >> >>>> >>> finished
>>> >> >>>> >>> in 1.638 s:
>>> >> >>>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at
>>> blockMap.scala:1189)
>>> >> >>>> >>> finished in 0.732 s:
>>> >> >>>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at
>>> >> >>>> >>> blockMap.scala:1302)
>>> >> >>>> >>> finished in 0.192 s:
>>> >> >>>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at
>>> >> >>>> >>> blockMap.scala:1302)
>>> >> >>>> >>> finished in 0.170 s:
>>> >> >>>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at
>>> blockMap.scala:1201)
>>> >> >>>> >>> finished in 0.270 s:
>>> >> >>>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355)
>>> >> >>>> >>> finished
>>> >> >>>> >>> in 0.455 s:
>>> >> >>>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at
>>> blockMap.scala:274)
>>> >> >>>> >>> finished in 0.928 s:
>>> >> >>>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355)
>>> >> >>>> >>> finished
>>> >> >>>> >>> in 0.305 s:
>>> >> >>>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at
>>> blockMap.scala:275)
>>> >> >>>> >>> finished in 0.391 s:
>>> >> >>>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at
>>> >> >>>> >>> MatrixFactorizationModel.scala:60) finished in 0.028 s:
>>> >> >>>> >>> 30506:INFO:DAGScheduler:Stage 36 (first at
>>> >> >>>> >>> MatrixFactorizationModel.scala:60) finished in 0.023 s:
>>> >> >>>> >>>
>>> >> >>>> >>> Can anyone comment on this being normal behavior? Is it
>>> >> >>>> >>> indicative
>>> >> >>>> >>> of
>>> >> >>>> >>> faults causing stages to be resubmitted? I also cannot find
>>> the
>>> >> >>>> >>> missing stages in any stage's parent List(Stage x, Stage y,
>>> ...)
>>> >> >>>> >>>
>>> >> >>>> >>> Thanks,
>>> >> >>>> >>> Mike
>>> >> >>>> >>>
>>> >> >>>> >>>
>>> >> >>>> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote:
>>> >> >>>> >>> > Thanks, René. I actually added a warning to the new JDBC
>>> >> >>>> reader/writer
>>> >> >>>> >>> > interface for 1.4.0.
>>> >> >>>> >>> >
>>> >> >>>> >>> > Even with that, I think we should support throttling JDBC;
>>> >> >>>> >>> > otherwise
>>> >> >>>> >>> > it's
>>> >> >>>> >>> > too convenient for our users to DOS their production
>>> database
>>> >> >>>> servers!
>>> >> >>>> >>> >
>>> >> >>>> >>> >
>>> >> >>>> >>> >   /**
>>> >> >>>> >>> >    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
>>> >> >>>> >>> >    * url named table. Partitions of the table will be retrieved in parallel based on the parameters
>>> >> >>>> >>> >    * passed to this function.
>>> >> >>>> >>> >    *
>>> >> >>>> >>> >    * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
>>> >> >>>> >>> >    * your external database systems.
>>> >> >>>> >>> >    *
>>> >> >>>> >>> >    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>>> >> >>>> >>> >    * @param table Name of the table in the external database.
>>> >> >>>> >>> >    * @param columnName the name of a column of integral type that will be used for partitioning.
>>> >> >>>> >>> >    * @param lowerBound the minimum value of `columnName` used to decide partition stride
>>> >> >>>> >>> >    * @param upperBound the maximum value of `columnName` used to decide partition stride
>>> >> >>>> >>> >    * @param numPartitions the number of partitions.  the range `minValue`-`maxValue` will be split
>>> >> >>>> >>> >    *                      evenly into this many partitions
>>> >> >>>> >>> >    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
>>> >> >>>> >>> >    *                             tag/value. Normally at least a "user" and "password" property
>>> >> >>>> >>> >    *                             should be included.
>>> >> >>>> >>> >    *
>>> >> >>>> >>> >    * @since 1.4.0
>>> >> >>>> >>> >    */
>>> >> >>>> >>> >
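As an illustration of the stride-based split that the Scaladoc above describes, here is a minimal sketch in plain Scala. It is not Spark's actual implementation: the object name PartitionSketch, the helper partitionPredicates, and the exact predicate strings are assumptions made for this example. It also assumes the first and last partitions are left open-ended, so that the bounds only shape the stride and do not filter rows.

```scala
// Hypothetical helper, NOT part of Spark's API: a sketch of how a numeric
// range could be turned into one WHERE predicate per partition.
object PartitionSketch {
  def partitionPredicates(
      columnName: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int): Seq[String] = {
    require(numPartitions >= 1, "numPartitions must be at least 1")
    require(lowerBound < upperBound, "lowerBound must be below upperBound")

    // Width of each partition; at least 1 so that boundaries always advance.
    val stride = math.max(1L, (upperBound - lowerBound) / numPartitions)

    // Interior boundaries between consecutive partitions.
    val boundaries = (1 until numPartitions).map(i => lowerBound + i * stride)

    if (boundaries.isEmpty) {
      // A single partition reads the whole table.
      Seq("1 = 1")
    } else {
      // Assumption: first and last partitions are open-ended, so rows
      // outside [lowerBound, upperBound] are still read by some partition.
      val first = s"$columnName < ${boundaries.head}"
      val middle = boundaries.sliding(2).collect {
        case Seq(lo, hi) => s"$columnName >= $lo AND $columnName < $hi"
      }.toSeq
      val last = s"$columnName >= ${boundaries.last}"
      (first +: middle) :+ last
    }
  }
}
```

Each predicate corresponds to one query against the database, so numPartitions is also an upper limit on how many queries this split can issue at once. That is the load the warning in the Scaladoc is about.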
>>> >> >>>> >>> >
>>> >> >>>> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rtref...@gmail.com> wrote:
>>> >> >>>> >>> >
>>> >> >>>> >>> >> Hi,
>>> >> >>>> >>> >>
>>> >> >>>> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ => 1).aggregate(0)(_+_,_+_)
>>> >> >>>> >>> >> on an interactive shell (where "where" is an Array[String] of 32 to 48
>>> >> >>>> >>> >> elements). (The code is tailored to your db, specifically through the
>>> >> >>>> >>> >> where conditions; otherwise I would have posted it.)
>>> >> >>>> >>> >> That should be the DataFrame API, but I'm just trying to load everything
>>> >> >>>> >>> >> and discard it as soon as possible :-)
>>> >> >>>> >>> >>
>>> >> >>>> >>> >> (1) Never do a silent drop of the values by default: it kills confidence.
>>> >> >>>> >>> >> An option sounds reasonable. Some sort of insight / log would be great.
>>> >> >>>> >>> >> (How many columns of what type were truncated? Why?)
>>> >> >>>> >>> >> Note that I could declare the field as string via JdbcDialects (thank you
>>> >> >>>> >>> >> guys for merging that :-) ).
>>> >> >>>> >>> >> I have had quite bad experiences with silent drops / truncates of columns
>>> >> >>>> >>> >> and thus _like_ the strict way of Spark. It causes trouble, but noticing
>>> >> >>>> >>> >> later that your data was corrupted during conversion is even worse.
>>> >> >>>> >>> >>
>>> >> >>>> >>> >> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
>>> >> >>>> >>> >>
>>> >> >>>> >>> >> (3) One option would be to make it safe to use; the other option would be
>>> >> >>>> >>> >> to document the behavior (something like "WARNING: this method tries to
>>> >> >>>> >>> >> load as many partitions as possible, make sure your database can handle
>>> >> >>>> >>> >> the load or load them in chunks and use union"). SPARK-8008
>>> >> >>>> >>> >> https://issues.apache.org/jira/browse/SPARK-8008
>>> >> >>>> >>> >>
>>> >> >>>> >>> >> Regards,
>>> >> >>>> >>> >>   Rene Treffer
>>> >> >>>> >>> >>
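The "load them in chunks and use union" idea from point (3) could start from something like the sketch below. It only shows the batching of the where-conditions in plain Scala; the object name ChunkSketch and the helper batches are hypothetical, and the Spark calls appear only in comments because they need a running SQLContext. Because DataFrames are evaluated lazily, a plain union of the per-batch DataFrames would presumably still schedule every partition once an action runs, so each batch would likely have to be materialized before the next one is started.

```scala
// Hypothetical helper, NOT part of Spark's API: splits the full list of
// WHERE predicates into batches so that at most maxConcurrent of them
// are handed to the database in one load.
object ChunkSketch {
  def batches(predicates: Array[String], maxConcurrent: Int): Seq[Array[String]] = {
    require(maxConcurrent >= 1, "maxConcurrent must be at least 1")
    predicates.grouped(maxConcurrent).toSeq
  }
}

// Usage idea (assumption, not executed here): for each batch, call
//   sqlContext.jdbc(uri, table, batch)
// materialize the result, and only then combine the per-batch results.
```

With the 48 where-conditions mentioned above and a batch size of 8, this yields 6 batches, each of which would open at most 8 partition queries.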
>>> >> >>>> >>> >
>>> >> >>>> >>>
>>> >> >>>> >>>
>>> >> >>>> >>> --
>>> >> >>>> >>> Thanks,
>>> >> >>>> >>> Mike
>>> >> >>>> >>>
>>> >> >>>> >>>
>>> >> ---------------------------------------------------------------------
>>> >> >>>> >>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>> >> >>>> >>> For additional commands, e-mail: dev-h...@spark.apache.org
>>> >> >>>> >>>
>>> >> >>>> >>
>>> >> >>>> >
