Good idea -- I've added this to the wiki: https://cwiki.apache.org/confluence/display/SPARK/Shuffle+Internals. Happy to stick it elsewhere if folks think there's a more convenient place.
On Thu, Jun 11, 2015 at 4:46 PM, Gerard Maas <gerard.m...@gmail.com> wrote: > Kay, > > Excellent write-up. This should be preserved for reference somewhere > searchable. > > -Gerard. > > > > On Fri, Jun 12, 2015 at 1:19 AM, Kay Ousterhout <k...@eecs.berkeley.edu> > wrote: > >> Here’s how the shuffle works. This explains what happens for a single >> task; this will happen in parallel for each task running on the machine, >> and as Imran said, Spark runs up to “numCores” tasks concurrently on each >> machine. There's also an answer to the original question about why CPU use >> is low at the very bottom. >> >> The key data structure used in fetching shuffle data is the “results” >> queue in ShuffleBlockFetcherIterator, which buffers data that we have in >> serialized (and maybe compressed) form, but haven’t yet deserialized / >> processed. The results queue is filled by many threads fetching data over >> the network (the number of concurrent threads fetching data is equal to the >> number of remote executors we’re currently fetching data from) [0], and is >> consumed by a single thread that deserializes the data and computes some >> function over it (e.g., if you’re doing rdd.count(), the thread >> deserializes the data and counts the number of items). As we fetch data >> over the network, we track bytesInFlight, which is data that has been >> requested (and possibly received) from a remote executor, but that hasn’t >> yet been deserialized / processed by the consumer thread. So, this >> includes all of the data in the “results” queue, and possibly more data >> that’s currently outstanding over the network. We always issue as many >> requests as we can, with the constraint that bytesInFlight remains less >> than a specified maximum [1]. >> >> In a little more detail, here’s exactly what happens when a task begins >> reading shuffled data: >> >> (1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data >> [1] over the network (this happens here >> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260> >> ). >> >> These requests are all executed asynchronously using a ShuffleClient [2] >> via the shuffleClient.fetchBlocks call >> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149> >> [3]. We pass in a callback that, once a block has been successfully >> fetched, sticks it on the “results” queue. >> >> (2) Begin processing the local data. One by one, we request the local >> data from the local block manager (which memory maps the file) and then stick >> the result onto the results queue >> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230>. >> Because we memory map the files, which is speedy, the local data typically >> all ends up on the results queue in front of the remote data. >> >> (3) Once the async network requests have been issued (note — issued, but >> not finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1) >> and (2) have happened), ShuffleBlockFetcherIterator returns an iterator >> that gets wrapped too many times to count [4] and eventually gets unrolled >> [5]. Each time next() is called on the iterator, it blocks waiting for an >> item from the results queue.
This may return right away, or if the queue >> is empty, will block waiting on new data from the network [6]. Before >> returning from next(), we update our accounting for the bytes in flight: >> the chunk of data we return is no longer considered in-flight, because it’s >> about to be processed, so we update the current bytesInFlight, and if it >> won’t result in > maxBytesInFlight outstanding, send some more requests for >> data. >> >> ———————————————— >> >> Notes: >> >> [0] Note that these threads consume almost no CPU resources, because they >> just receive data from the OS and then execute a callback that sticks the >> data on the results queue. >> >> [1] We limit the data outstanding on the network to avoid using too much >> memory to hold the data we’ve fetched over the network but haven’t yet >> processed. >> >> [1.5] Each request may include multiple shuffle blocks, where a >> "block" is the data output for this reduce task by a particular map task. >> All of the reduce tasks for a shuffle read a total of # map tasks * # >> reduce tasks shuffle blocks; each reduce task reads # map tasks blocks. We >> do some hacks >> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177> >> to try to size these requests in a "good" way: we limit each request to >> about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines >> concurrently without exceeding maxBytesInFlight. 5 is completely a magic >> number here that was probably guessed by someone long long ago, and it >> seems to work ok. >> >> [2] The default configuration uses NettyBlockTransferService as the >> ShuffleClient implementation (note that this extends BlockTransferService, >> which extends ShuffleClient). >> >> [3] If you’re curious how the shuffle client fetches data, the default >> Spark configuration results in exactly one TCP connection from an executor >> to each other executor. If executor A is getting shuffle data from >> executor B, we start by sending an OpenBlocks message from A to B. The >> OpenBlocks message includes the list of blocks that A wants to fetch, and >> causes the remote executor, B, to start to pull the corresponding data into >> memory from disk (we typically memory map the files, so this may not >> actually result in the data being read yet), and also to store some state >> associated with this “stream” of data. The remote executor, B, responds >> with a stream ID that helps it to identify the connection. Next, A >> requests blocks one at a time from B using a ChunkFetchRequest message >> (this happens here >> <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L101> >> in >> OneForOneBlockFetcher, which calls this code >> <https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java#L97> >> in TransportClient; currently, we have a one-to-one mapping from a chunk to >> a particular block). It’s possible that there are many sets of shuffle >> data being fetched concurrently between A and B (e.g., because many tasks >> are run concurrently). These requests are serialized, so one block is sent >> at a time from B, and they’re sent in the order that the requests were >> issued on A.
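To make the loop described above concrete, here is a toy, self-contained sketch of the same pattern: async fetches feeding a blocking "results" queue, a single consumer, and a bytesInFlight cap with requests sized to roughly maxBytesInFlight / 5. This is NOT the real ShuffleBlockFetcherIterator -- the block names, sizes, and thread-plus-sleep "network" are made up purely for illustration.

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Toy model of the fetch loop described in this thread; not the real Spark code.
object ToyShuffleFetch {

  case class Block(id: String, size: Long)                  // one map output for this reduce task
  case class FetchResult(block: Block, data: Array[Byte])   // still serialized, not yet processed

  def main(args: Array[String]): Unit = {
    val maxBytesInFlight = 48L * 1024 * 1024
    // Requests are pre-grouped here; the real code groups blocks per executor into
    // requests of roughly maxBytesInFlight / 5 so ~5 hosts can be fetched from at once.
    val fetchRequests = mutable.Queue(
      Seq(Block("shuffle_0_0_3", 8L << 20), Block("shuffle_0_1_3", 6L << 20)),
      Seq(Block("shuffle_0_2_3", 9L << 20)),
      Seq(Block("shuffle_0_3_3", 7L << 20)))
    var blocksRemaining = fetchRequests.iterator.map(_.length).sum

    val results = new LinkedBlockingQueue[FetchResult]()     // the "results" queue
    var bytesInFlight = 0L                                   // only touched by this (consumer) thread

    // Stand-in for shuffleClient.fetchBlocks: returns immediately; a background thread
    // later sticks each "fetched" block on the results queue, like the real callback does.
    def sendRequest(req: Seq[Block]): Unit = {
      bytesInFlight += req.map(_.size).sum
      new Thread(new Runnable {
        override def run(): Unit = req.foreach { b =>
          Thread.sleep(10)                                   // pretend network latency
          results.put(FetchResult(b, new Array[Byte](0)))
        }
      }).start()
    }

    // Issue as many requests as possible without exceeding maxBytesInFlight.
    def fetchUpToMaxBytes(): Unit =
      while (fetchRequests.nonEmpty &&
             (bytesInFlight == 0 ||
              bytesInFlight + fetchRequests.front.map(_.size).sum <= maxBytesInFlight)) {
        sendRequest(fetchRequests.dequeue())
      }

    fetchUpToMaxBytes()                                      // step (1): kick off the initial async fetches

    // The single consumer (the task thread): block on the queue, update the in-flight
    // accounting, and top the requests back up -- step (3) above.
    while (blocksRemaining > 0) {
      val res = results.take()                               // time spent blocked here ~ "fetch wait time"
      bytesInFlight -= res.block.size
      blocksRemaining -= 1
      fetchUpToMaxBytes()
      println(s"processed ${res.block.id}, bytesInFlight now $bytesInFlight")
    }
  }
}

The real iterator does the same bookkeeping with real network callbacks, and the time the consumer spends blocked in results.take() corresponds to what note [6] below calls the fetch wait time.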
>> >> [4] In BlockStoreShuffleFetcher, which handles failures; then in >> HashShuffleReader, which helps aggregate some of the data; etc. >> >> [5] This happens in BlockManager.putIterator, if the RDD is going to be >> cached; in the function passed in to ResultTask, if this is the last stage >> in a job; or via the writer.write() call in ShuffleMapTask, if this is a >> stage that generates intermediate shuffle data. >> >> [6] We time how long we spend blocking on data from the network; this is >> what’s shown as “fetch wait time” in Spark’s UI. >> >> —————————————————— >> >> To answer the original question about why CPU use is low, this means that >> the main thread (that pulls data off of the results queue) is blocking on >> I/O. It could be blocking to receive data over the network (which will be >> included in the fetch wait time shown in the UI, described above), or it >> could be blocking to read data from the local disk (because the shuffle >> data that is read locally is memory-mapped, so the thread may block on disk >> if the data hasn’t been read into memory yet). It could also be blocking >> because it’s writing new shuffle output to disk (also shown in the UI, as >> shuffle write time), or because it’s spilling intermediate data. >> Everything except the spilling is shown in the handy-dandy new >> visualization that Kousuke recently added >> <https://github.com/apache/spark/commit/a5f7b3b9c7f05598a1cc8e582e5facee1029cd5e>to >> the UI (available in 1.4 onwards): if you look at the stage view, and click >> on “Event Timeline”, you’ll see a visualization of the tasks and how long >> they spent blocked on various things (we should add the spilling to >> this…there is a long-outstanding JIRA for this). >> >> Hope this helps! >> >> -Kay >> >> On Thu, Jun 11, 2015 at 7:42 AM, Imran Rashid <iras...@cloudera.com> >> wrote: >> >>> That is not exactly correct -- that being said I'm not 100% on these >>> details either so I'd appreciate you double checking and / or another dev >>> confirming my description. >>> >>> >>> Spark actually has more threads going then the "numCores" you specify. >>> "numCores" is really used for how many threads are actively executing >>> tasks. There are more threads for doing the fetching (the details of which >>> I'm not that familiar with) -- that never cuts into the number of actively >>> executing tasks for "numCores". There isn't a 1-to-1 correspondence >>> between one shuffle block and one task -- a shuffle-read task most likely >>> needs to fetch many shuffle blocks, with some local and some remote (in >>> most cases). So, from your lingo above, c is numCores, but c_1 is just an >>> independent pool of threads. >>> >>> this obvious follow up question is, if you've actually more than >>> "numCores" threads going at the same time, how come cpu usage is low? You >>> could still have your cpus stuck waiting on i/o, from disk or network, so >>> they aren't getting fully utilized. And the cpu can also be idle waiting >>> for memory, if there are a lot of cache misses (I'm not sure how that will >>> show up in cpu monitoring). If that were the case, that could even be from >>> too many threads, as a lot of time is spent context switching ... but I'm >>> just guessing now. >>> >>> hope this helps, >>> Imran >>> >>> >>> >>> On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes <91m...@gmail.com> wrote: >>> >>>> Hi Imran, >>>> >>>> Thank you again for your email. 
>>>> >>>> I just want to ask one further question to clarify the implementation >>>> of the shuffle block fetches. When you say that rather than sitting >>>> idle, [the executor] will immediately start reading the local block, I >>>> would guess that, in implementation, the executor is going to launch >>>> concurrent threads to read both local and remote blocks, which it >>>> seems to do in the initialize() method of >>>> core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case >>>> or would the Executor run all local fetch threads first? >>>> >>>> The reason I ask is that if the slave machine on which the Executor is >>>> running has some number of cores, c, then I would have thought that >>>> some of the threads launched would occupy some number, c_1, of the >>>> cores and conduct the local reads (where c_1 <= c). The other threads >>>> would occupy the other (c - c_1) cores' cycles until *all* necessary >>>> blocks have been read, and depending on c and the number of blocks to >>>> fetch so that none of the cores are idle if there are many blocks to >>>> fetch. (I monitor the CPU utilization of our nodes throughout a job, >>>> and generally find them under-utilized statistically speaking; that >>>> is, their usage over the whole job is lower than expected, with short >>>> burst of high usage, so I ask this question in a specific way for this >>>> reason, since I can see trends in the probability density functions of >>>> CPU utilization as the #partitions of our RDDs are increased). >>>> >>>> ShuffleBlockFetcherIterator.scala: >>>> >>>> private[this] def initialize(): Unit = { >>>> ... >>>> // Send out initial requests for blocks, up to our maxBytesInFlight >>>> while (fetchRequests.nonEmpty && >>>> (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size >>>> <= maxBytesInFlight)) { >>>> sendRequest(fetchRequests.dequeue()) >>>> } >>>> val numFetches = remoteRequests.size - fetchRequests.size >>>> logInfo("Started " + numFetches + " remote fetches in" + >>>> Utils.getUsedTimeMs(startTime)) >>>> >>>> // Get Local Blocks >>>> fetchLocalBlocks() >>>> logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime)) >>>> } >>>> private[this] def fetchLocalBlocks() { >>>> val iter = localBlocks.iterator >>>> while (iter.hasNext) { >>>> val blockId = iter.next() >>>> try { >>>> val buf = blockManager.getBlockData(blockId) >>>> shuffleMetrics.incLocalBlocksFetched(1) >>>> shuffleMetrics.incLocalBytesRead(buf.size) >>>> buf.retain() >>>> results.put(new SuccessFetchResult(blockId, 0, buf)) >>>> } catch { >>>> ... >>>> } >>>> } >>>> } >>>> >>>> Obviously, I will have to sit down with core/.../network/nio/* and >>>> core/.../shuffle/* and do my own homework on this, but from what I can >>>> tell, the BlockDataManager relies on either >>>> NioBlockTransferService.scala or the NettyBlockTransferService.scala >>>> (which are set in SparkEnv.scala), both of which do the grunt work of >>>> actually buffering and transferring the blocks' bytes. Finally, the >>>> tasks in new stage for which the shuffle outputs have been fetched >>>> will not commence until all of the block fetching threads (both local >>>> and remote) have terminated. >>>> >>>> Does the above paint an accurate picture? I would really appreciate >>>> clarification on the concurrency, since I would like to determine why >>>> our jobs have under-utilization and poor weak scaling efficiency. >>>> >>>> I will cc this thread over to the dev list. 
I did not cc them in case >>>> my previous question was trivial---I didn't want to spam the list >>>> unnecessarily, since I do not see these kinds of questions posed there >>>> frequently. >>>> >>>> Thanks a bunch, >>>> Mike >>>> >>>> >>>> On 6/10/15, Imran Rashid <iras...@cloudera.com> wrote: >>>> > Hi Mike, >>>> > >>>> > no, this is a good question, I can see how my response could be >>>> interpreted >>>> > both ways. >>>> > >>>> > To be more precise: >>>> > *nothing* is fetched until the shuffle-read stage starts. So it is >>>> normal >>>> > to see a spike in cluster bandwidth when that stage starts. There is >>>> a >>>> > hard-boundary between stages -- that is, spark never starts any tasks >>>> in >>>> > one stage until *all* tasks in the dependent stages have been >>>> completed. >>>> > (There has been on-and-off discussion about relaxing this, but IMO >>>> this is >>>> > unlikely to change in the near future.) So spark will wait for all >>>> of the >>>> > tasks in the previous shuffle-write stage to finish, and then kick >>>> off a >>>> > bunch of shuffle-read tasks in the next stage, leading to the spike >>>> you >>>> > see. >>>> > >>>> > I was referring to the way blocks are fetched within one of those >>>> > shuffle-read tasks. One of those tasks will probably need a >>>> > bunch of different blocks, from many executors. But some of the >>>> blocks it >>>> > needs will probably exist locally. So the task first sends out a >>>> request >>>> > to fetch blocks remotely (leading to the spike), but rather than >>>> sitting >>>> > idle, it will immediately start reading the local blocks. Ideally, >>>> by the >>>> > time it's done reading the local blocks, some of the remote blocks have >>>> > already been fetched, so no time is spent *waiting* for the remote >>>> reads. >>>> > As the remote blocks get read, spark sends out more requests, trying >>>> to >>>> > balance how much data needs to be buffered vs. preventing any waiting >>>> on >>>> > remote reads (which can be controlled by >>>> spark.reducer.maxSizeInFlight). >>>> > >>>> > Hope that clarifies things! >>>> > >>>> > btw, you sent this last question to just me -- I think it's a good >>>> question, >>>> > do you mind sending it to the list? I figured that was accidental but >>>> > wanted to check. >>>> > >>>> > Imran >>>> > >>>> > On Wed, Jun 10, 2015 at 12:20 AM, Mike Hynes <91m...@gmail.com> >>>> wrote: >>>> > >>>> >> Hi Imran, >>>> >> One additional quick question---I just want to confirm that I fully >>>> >> understand your comment that "blocks are fetched before they are >>>> >> needed." Typically on our system, we see spikes in cluster bandwidth >>>> >> (with ganglia) at stage boundaries, so I previously assumed that all >>>> >> shuffle read occurred there. Do you mean that the blocks are fetched >>>> >> by the shuffle read iterator, and hence when tasks occur afterwards >>>> >> the necessary blocks have already been fetched? >>>> >> Thanks---I am sorry if this is an obvious question, but I'd like to >>>> >> understand this as precisely as possible. >>>> >> Mike >>>> >> >>>> >> On 6/10/15, Mike Hynes <91m...@gmail.com> wrote: >>>> >> > Ahhh---forgive my typo: what I mean is, >>>> >> > (t2 - t1) >= (t_ser + t_deser + t_exec) >>>> >> > is satisfied, empirically. >>>> >> > >>>> >> > On 6/10/15, Mike Hynes <91m...@gmail.com> wrote: >>>> >> >> Hi Imran, >>>> >> >> >>>> >> >> Thank you for your email.
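A side note on the knob Imran mentions above: the in-flight cap can be changed per application. A minimal sketch, assuming the Spark 1.4 property name and an arbitrary (not recommended) value:

import org.apache.spark.SparkConf

// spark.reducer.maxSizeInFlight defaults to 48m; the 96m here is purely illustrative.
val conf = new SparkConf()
  .setAppName("shuffle-read-tuning")
  .set("spark.reducer.maxSizeInFlight", "96m")

The same property can also be passed on the command line, e.g. spark-submit --conf spark.reducer.maxSizeInFlight=96m.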
>>>> >> >> >>>> >> >> In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I >>>> >> >> have found it to be true, although I have not included the >>>> >> >> t_{wait_for_read} in this, since it has---so far as I can >>>> tell---been >>>> >> >> either zero or negligible compared to the task time. >>>> >> >> >>>> >> >> Thanks, >>>> >> >> Mike >>>> >> >> >>>> >> >> On 6/8/15, Imran Rashid <iras...@cloudera.com> wrote: >>>> >> >>> Hi Mike, >>>> >> >>> >>>> >> >>> all good questions, let me take a stab at answering them: >>>> >> >>> >>>> >> >>> 1. Event Logs + Stages: >>>> >> >>> >>>> >> >>> It's normal for stages to get skipped if they are shuffle map >>>> stages, >>>> >> >>> which >>>> >> >>> get read multiple times. E.g., here's a little example program I >>>> >> >>> wrote >>>> >> >>> earlier to demonstrate this: "d3" doesn't need to be re-shuffled >>>> >> >>> since >>>> >> >>> each >>>> >> >>> time it's read w/ the same partitioner. So skipping stages in >>>> this >>>> >> >>> way >>>> >> >>> is >>>> >> >>> a >>>> >> >>> good thing: >>>> >> >>> >>>> >> >>> val partitioner = new org.apache.spark.HashPartitioner(10) >>>> >> >>> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> >>>> >> >>> x}.partitionBy(partitioner) >>>> >> >>> (0 until 5).foreach { idx => >>>> >> >>> val otherData = sc.parallelize(1 to (idx * 100)).map{ x => (x >>>> % 10) >>>> >> -> >>>> >> >>> x}.partitionBy(partitioner) >>>> >> >>> println(idx + " ---> " + otherData.join(d3).count()) >>>> >> >>> } >>>> >> >>> >>>> >> >>> If you run this and look in the UI, you'd see that all jobs >>>> except >>>> >> for >>>> >> >>> the first one have one stage that is skipped. You will also see >>>> this >>>> >> in >>>> >> >>> the log: >>>> >> >>> >>>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: >>>> >> >>> List(Stage >>>> >> >>> 12, >>>> >> >>> Stage 13) >>>> >> >>> >>>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage >>>> 13) >>>> >> >>> >>>> >> >>> Admittedly that is not very clear, but that is sort of >>>> indicating to >>>> >> you >>>> >> >>> that the DAGScheduler first created stage 12 as a necessary >>>> step, and >>>> >> >>> then >>>> >> >>> later on changed its mind by realizing that everything it needed >>>> for >>>> >> >>> stage >>>> >> >>> 12 already existed, so there was nothing to do. >>>> >> >>> >>>> >> >>> >>>> >> >>> 2. Extracting Event Log Information >>>> >> >>> >>>> >> >>> maybe you are interested in SparkListener ? Though >>>> unfortunately, I >>>> >> >>> don't >>>> >> >>> know of a good blog post describing it, hopefully the docs are >>>> clear >>>> >> ... >>>> >> >>> >>>> >> >>> 3. Time Metrics in Spark Event Log >>>> >> >>> >>>> >> >>> This is a great question. I *think* the only exception is that >>>> t_gc >>>> >> >>> is >>>> >> >>> really overlapped with t_exec. So I think you should really >>>> expect >>>> >> >>> >>>> >> >>> (t2 - t1) < (t_ser + t_deser + t_exec) >>>> >> >>> >>>> >> >>> I am not 100% sure about this, though. I'd be curious if that >>>> >> >>> constraint was ever violated. >>>> >> >>> >>>> >> >>> >>>> >> >>> As for your question on shuffle read vs. shuffle write time -- I >>>> >> >>> wouldn't >>>> >> >>> necessarily expect the same stage to have times for both shuffle >>>> read >>>> >> >>> & >>>> >> >>> shuffle write -- in the simplest case, you'll have shuffle write >>>> >> >>> times >>>> >> >>> in >>>> >> >>> one, and shuffle read times in the next one.
But even taking >>>> that >>>> >> >>> into >>>> >> >>> account, there is a difference in the way they work & are >>>> measured. >>>> >> >>> shuffle read operations are pipelined and the way we measure >>>> shuffle >>>> >> >>> read, >>>> >> >>> its just how much time is spent *waiting* for network transfer. >>>> It >>>> >> >>> could >>>> >> >>> be that there is no (measurable) wait time b/c the next blocks >>>> are >>>> >> >>> fetched >>>> >> >>> before they are needed. Shuffle writes occur in the normal task >>>> >> >>> execution >>>> >> >>> thread, though, so we (try to) measure all of it. >>>> >> >>> >>>> >> >>> >>>> >> >>> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com> >>>> wrote: >>>> >> >>> >>>> >> >>>> Hi Patrick and Akhil, >>>> >> >>>> >>>> >> >>>> Thank you both for your responses. This is a bit of an extended >>>> >> >>>> email, >>>> >> >>>> but I'd like to: >>>> >> >>>> 1. Answer your (Patrick) note about the "missing" stages since >>>> the >>>> >> >>>> IDs >>>> >> >>>> do (briefly) appear in the event logs >>>> >> >>>> 2. Ask for advice/experience with extracting information from >>>> the >>>> >> >>>> event logs in a columnar, delimiter-separated format. >>>> >> >>>> 3. Ask about the time metrics reported in the event logs; >>>> currently, >>>> >> >>>> the elapsed time for a task does not equal the sum of the times >>>> for >>>> >> >>>> its components >>>> >> >>>> >>>> >> >>>> 1. Event Logs + Stages: >>>> >> >>>> ========================= >>>> >> >>>> >>>> >> >>>> As I said before, In the spark logs (the log4j configurable ones >>>> >> >>>> from >>>> >> >>>> the driver), I only see references to some stages, where the >>>> stage >>>> >> >>>> IDs >>>> >> >>>> are not arithmetically increasing. In the event logs, however, I >>>> >> >>>> will >>>> >> >>>> see reference to *every* stage, although not all stages will >>>> have >>>> >> >>>> tasks associated with them. >>>> >> >>>> >>>> >> >>>> For instance, to examine the actual stages that have tasks, you >>>> can >>>> >> >>>> see missing stages: >>>> >> >>>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \ >>>> >> >>>> # | grep -Eo '"Stage ID":[[:digit:]]+' \ >>>> >> >>>> # | sort -n|uniq | head -n 5 >>>> >> >>>> "Stage ID":0 >>>> >> >>>> "Stage ID":1 >>>> >> >>>> "Stage ID":10 >>>> >> >>>> "Stage ID":11 >>>> >> >>>> "Stage ID":110 >>>> >> >>>> >>>> >> >>>> However, these "missing" stages *do* appear in the event logs as >>>> >> >>>> Stage >>>> >> >>>> IDs in the jobs submitted, i.e: for >>>> >> >>>> # grep -E '"Event":"SparkListenerJobStart"' app.log | grep -Eo >>>> >> >>>> 'Stage >>>> >> >>>> IDs":\[.*\]' | head -n 5 >>>> >> >>>> Stage IDs":[0,1,2] >>>> >> >>>> Stage IDs":[5,3,4] >>>> >> >>>> Stage IDs":[6,7,8] >>>> >> >>>> Stage IDs":[9,10,11] >>>> >> >>>> Stage IDs":[12,13,14] >>>> >> >>>> >>>> >> >>>> I do not know if this amounts to a bug, since I am not familiar >>>> with >>>> >> >>>> the scheduler in detail. The stages have seemingly been created >>>> >> >>>> somewhere in the DAG, but then have no associated tasks and >>>> never >>>> >> >>>> appear again. >>>> >> >>>> >>>> >> >>>> 2. Extracting Event Log Information >>>> >> >>>> ==================================== >>>> >> >>>> Currently we are running scalability tests, and are finding very >>>> >> >>>> poor >>>> >> >>>> scalability for certain block matrix algorithms. I would like to >>>> >> >>>> have >>>> >> >>>> finer detail about the communication time and bandwidth when >>>> data is >>>> >> >>>> transferred between nodes. 
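Picking up Imran's SparkListener pointer above: one way to get exactly this kind of per-task timing, without post-processing the JSON event logs, is a listener that appends one delimited row per completed task. A rough sketch follows; the class name and output path are made up, and the metric accessors reflect my reading of the Spark 1.x TaskInfo / TaskMetrics API, so double-check them against your version.

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Appends one CSV row per completed task. Register on the driver with
//   sc.addSparkListener(new CsvTaskListener("/tmp/task-metrics.csv"))
class CsvTaskListener(path: String) extends SparkListener {

  private val out = new PrintWriter(new FileWriter(path, true))  // append mode
  out.println("finishTime,taskId,stageId,host,durationMs,gcMs,deserializeMs,resultSerMs,fetchWaitMs")

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    // taskMetrics can be null (e.g., for failed tasks), so guard it.
    Option(taskEnd.taskMetrics).foreach { m =>
      // In the 1.x API shuffleReadMetrics is an Option; fetchWaitTime is the blocked-on-network time.
      val fetchWaitMs = m.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L)
      out.println(Seq(info.finishTime, info.taskId, taskEnd.stageId, info.host, info.duration,
        m.jvmGCTime, m.executorDeserializeTime, m.resultSerializationTime, fetchWaitMs).mkString(","))
      out.flush()
    }
  }
}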
>>>> >> >>>> >>>> >> >>>> I would really just like to have a file with nothing but task >>>> info >>>> >> >>>> in >>>> >> >>>> a format such as: >>>> >> >>>> timestamp (ms), task ID, hostname, execution time (ms), GC time >>>> >> >>>> (ms), >>>> >> >>>> ... >>>> >> >>>> 0010294, 1, slave-1, 503, 34, ... >>>> >> >>>> 0010392, 2, slave-2, 543, 32, ... >>>> >> >>>> and similarly for jobs/stages/rdd_memory/shuffle output/etc. >>>> >> >>>> >>>> >> >>>> I have extracted the relevant time fields from the spark event >>>> logs >>>> >> >>>> with a sed script, but I wonder if there is an even more >>>> expedient >>>> >> >>>> way. Unfortunately, I do not immediately see how to do this >>>> using >>>> >> >>>> the >>>> >> >>>> $SPARK_HOME/conf/metrics.properties file and haven't come >>>> across a >>>> >> >>>> blog/etc that describes this. Could anyone please comment on >>>> whether >>>> >> >>>> or not a metrics configuation for this already exists? >>>> >> >>>> >>>> >> >>>> 3. Time Metrics in Spark Event Log >>>> >> >>>> ================================== >>>> >> >>>> I am confused about the times reported for tasks in the event >>>> log. >>>> >> >>>> There are launch and finish timestamps given for each task (call >>>> >> >>>> them >>>> >> >>>> t1 and t2, respectively), as well as GC time (t_gc), execution >>>> time >>>> >> >>>> (t_exec), and serialization times (t_ser, t_deser). However the >>>> >> >>>> times >>>> >> >>>> do not add up as I would have expected. I would imagine that the >>>> >> >>>> elapsed time t2 - t1 would be slightly larger than the sum of >>>> the >>>> >> >>>> component times. However, I can find many instances in the event >>>> >> >>>> logs >>>> >> >>>> where: >>>> >> >>>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec) >>>> >> >>>> The difference can be 500 ms or more, which is not negligible >>>> for my >>>> >> >>>> current execution times of ~5000 ms. I have attached a plot that >>>> >> >>>> illustrates this. >>>> >> >>>> >>>> >> >>>> Regarding this, I'd like to ask: >>>> >> >>>> 1. How exactly are these times are being measured? >>>> >> >>>> 2. Should the sum of the component times equal the elapsed >>>> (clock) >>>> >> >>>> time for the task? >>>> >> >>>> 3. If not, which component(s) is(are) being excluded, and when >>>> do >>>> >> >>>> they >>>> >> >>>> occur? >>>> >> >>>> 4. There are occasionally reported measurements for Shuffle >>>> Write >>>> >> >>>> time, but not shuffle read time. Is there a method to determine >>>> the >>>> >> >>>> time required to shuffle data? Could this be done by look at >>>> delays >>>> >> >>>> between the first task in a new stage and the last task in the >>>> >> >>>> previous stage? >>>> >> >>>> >>>> >> >>>> Thank you very much for your time, >>>> >> >>>> Mike >>>> >> >>>> >>>> >> >>>> >>>> >> >>>> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote: >>>> >> >>>> > Hey Mike, >>>> >> >>>> > >>>> >> >>>> > Stage ID's are not guaranteed to be sequential because of the >>>> way >>>> >> the >>>> >> >>>> > DAG scheduler works (only increasing). In some cases stage ID >>>> >> numbers >>>> >> >>>> > are skipped when stages are generated. >>>> >> >>>> > >>>> >> >>>> > Any stage/ID that appears in the Spark UI is an actual stage, >>>> so >>>> >> >>>> > if >>>> >> >>>> > you see ID's in there, but they are not in the logs, then let >>>> us >>>> >> know >>>> >> >>>> > (that would be a bug). 
>>>> >> >>>> > >>>> >> >>>> > - Patrick >>>> >> >>>> > >>>> >> >>>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das >>>> >> >>>> > <ak...@sigmoidanalytics.com> >>>> >> >>>> > wrote: >>>> >> >>>> >> Are you seeing the same behavior on the driver UI? (that >>>> running >>>> >> >>>> >> on >>>> >> >>>> >> port >>>> >> >>>> >> 4040), If you click on the stage id header you can sort the >>>> >> >>>> >> stages >>>> >> >>>> >> based >>>> >> >>>> >> on >>>> >> >>>> >> IDs. >>>> >> >>>> >> >>>> >> >>>> >> Thanks >>>> >> >>>> >> Best Regards >>>> >> >>>> >> >>>> >> >>>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes < >>>> 91m...@gmail.com> >>>> >> >>>> >> wrote: >>>> >> >>>> >>> >>>> >> >>>> >>> Hi folks, >>>> >> >>>> >>> >>>> >> >>>> >>> When I look at the output logs for an iterative Spark >>>> program, I >>>> >> >>>> >>> see >>>> >> >>>> >>> that the stage IDs are not arithmetically numbered---that >>>> is, >>>> >> there >>>> >> >>>> >>> are gaps between stages and I might find log information >>>> about >>>> >> >>>> >>> Stage >>>> >> >>>> >>> 0, 1,2, 5, but not 3 or 4. >>>> >> >>>> >>> >>>> >> >>>> >>> As an example, the output from the Spark logs below shows >>>> what I >>>> >> >>>> >>> mean: >>>> >> >>>> >>> >>>> >> >>>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr | grep >>>> finished >>>> >> >>>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at >>>> >> >>>> >>> blockMap.scala:1444) >>>> >> >>>> >>> finished in 7.820 s: >>>> >> >>>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) >>>> >> >>>> >>> finished >>>> >> >>>> >>> in 3.874 s: >>>> >> >>>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at >>>> blockMap.scala:1179) >>>> >> >>>> >>> finished in 2.237 s: >>>> >> >>>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) >>>> >> >>>> >>> finished >>>> >> >>>> >>> in 1.749 s: >>>> >> >>>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at >>>> blockMap.scala:1180) >>>> >> >>>> >>> finished in 1.082 s: >>>> >> >>>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) >>>> >> >>>> >>> finished >>>> >> >>>> >>> in 2.078 s: >>>> >> >>>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at >>>> blockMap.scala:1188) >>>> >> >>>> >>> finished in 1.317 s: >>>> >> >>>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at >>>> blockMap.scala:1817) >>>> >> >>>> >>> finished >>>> >> >>>> >>> in 1.638 s: >>>> >> >>>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at >>>> blockMap.scala:1189) >>>> >> >>>> >>> finished in 0.732 s: >>>> >> >>>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at >>>> >> >>>> >>> blockMap.scala:1302) >>>> >> >>>> >>> finished in 0.192 s: >>>> >> >>>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at >>>> >> >>>> >>> blockMap.scala:1302) >>>> >> >>>> >>> finished in 0.170 s: >>>> >> >>>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at >>>> blockMap.scala:1201) >>>> >> >>>> >>> finished in 0.270 s: >>>> >> >>>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at >>>> blockMap.scala:1355) >>>> >> >>>> >>> finished >>>> >> >>>> >>> in 0.455 s: >>>> >> >>>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at >>>> blockMap.scala:274) >>>> >> >>>> >>> finished in 0.928 s: >>>> >> >>>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at >>>> blockMap.scala:1355) >>>> >> >>>> >>> finished >>>> >> >>>> >>> in 0.305 s: >>>> >> >>>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at >>>> blockMap.scala:275) >>>> >> >>>> >>> finished in 0.391 s: >>>> >> >>>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at >>>> >> >>>> >>> MatrixFactorizationModel.scala:60) finished in 0.028 s: >>>> >> >>>> >>> 
30506:INFO:DAGScheduler:Stage 36 (first at >>>> >> >>>> >>> MatrixFactorizationModel.scala:60) finished in 0.023 s: >>>> >> >>>> >>> >>>> >> >>>> >>> Can anyone comment on this being normal behavior? Is it >>>> >> >>>> >>> indicative >>>> >> >>>> >>> of >>>> >> >>>> >>> faults causing stages to be resubmitted? I also cannot find >>>> the >>>> >> >>>> >>> missing stages in any stage's parent List(Stage x, Stage y, >>>> ...) >>>> >> >>>> >>> >>>> >> >>>> >>> Thanks, >>>> >> >>>> >>> Mike >>>> >> >>>> >>> >>>> >> >>>> >>> >>>> >> >>>> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote: >>>> >> >>>> >>> > Thanks, René. I actually added a warning to the new JDBC >>>> >> >>>> reader/writer >>>> >> >>>> >>> > interface for 1.4.0. >>>> >> >>>> >>> > >>>> >> >>>> >>> > Even with that, I think we should support throttling JDBC; >>>> >> >>>> >>> > otherwise >>>> >> >>>> >>> > it's >>>> >> >>>> >>> > too convenient for our users to DOS their production >>>> database >>>> >> >>>> servers! >>>> >> >>>> >>> > >>>> >> >>>> >>> > >>>> >> >>>> >>> > /** >>>> >> >>>> >>> > * Construct a [[DataFrame]] representing the database >>>> table >>>> >> >>>> >>> > accessible >>>> >> >>>> >>> > via JDBC URL >>>> >> >>>> >>> > * url named table. Partitions of the table will be >>>> >> >>>> >>> > retrieved >>>> >> >>>> >>> > in >>>> >> >>>> >>> > parallel >>>> >> >>>> >>> > based on the parameters >>>> >> >>>> >>> > * passed to this function. >>>> >> >>>> >>> > * >>>> >> >>>> >>> > * * Don't create too many partitions in parallel on a >>>> large >>>> >> >>>> cluster; >>>> >> >>>> >>> > otherwise Spark might crash* >>>> >> >>>> >>> > * * your external database systems.* >>>> >> >>>> >>> > * >>>> >> >>>> >>> > * @param url JDBC database url of the form >>>> >> >>>> >>> > `jdbc:subprotocol:subname` >>>> >> >>>> >>> > * @param table Name of the table in the external >>>> database. >>>> >> >>>> >>> > * @param columnName the name of a column of integral >>>> type >>>> >> that >>>> >> >>>> will >>>> >> >>>> >>> > be >>>> >> >>>> >>> > used for partitioning. >>>> >> >>>> >>> > * @param lowerBound the minimum value of `columnName` >>>> used >>>> >> >>>> >>> > to >>>> >> >>>> >>> > decide >>>> >> >>>> >>> > partition stride >>>> >> >>>> >>> > * @param upperBound the maximum value of `columnName` >>>> used >>>> >> >>>> >>> > to >>>> >> >>>> >>> > decide >>>> >> >>>> >>> > partition stride >>>> >> >>>> >>> > * @param numPartitions the number of partitions. the >>>> range >>>> >> >>>> >>> > `minValue`-`maxValue` will be split >>>> >> >>>> >>> > * evenly into this many partitions >>>> >> >>>> >>> > * @param connectionProperties JDBC database connection >>>> >> >>>> >>> > arguments, >>>> >> >>>> a >>>> >> >>>> >>> > list >>>> >> >>>> >>> > of arbitrary string >>>> >> >>>> >>> > * tag/value. Normally at >>>> least >>>> >> >>>> >>> > a >>>> >> >>>> "user" >>>> >> >>>> >>> > and >>>> >> >>>> >>> > "password" property >>>> >> >>>> >>> > * should be included. >>>> >> >>>> >>> > * >>>> >> >>>> >>> > * @since 1.4.0 >>>> >> >>>> >>> > */ >>>> >> >>>> >>> > >>>> >> >>>> >>> > >>>> >> >>>> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer < >>>> >> rtref...@gmail.com> >>>> >> >>>> >>> > wrote: >>>> >> >>>> >>> > >>>> >> >>>> >>> >> Hi, >>>> >> >>>> >>> >> >>>> >> >>>> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ => >>>> >> >>>> >>> >> 1).aggregate(0)(_+_,_+_) on an interactive shell (where >>>> >> >>>> >>> >> "where" >>>> >> >>>> >>> >> is >>>> >> >>>> an >>>> >> >>>> >>> >> Array[String] of 32 to 48 elements). 
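For reference, a hedged usage sketch of the partitioned read described in the scaladoc above (the DataFrameReader.jdbc variant with a partition column, assuming the sqlContext available in the 1.4 shell; the URL, table, column, bounds, and credentials are all made up). Keeping numPartitions modest is the practical answer to the "don't overload your database" warning, since each partition becomes one concurrent query:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "spark")        // placeholder credentials
props.setProperty("password", "secret")

// Rows are split into 8 ranges over the integral column "id"; each range is read
// by one partition, i.e. one concurrent JDBC query against the database.
val df = sqlContext.read.jdbc(
  "jdbc:postgresql://db-host:5432/mydb",  // illustrative URL
  "transactions",                         // illustrative table
  "id",                                   // integral partitioning column
  0L,                                     // lowerBound of "id"
  1000000L,                               // upperBound of "id"
  8,                                      // numPartitions: keep modest to avoid overloading the DB
  props)

println(df.count())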
(The code is >>>> tailored >>>> >> >>>> >>> >> to >>>> >> >>>> >>> >> your >>>> >> >>>> >>> >> db, >>>> >> >>>> >>> >> specifically through the where conditions, I'd have >>>> otherwise >>>> >> >>>> >>> >> post >>>> >> >>>> >>> >> it) >>>> >> >>>> >>> >> That should be the DataFrame API, but I'm just trying to >>>> load >>>> >> >>>> >>> >> everything >>>> >> >>>> >>> >> and discard it as soon as possible :-) >>>> >> >>>> >>> >> >>>> >> >>>> >>> >> (1) Never do a silent drop of the values by default: it >>>> kills >>>> >> >>>> >>> >> confidence. >>>> >> >>>> >>> >> An option sounds reasonable. Some sort of insight / log >>>> >> >>>> >>> >> would >>>> >> >>>> >>> >> be >>>> >> >>>> >>> >> great. >>>> >> >>>> >>> >> (How many columns of what type were truncated? why?) >>>> >> >>>> >>> >> Note that I could declare the field as string via >>>> >> >>>> >>> >> JdbcDialects >>>> >> >>>> (thank >>>> >> >>>> >>> >> you >>>> >> >>>> >>> >> guys for merging that :-) ). >>>> >> >>>> >>> >> I have quite bad experiences with silent drops / >>>> truncates of >>>> >> >>>> columns >>>> >> >>>> >>> >> and >>>> >> >>>> >>> >> thus _like_ the strict way of spark. It causes trouble >>>> but >>>> >> >>>> >>> >> noticing >>>> >> >>>> >>> >> later >>>> >> >>>> >>> >> that your data was corrupted during conversion is even >>>> worse. >>>> >> >>>> >>> >> >>>> >> >>>> >>> >> (2) SPARK-8004 >>>> >> https://issues.apache.org/jira/browse/SPARK-8004 >>>> >> >>>> >>> >> >>>> >> >>>> >>> >> (3) One option would be to make it safe to use, the other >>>> >> option >>>> >> >>>> >>> >> would >>>> >> >>>> >>> >> be >>>> >> >>>> >>> >> to document the behavior (s.th. like "WARNING: this >>>> method >>>> >> tries >>>> >> >>>> >>> >> to >>>> >> >>>> >>> >> load >>>> >> >>>> >>> >> as many partitions as possible, make sure your database >>>> can >>>> >> >>>> >>> >> handle >>>> >> >>>> >>> >> the >>>> >> >>>> >>> >> load >>>> >> >>>> >>> >> or load them in chunks and use union"). SPARK-8008 >>>> >> >>>> >>> >> https://issues.apache.org/jira/browse/SPARK-8008 >>>> >> >>>> >>> >> >>>> >> >>>> >>> >> Regards, >>>> >> >>>> >>> >> Rene Treffer >>>> >> >>>> >>> >> >>>> >> >>>> >>> > >>>> >> >>>> >>> >>>> >> >>>> >>> >>>> >> >>>> >>> -- >>>> >> >>>> >>> Thanks, >>>> >> >>>> >>> Mike >>>> >> >>>> >>> >>>> >> >>>> >>> >>>> >> --------------------------------------------------------------------- >>>> >> >>>> >>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org >>>> >> >>>> >>> For additional commands, e-mail: dev-h...@spark.apache.org >>>> >> >>>> >>> >>>> >> >>>> >> >>>> >> >>>> > >>>> >> >>>> >>>> >> >>>> >>>> >> >>>> -- >>>> >> >>>> Thanks, >>>> >> >>>> Mike >>>> >> >>>> >>>> >> >>>> >>>> >> >>>> >>>> --------------------------------------------------------------------- >>>> >> >>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org >>>> >> >>>> For additional commands, e-mail: dev-h...@spark.apache.org >>>> >> >>>> >>>> >> >>> >>>> >> >> >>>> >> >> >>>> >> >> -- >>>> >> >> Thanks, >>>> >> >> Mike >>>> >> >> >>>> >> > >>>> >> > >>>> >> > -- >>>> >> > Thanks, >>>> >> > Mike >>>> >> > >>>> >> >>>> >> >>>> >> -- >>>> >> Thanks, >>>> >> Mike >>>> >> >>>> > >>>> >>>> >>>> -- >>>> Thanks, >>>> Mike >>>> >>> >>> >> >