Kay, Excellent write-up. This should be preserved for reference somewhere searchable.
-Gerard. On Fri, Jun 12, 2015 at 1:19 AM, Kay Ousterhout <k...@eecs.berkeley.edu> wrote: > Here’s how the shuffle works. This explains what happens for a single > task; this will happen in parallel for each task running on the machine, > and as Imran said, Spark runs up to “numCores” tasks concurrently on each > machine. There's also an answer to the original question about why CPU use > is low at the very bottom. > > The key data structure used in fetching shuffle data is the “results” > queue in ShuffleBlockFetcherIterator, which buffers data that we have in > serialized (and maybe compressed) form, but haven’t yet deserialized / > processed. The results queue is filled by many threads fetching data over > the network (the number of concurrent threads fetching data is equal to the > number of remote executors we’re currently fetching data from) [0], and is > consumed by a single thread that deserializes the data and computes some > function over it (e.g., if you’re doing rdd.count(), the thread > deserializes the data and counts the number of items). As we fetch data > over the network, we track bytesInFlight, which is data that has been > requested (and possibly received) from a remote executor, but that hasn’t > yet been deserialized / processed by the consumer thread. So, this > includes all of the data in the “results” queue, and possibly more data > that’s currently outstanding over the network. We always issue as many > requests as we can, with the constraint that bytesInFlight remains less > than a specified maximum [1]. > > In a little more detail, here’s exactly what happens when a task begins > reading shuffled data: > > (1) Issue requests [1.5] to fetch up to maxBytesInFlight bytes of data [1] > over the network (this happens here > <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L260> > ). > > These requests are all executed asynchronously using a ShuffleClient [2] > via the shuffleClient.fetchBlocks call > <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L149> > [3]. We pass in a callback that, once a block has been successfully > fetched, sticks it on the “results” queue. > > (2) Begin processing the local data. One by one, we request the local > data from the local block manager (which memory maps the file) and then stick > the result onto the results queue > <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L230>. > Because we memory map the files, which is speedy, the local data typically > all ends up on the results queue in front of the remote data. > > (3) Once the async network requests have been issued (note — issued, but > not finished!) and we’ve “read” (memory-mapped) the local data (i.e., (1) > and (2) have happened), ShuffleBlockFetcherIterator returns an iterator > that gets wrapped too many times to count [4] and eventually gets unrolled > [5]. Each time next() is called on the iterator, it blocks waiting for an > item from the results queue. This may return right away, or if the queue > is empty, will block waiting on new data from the network [6]. 
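(A minimal Scala sketch of the producer/consumer pattern Kay describes above: asynchronous fetch callbacks put serialized blocks on a results queue, a single consumer thread drains it, and a bytesInFlight counter caps how much data is outstanding. All names here are illustrative stand-ins, not Spark's actual classes.)

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical stand-in for a fetched, still-serialized shuffle block.
case class FetchResult(blockId: String, size: Long, bytes: Array[Byte])

class ResultsQueueSketch(maxBytesInFlight: Long) {
  private val results = new LinkedBlockingQueue[FetchResult]()
  private val pending = mutable.Queue[(String, Long)]()   // (blockId, size) not yet requested
  private var bytesInFlight = 0L

  def enqueueBlock(blockId: String, size: Long): Unit =
    synchronized { pending.enqueue((blockId, size)) }

  // Producer side: issue as many async requests as fit under maxBytesInFlight.
  // The callback run by the "network" thread just sticks the block on the queue.
  def issueRequests(fetchAsync: (String, FetchResult => Unit) => Unit): Unit = synchronized {
    while (pending.nonEmpty && bytesInFlight + pending.front._2 <= maxBytesInFlight) {
      val (blockId, size) = pending.dequeue()
      bytesInFlight += size
      fetchAsync(blockId, result => results.put(result))
    }
  }

  // Consumer side: block until a result is available (this wait is what the UI
  // reports as fetch wait time), update the accounting, and issue more requests
  // before handing the block back to the caller for deserialization.
  def next(fetchAsync: (String, FetchResult => Unit) => Unit): FetchResult = {
    val result = results.take()
    synchronized { bytesInFlight -= result.size }
    issueRequests(fetchAsync)
    result
  }
}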
Before > returning from next(), we update our accounting for the bytes in flight: > the chunk of data we return is no longer considered in-flight, because it’s > about to be processed, so we update the current bytesInFlight, and if it > won’t result in more than maxBytesInFlight outstanding, send some more requests for > data. > > ———————————————— > > Notes: > > [0] Note that these threads consume almost no CPU resources, because they > just receive data from the OS and then execute a callback that sticks the > data on the results queue. > > [1] We limit the data outstanding on the network to avoid using too much > memory to hold the data we’ve fetched over the network but haven’t yet > processed. > > [1.5] Each request may include multiple shuffle blocks, where a "block" > is the data output for this reduce task by a particular map task. All of > the reduce tasks for a shuffle read a total of # map tasks * # reduce tasks > shuffle blocks; each reduce task reads # map tasks blocks. We do some > hacks > <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L177> > to try to size these requests in a "good" way: we limit each request to > about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines > concurrently without exceeding maxBytesInFlight. 5 is a completely magic > number here that was probably guessed by someone long long ago, and it > seems to work ok. > > [2] The default configuration uses NettyBlockTransferService as the > ShuffleClient implementation (note that this extends BlockTransferService, > which extends ShuffleClient). > > [3] If you’re curious how the shuffle client fetches data, the default > Spark configuration results in exactly one TCP connection from an executor > to each other executor. If executor A is getting shuffle data from > executor B, we start by sending an OpenBlocks message from A to B. The > OpenBlocks message includes the list of blocks that A wants to fetch, and > causes the remote executor, B, to start to pull the corresponding data into > memory from disk (we typically memory map the files, so this may not > actually result in the data being read yet), and also to store some state > associated with this “stream” of data. The remote executor, B, responds > with a stream ID that helps it to identify the connection. Next, A > requests blocks one at a time from B using a ChunkFetchRequest message > (this happens here > <https://github.com/apache/spark/blob/95690a17d328f205c3398b9b477b4072b6fe908f/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java#L101> > in > OneForOneBlockFetcher, which calls this code > <https://github.com/apache/spark/blob/master/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java#L97> > in TransportClient; currently, we have a one-to-one mapping from a chunk to > a particular block). It’s possible that there are many sets of shuffle > data being fetched concurrently between A and B (e.g., because many tasks > are run concurrently). These requests are serialized, so one block is sent > at a time from B, and they’re sent in the order that the requests were > issued on A. > > [4] In BlockStoreShuffleFetcher, which handles failures; then in > HashShuffleReader, which helps aggregate some of the data; etc. 
> > [5] This happens in BlockManager.putIterator, if the RDD is going to be > cached; in the function passed in to ResultTask, if this is the last stage > in a job; or via the writer.write() call in ShuffleMapTask, if this is a > stage that generates intermediate shuffle data. > > [6] We time how long we spend blocking on data from the network; this is > what’s shown as “fetch wait time” in Spark’s UI. > > —————————————————— > > To answer the original question about why CPU use is low, this means that > the main thread (that pulls data off of the results queue) is blocking on > I/O. It could be blocking to receive data over the network (which will be > included in the fetch wait time shown in the UI, described above), or it > could be blocking to read data from the local disk (because the shuffle > data that is read locally is memory-mapped, so the thread may block on disk > if the data hasn’t been read into memory yet). It could also be blocking > because it’s writing new shuffle output to disk (also shown in the UI, as > shuffle write time), or because it’s spilling intermediate data. > Everything except the spilling is shown in the handy-dandy new > visualization that Kousuke recently added > <https://github.com/apache/spark/commit/a5f7b3b9c7f05598a1cc8e582e5facee1029cd5e> to > the UI (available in 1.4 onwards): if you look at the stage view, and click > on “Event Timeline”, you’ll see a visualization of the tasks and how long > they spent blocked on various things (we should add the spilling to > this…there is a long-outstanding JIRA for this). > > Hope this helps! > > -Kay > > On Thu, Jun 11, 2015 at 7:42 AM, Imran Rashid <iras...@cloudera.com> > wrote: > >> That is not exactly correct -- that being said I'm not 100% on these >> details either so I'd appreciate you double checking and / or another dev >> confirming my description. >> >> >> Spark actually has more threads going than the "numCores" you specify. >> "numCores" is really used for how many threads are actively executing >> tasks. There are more threads for doing the fetching (the details of which >> I'm not that familiar with) -- that never cuts into the number of actively >> executing tasks for "numCores". There isn't a 1-to-1 correspondence >> between one shuffle block and one task -- a shuffle-read task most likely >> needs to fetch many shuffle blocks, with some local and some remote (in >> most cases). So, from your lingo above, c is numCores, but c_1 is just an >> independent pool of threads. >> >> The obvious follow-up question is: if you actually have more than >> "numCores" threads going at the same time, how come cpu usage is low? You >> could still have your cpus stuck waiting on i/o, from disk or network, so >> they aren't getting fully utilized. And the cpu can also be idle waiting >> for memory, if there are a lot of cache misses (I'm not sure how that will >> show up in cpu monitoring). If that were the case, that could even be from >> too many threads, as a lot of time is spent context switching ... but I'm >> just guessing now. >> >> hope this helps, >> Imran >> >> >> >> On Wed, Jun 10, 2015 at 1:41 PM, Mike Hynes <91m...@gmail.com> wrote: >> >>> Hi Imran, >>> >>> Thank you again for your email. >>> >>> I just want to ask one further question to clarify the implementation >>> of the shuffle block fetches. 
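(Back to Kay's note [1.5] for a moment: a rough Scala sketch of the request-sizing heuristic described there. Blocks destined for one remote executor are greedily packed into requests of at most maxBytesInFlight / 5 bytes, so roughly five requests can be outstanding at once without blowing the cap. This is a simplified illustration with made-up types, not the code behind the "hacks" link in the note.)

import scala.collection.mutable

// Hypothetical types: a shuffle block has an id and a known size in bytes.
case class Block(id: String, size: Long)
case class FetchRequest(executor: String, blocks: Seq[Block]) {
  def totalSize: Long = blocks.map(_.size).sum
}

// Pack one executor's blocks into requests of roughly maxBytesInFlight / 5 bytes,
// so ~5 requests (to ~5 different machines) can be in flight under the cap.
def packRequests(executor: String, blocks: Seq[Block], maxBytesInFlight: Long): Seq[FetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = mutable.ArrayBuffer[FetchRequest]()
  var current = mutable.ArrayBuffer[Block]()
  var currentSize = 0L
  for (block <- blocks) {
    current += block
    currentSize += block.size
    if (currentSize >= targetRequestSize) {   // close out this request, start a new one
      requests += FetchRequest(executor, current.toSeq)
      current = mutable.ArrayBuffer[Block]()
      currentSize = 0L
    }
  }
  if (current.nonEmpty) requests += FetchRequest(executor, current.toSeq)
  requests.toSeq
}

The cap itself is the setting Imran mentions further down as spark.reducer.maxSizeInFlight, so raising it both allows more data in flight and makes the individual fetch requests proportionally larger.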
When you say that rather than sitting >>> idle, [the executor] will immediately start reading the local block, I >>> would guess that, in implementation, the executor is going to launch >>> concurrent threads to read both local and remote blocks, which it >>> seems to do in the initialize() method of >>> core/.../storage/ShuffleBlockFetcherIterator.scala. Is that the case >>> or would the Executor run all local fetch threads first? >>> >>> The reason I ask is that if the slave machine on which the Executor is >>> running has some number of cores, c, then I would have thought that >>> some of the threads launched would occupy some number, c_1, of the >>> cores and conduct the local reads (where c_1 <= c). The other threads >>> would occupy the other (c - c_1) cores' cycles until *all* necessary >>> blocks have been read, with c_1 depending on c and the number of blocks to >>> fetch, so that none of the cores are idle if there are many blocks to >>> fetch. (I monitor the CPU utilization of our nodes throughout a job, >>> and generally find them under-utilized statistically speaking; that >>> is, their usage over the whole job is lower than expected, with short >>> bursts of high usage, so I ask this question in a specific way for this >>> reason, since I can see trends in the probability density functions of >>> CPU utilization as the #partitions of our RDDs are increased). >>> >>> ShuffleBlockFetcherIterator.scala: >>> >>> private[this] def initialize(): Unit = { >>> ... >>> // Send out initial requests for blocks, up to our maxBytesInFlight >>> while (fetchRequests.nonEmpty && >>> (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size >>> <= maxBytesInFlight)) { >>> sendRequest(fetchRequests.dequeue()) >>> } >>> val numFetches = remoteRequests.size - fetchRequests.size >>> logInfo("Started " + numFetches + " remote fetches in" + >>> Utils.getUsedTimeMs(startTime)) >>> >>> // Get Local Blocks >>> fetchLocalBlocks() >>> logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime)) >>> } >>> private[this] def fetchLocalBlocks() { >>> val iter = localBlocks.iterator >>> while (iter.hasNext) { >>> val blockId = iter.next() >>> try { >>> val buf = blockManager.getBlockData(blockId) >>> shuffleMetrics.incLocalBlocksFetched(1) >>> shuffleMetrics.incLocalBytesRead(buf.size) >>> buf.retain() >>> results.put(new SuccessFetchResult(blockId, 0, buf)) >>> } catch { >>> ... >>> } >>> } >>> } >>> >>> Obviously, I will have to sit down with core/.../network/nio/* and >>> core/.../shuffle/* and do my own homework on this, but from what I can >>> tell, the BlockDataManager relies on either >>> NioBlockTransferService.scala or the NettyBlockTransferService.scala >>> (which are set in SparkEnv.scala), both of which do the grunt work of >>> actually buffering and transferring the blocks' bytes. Finally, the >>> tasks in the new stage for which the shuffle outputs have been fetched >>> will not commence until all of the block fetching threads (both local >>> and remote) have terminated. >>> >>> Does the above paint an accurate picture? I would really appreciate >>> clarification on the concurrency, since I would like to determine why >>> our jobs have under-utilization and poor weak scaling efficiency. >>> >>> I will cc this thread over to the dev list. I did not cc them in case >>> my previous question was trivial---I didn't want to spam the list >>> unnecessarily, since I do not see these kinds of questions posed there >>> frequently. 
>>> >>> Thanks a bunch, >>> Mike >>> >>> >>> On 6/10/15, Imran Rashid <iras...@cloudera.com> wrote: >>> > Hi Mike, >>> > >>> > no, this is a good question, I can see how my response could be >>> interpreted >>> > both ways. >>> > >>> > To be more precise: >>> > *nothing* is fetched until the shuffle-read stage starts. So it is >>> normal >>> > to see a spike in cluster bandwidth when that stage starts. There is a >>> > hard-boundary between stages -- that is, spark never starts any tasks >>> in >>> > one stage until *all* tasks in the dependent stages have been >>> completed. >>> > (There has been on-and-off discussion about relaxing this, but IMO >>> this is >>> > unlikely to change in the near future.) So spark will wait for all of >>> the >>> > tasks in the previous shuffle-write stage to finish, and then kick off >>> a >>> > bunch of shuffle-read tasks in the next stage, leading to the spike you >>> > see. >>> > >>> > I was referring to the way blocks are fetched within one of those >>> > shuffle-read tasks. One of those tasks will probably need >>> a >>> > bunch of different blocks, from many executors. But some of the >>> blocks it >>> > needs will probably exist locally. So the task first sends out a >>> request >>> > to fetch blocks remotely (leading to the spike), but rather than >>> sitting >>> > idle, it will immediately start reading the local blocks. Ideally, by >>> the >>> > time it's done reading the local blocks, some of the remote blocks have >>> > already been fetched, so no time is spent *waiting* for the remote >>> reads. >>> > As the remote blocks get read, spark sends out more requests, trying to >>> > balance how much data needs to be buffered vs. preventing any waiting >>> on >>> > remote reads (which can be controlled by >>> spark.reducer.maxSizeInFlight). >>> > >>> > Hope that clarifies things! >>> > >>> > btw, you sent this last question to just me -- I think it's a good >>> question, >>> > do you mind sending it to the list? I figured that was accidental but >>> > wanted to check. >>> > >>> > Imran >>> > >>> > On Wed, Jun 10, 2015 at 12:20 AM, Mike Hynes <91m...@gmail.com> wrote: >>> > >>> >> Hi Imran, >>> >> One additional quick question---I just want to confirm that I fully >>> >> understand your comment that "blocks are fetched before they are >>> >> needed." Typically on our system, we see spikes in cluster bandwidth >>> >> (with ganglia) at stage boundaries, so I previously assumed that all >>> >> shuffle read occurred there. Do you mean that the blocks are fetched >>> >> by the shuffle read iterator, and hence when tasks occur afterwards >>> >> the necessary blocks have already been fetched? >>> >> Thanks---I am sorry if this is an obvious question, but I'd like to >>> >> understand this as precisely as possible. >>> >> Mike >>> >> >>> >> On 6/10/15, Mike Hynes <91m...@gmail.com> wrote: >>> >> > Ahhh---forgive my typo: what I mean is, >>> >> > (t2 - t1) >= (t_ser + t_deser + t_exec) >>> >> > is satisfied, empirically. >>> >> > >>> >> > On 6/10/15, Mike Hynes <91m...@gmail.com> wrote: >>> >> >> Hi Imran, >>> >> >> >>> >> >> Thank you for your email. >>> >> >> >>> >> >> In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I >>> >> >> have found it to be true, although I have not included the >>> >> >> t_{wait_for_read} in this, since it has---so far as I can >>> tell---been >>> >> >> either zero or negligible compared to the task time. 
>>> >> >> >>> >> >> Thanks, >>> >> >> Mike >>> >> >> >>> >> >> On 6/8/15, Imran Rashid <iras...@cloudera.com> wrote: >>> >> >>> Hi Mike, >>> >> >>> >>> >> >>> all good questions, let me take a stab at answering them: >>> >> >>> >>> >> >>> 1. Event Logs + Stages: >>> >> >>> >>> >> >>> It's normal for stages to get skipped if they are shuffle map >>> stages, >>> >> >>> which >>> >> >>> get read multiple times. Eg., here's a little example program I >>> >> >>> wrote >>> >> >>> earlier to demonstrate this: "d3" doesn't need to be re-shuffled >>> >> >>> since >>> >> >>> each >>> >> >>> time it's read w/ the same partitioner. So skipping stages in this >>> >> >>> way >>> >> >>> is >>> >> >>> a >>> >> >>> good thing: >>> >> >>> >>> >> >>> val partitioner = new org.apache.spark.HashPartitioner(10) >>> >> >>> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> >>> >> >>> x}.partitionBy(partitioner) >>> >> >>> (0 until 5).foreach { idx => >>> >> >>> val otherData = sc.parallelize(1 to (idx * 100)).map{ x => (x % >>> 10) >>> >> -> >>> >> >>> x}.partitionBy(partitioner) >>> >> >>> println(idx + " ---> " + otherData.join(d3).count()) >>> >> >>> } >>> >> >>> >>> >> >>> If you run this and look in the UI, you'd see that all jobs >>> except >>> >> for >>> >> >>> the first one have one stage that is skipped. You will also see >>> this >>> >> in >>> >> >>> the log: >>> >> >>> >>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: >>> >> >>> List(Stage >>> >> >>> 12, >>> >> >>> Stage 13) >>> >> >>> >>> >> >>> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage >>> 13) >>> >> >>> >>> >> >>> Admittedly that is not very clear, but that is sort of indicating >>> to >>> >> you >>> >> >>> that the DAGScheduler first created stage 12 as a necessary step, >>> and >>> >> >>> then >>> >> >>> later on changed its mind by realizing that everything it needed >>> for >>> >> >>> stage >>> >> >>> 12 already existed, so there was nothing to do. >>> >> >>> >>> >> >>> >>> >> >>> 2. Extracting Event Log Information >>> >> >>> >>> >> >>> maybe you are interested in SparkListener? Though unfortunately, >>> I >>> >> >>> don't >>> >> >>> know of a good blog post describing it, hopefully the docs are >>> clear >>> >> ... >>> >> >>> >>> >> >>> 3. Time Metrics in Spark Event Log >>> >> >>> >>> >> >>> This is a great question. I *think* the only exception is that >>> t_gc >>> >> >>> is >>> >> >>> really overlapped with t_exec. So I think you should really >>> expect >>> >> >>> >>> >> >>> (t2 - t1) < (t_ser + t_deser + t_exec) >>> >> >>> >>> >> >>> I am not 100% sure about this, though. I'd be curious if that >>> >> >>> constraint was ever violated. >>> >> >>> >>> >> >>> >>> >> >>> As for your question on shuffle read vs. shuffle write time -- I >>> >> >>> wouldn't >>> >> >>> necessarily expect the same stage to have times for both shuffle >>> read >>> >> >>> & >>> >> >>> shuffle write -- in the simplest case, you'll have shuffle write >>> >> >>> times >>> >> >>> in >>> >> >>> one, and shuffle read times in the next one. But even taking that >>> >> >>> into >>> >> >>> account, there is a difference in the way they work & are >>> measured. >>> >> >>> shuffle read operations are pipelined and the way we measure >>> shuffle >>> >> >>> read, >>> >> >>> it's just how much time is spent *waiting* for network transfer. >>> It >>> >> >>> could >>> >> >>> be that there is no (measurable) wait time b/c the next blocks are >>> >> >>> fetched >>> >> >>> before they are needed. 
Shuffle writes occur in the normal task >>> >> >>> execution >>> >> >>> thread, though, so we (try to) measure all of it. >>> >> >>> >>> >> >>> >>> >> >>> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com> >>> wrote: >>> >> >>> >>> >> >>>> Hi Patrick and Akhil, >>> >> >>>> >>> >> >>>> Thank you both for your responses. This is a bit of an extended >>> >> >>>> email, >>> >> >>>> but I'd like to: >>> >> >>>> 1. Answer your (Patrick) note about the "missing" stages since >>> the >>> >> >>>> IDs >>> >> >>>> do (briefly) appear in the event logs >>> >> >>>> 2. Ask for advice/experience with extracting information from the >>> >> >>>> event logs in a columnar, delimiter-separated format. >>> >> >>>> 3. Ask about the time metrics reported in the event logs; >>> currently, >>> >> >>>> the elapsed time for a task does not equal the sum of the times >>> for >>> >> >>>> its components >>> >> >>>> >>> >> >>>> 1. Event Logs + Stages: >>> >> >>>> ========================= >>> >> >>>> >>> >> >>>> As I said before, In the spark logs (the log4j configurable ones >>> >> >>>> from >>> >> >>>> the driver), I only see references to some stages, where the >>> stage >>> >> >>>> IDs >>> >> >>>> are not arithmetically increasing. In the event logs, however, I >>> >> >>>> will >>> >> >>>> see reference to *every* stage, although not all stages will have >>> >> >>>> tasks associated with them. >>> >> >>>> >>> >> >>>> For instance, to examine the actual stages that have tasks, you >>> can >>> >> >>>> see missing stages: >>> >> >>>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \ >>> >> >>>> # | grep -Eo '"Stage ID":[[:digit:]]+' \ >>> >> >>>> # | sort -n|uniq | head -n 5 >>> >> >>>> "Stage ID":0 >>> >> >>>> "Stage ID":1 >>> >> >>>> "Stage ID":10 >>> >> >>>> "Stage ID":11 >>> >> >>>> "Stage ID":110 >>> >> >>>> >>> >> >>>> However, these "missing" stages *do* appear in the event logs as >>> >> >>>> Stage >>> >> >>>> IDs in the jobs submitted, i.e: for >>> >> >>>> # grep -E '"Event":"SparkListenerJobStart"' app.log | grep -Eo >>> >> >>>> 'Stage >>> >> >>>> IDs":\[.*\]' | head -n 5 >>> >> >>>> Stage IDs":[0,1,2] >>> >> >>>> Stage IDs":[5,3,4] >>> >> >>>> Stage IDs":[6,7,8] >>> >> >>>> Stage IDs":[9,10,11] >>> >> >>>> Stage IDs":[12,13,14] >>> >> >>>> >>> >> >>>> I do not know if this amounts to a bug, since I am not familiar >>> with >>> >> >>>> the scheduler in detail. The stages have seemingly been created >>> >> >>>> somewhere in the DAG, but then have no associated tasks and never >>> >> >>>> appear again. >>> >> >>>> >>> >> >>>> 2. Extracting Event Log Information >>> >> >>>> ==================================== >>> >> >>>> Currently we are running scalability tests, and are finding very >>> >> >>>> poor >>> >> >>>> scalability for certain block matrix algorithms. I would like to >>> >> >>>> have >>> >> >>>> finer detail about the communication time and bandwidth when >>> data is >>> >> >>>> transferred between nodes. >>> >> >>>> >>> >> >>>> I would really just like to have a file with nothing but task >>> info >>> >> >>>> in >>> >> >>>> a format such as: >>> >> >>>> timestamp (ms), task ID, hostname, execution time (ms), GC time >>> >> >>>> (ms), >>> >> >>>> ... >>> >> >>>> 0010294, 1, slave-1, 503, 34, ... >>> >> >>>> 0010392, 2, slave-2, 543, 32, ... >>> >> >>>> and similarly for jobs/stages/rdd_memory/shuffle output/etc. 
>>> >> >>>> >>> >> >>>> I have extracted the relevant time fields from the spark event >>> logs >>> >> >>>> with a sed script, but I wonder if there is an even more >>> expedient >>> >> >>>> way. Unfortunately, I do not immediately see how to do this using >>> >> >>>> the >>> >> >>>> $SPARK_HOME/conf/metrics.properties file and haven't come across >>> a >>> >> >>>> blog/etc that describes this. Could anyone please comment on >>> whether >>> >> >>>> or not a metrics configuration for this already exists? >>> >> >>>> >>> >> >>>> 3. Time Metrics in Spark Event Log >>> >> >>>> ================================== >>> >> >>>> I am confused about the times reported for tasks in the event >>> log. >>> >> >>>> There are launch and finish timestamps given for each task (call >>> >> >>>> them >>> >> >>>> t1 and t2, respectively), as well as GC time (t_gc), execution >>> time >>> >> >>>> (t_exec), and serialization times (t_ser, t_deser). However the >>> >> >>>> times >>> >> >>>> do not add up as I would have expected. I would imagine that the >>> >> >>>> elapsed time t2 - t1 would be slightly larger than the sum of the >>> >> >>>> component times. However, I can find many instances in the event >>> >> >>>> logs >>> >> >>>> where: >>> >> >>>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec) >>> >> >>>> The difference can be 500 ms or more, which is not negligible >>> for my >>> >> >>>> current execution times of ~5000 ms. I have attached a plot that >>> >> >>>> illustrates this. >>> >> >>>> >>> >> >>>> Regarding this, I'd like to ask: >>> >> >>>> 1. How exactly are these times being measured? >>> >> >>>> 2. Should the sum of the component times equal the elapsed >>> (clock) >>> >> >>>> time for the task? >>> >> >>>> 3. If not, which component(s) is(are) being excluded, and when do >>> >> >>>> they >>> >> >>>> occur? >>> >> >>>> 4. There are occasionally reported measurements for Shuffle Write >>> >> >>>> time, but not shuffle read time. Is there a method to determine >>> the >>> >> >>>> time required to shuffle data? Could this be done by looking at >>> delays >>> >> >>>> between the first task in a new stage and the last task in the >>> >> >>>> previous stage? >>> >> >>>> >>> >> >>>> Thank you very much for your time, >>> >> >>>> Mike >>> >> >>>> >>> >> >>>> >>> >> >>>> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote: >>> >> >>>> > Hey Mike, >>> >> >>>> > >>> >> >>>> > Stage ID's are not guaranteed to be sequential because of the >>> way >>> >> the >>> >> >>>> > DAG scheduler works (only increasing). In some cases stage ID >>> >> numbers >>> >> >>>> > are skipped when stages are generated. >>> >> >>>> > >>> >> >>>> > Any stage/ID that appears in the Spark UI is an actual stage, >>> so >>> >> >>>> > if >>> >> >>>> > you see ID's in there, but they are not in the logs, then let >>> us >>> >> know >>> >> >>>> > (that would be a bug). >>> >> >>>> > >>> >> >>>> > - Patrick >>> >> >>>> > >>> >> >>>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das >>> >> >>>> > <ak...@sigmoidanalytics.com> >>> >> >>>> > wrote: >>> >> >>>> >> Are you seeing the same behavior on the driver UI? (that is >>> running >>> >> >>>> >> on >>> >> >>>> >> port >>> >> >>>> >> 4040). If you click on the stage id header you can sort the >>> >> >>>> >> stages >>> >> >>>> >> based >>> >> >>>> >> on >>> >> >>>> >> IDs. 
>>> >> >>>> >> >>> >> >>>> >> Thanks >>> >> >>>> >> Best Regards >>> >> >>>> >> >>> >> >>>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91m...@gmail.com >>> > >>> >> >>>> >> wrote: >>> >> >>>> >>> >>> >> >>>> >>> Hi folks, >>> >> >>>> >>> >>> >> >>>> >>> When I look at the output logs for an iterative Spark >>> program, I >>> >> >>>> >>> see >>> >> >>>> >>> that the stage IDs are not arithmetically numbered---that is, >>> >> there >>> >> >>>> >>> are gaps between stages and I might find log information >>> about >>> >> >>>> >>> Stage >>> >> >>>> >>> 0, 1,2, 5, but not 3 or 4. >>> >> >>>> >>> >>> >> >>>> >>> As an example, the output from the Spark logs below shows >>> what I >>> >> >>>> >>> mean: >>> >> >>>> >>> >>> >> >>>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr | grep finished >>> >> >>>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at >>> >> >>>> >>> blockMap.scala:1444) >>> >> >>>> >>> finished in 7.820 s: >>> >> >>>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) >>> >> >>>> >>> finished >>> >> >>>> >>> in 3.874 s: >>> >> >>>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at >>> blockMap.scala:1179) >>> >> >>>> >>> finished in 2.237 s: >>> >> >>>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) >>> >> >>>> >>> finished >>> >> >>>> >>> in 1.749 s: >>> >> >>>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at >>> blockMap.scala:1180) >>> >> >>>> >>> finished in 1.082 s: >>> >> >>>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) >>> >> >>>> >>> finished >>> >> >>>> >>> in 2.078 s: >>> >> >>>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at >>> blockMap.scala:1188) >>> >> >>>> >>> finished in 1.317 s: >>> >> >>>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) >>> >> >>>> >>> finished >>> >> >>>> >>> in 1.638 s: >>> >> >>>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at >>> blockMap.scala:1189) >>> >> >>>> >>> finished in 0.732 s: >>> >> >>>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at >>> >> >>>> >>> blockMap.scala:1302) >>> >> >>>> >>> finished in 0.192 s: >>> >> >>>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at >>> >> >>>> >>> blockMap.scala:1302) >>> >> >>>> >>> finished in 0.170 s: >>> >> >>>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at >>> blockMap.scala:1201) >>> >> >>>> >>> finished in 0.270 s: >>> >> >>>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) >>> >> >>>> >>> finished >>> >> >>>> >>> in 0.455 s: >>> >> >>>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at >>> blockMap.scala:274) >>> >> >>>> >>> finished in 0.928 s: >>> >> >>>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) >>> >> >>>> >>> finished >>> >> >>>> >>> in 0.305 s: >>> >> >>>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at >>> blockMap.scala:275) >>> >> >>>> >>> finished in 0.391 s: >>> >> >>>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at >>> >> >>>> >>> MatrixFactorizationModel.scala:60) finished in 0.028 s: >>> >> >>>> >>> 30506:INFO:DAGScheduler:Stage 36 (first at >>> >> >>>> >>> MatrixFactorizationModel.scala:60) finished in 0.023 s: >>> >> >>>> >>> >>> >> >>>> >>> Can anyone comment on this being normal behavior? Is it >>> >> >>>> >>> indicative >>> >> >>>> >>> of >>> >> >>>> >>> faults causing stages to be resubmitted? I also cannot find >>> the >>> >> >>>> >>> missing stages in any stage's parent List(Stage x, Stage y, >>> ...) 
>>> >> >>>> >>> >>> >> >>>> >>> Thanks, >>> >> >>>> >>> Mike >>> >> >>>> >>> >>> >> >>>> >>> >>> >> >>>> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote: >>> >> >>>> >>> > Thanks, René. I actually added a warning to the new JDBC >>> >> >>>> reader/writer >>> >> >>>> >>> > interface for 1.4.0. >>> >> >>>> >>> > >>> >> >>>> >>> > Even with that, I think we should support throttling JDBC; >>> >> >>>> >>> > otherwise >>> >> >>>> >>> > it's >>> >> >>>> >>> > too convenient for our users to DOS their production >>> database >>> >> >>>> servers! >>> >> >>>> >>> > >>> >> >>>> >>> > >>> >> >>>> >>> > /** >>> >> >>>> >>> > * Construct a [[DataFrame]] representing the database >>> table >>> >> >>>> >>> > accessible >>> >> >>>> >>> > via JDBC URL >>> >> >>>> >>> > * url named table. Partitions of the table will be >>> >> >>>> >>> > retrieved >>> >> >>>> >>> > in >>> >> >>>> >>> > parallel >>> >> >>>> >>> > based on the parameters >>> >> >>>> >>> > * passed to this function. >>> >> >>>> >>> > * >>> >> >>>> >>> > * * Don't create too many partitions in parallel on a >>> large >>> >> >>>> cluster; >>> >> >>>> >>> > otherwise Spark might crash* >>> >> >>>> >>> > * * your external database systems.* >>> >> >>>> >>> > * >>> >> >>>> >>> > * @param url JDBC database url of the form >>> >> >>>> >>> > `jdbc:subprotocol:subname` >>> >> >>>> >>> > * @param table Name of the table in the external >>> database. >>> >> >>>> >>> > * @param columnName the name of a column of integral >>> type >>> >> that >>> >> >>>> will >>> >> >>>> >>> > be >>> >> >>>> >>> > used for partitioning. >>> >> >>>> >>> > * @param lowerBound the minimum value of `columnName` >>> used >>> >> >>>> >>> > to >>> >> >>>> >>> > decide >>> >> >>>> >>> > partition stride >>> >> >>>> >>> > * @param upperBound the maximum value of `columnName` >>> used >>> >> >>>> >>> > to >>> >> >>>> >>> > decide >>> >> >>>> >>> > partition stride >>> >> >>>> >>> > * @param numPartitions the number of partitions. the >>> range >>> >> >>>> >>> > `minValue`-`maxValue` will be split >>> >> >>>> >>> > * evenly into this many partitions >>> >> >>>> >>> > * @param connectionProperties JDBC database connection >>> >> >>>> >>> > arguments, >>> >> >>>> a >>> >> >>>> >>> > list >>> >> >>>> >>> > of arbitrary string >>> >> >>>> >>> > * tag/value. Normally at >>> least >>> >> >>>> >>> > a >>> >> >>>> "user" >>> >> >>>> >>> > and >>> >> >>>> >>> > "password" property >>> >> >>>> >>> > * should be included. >>> >> >>>> >>> > * >>> >> >>>> >>> > * @since 1.4.0 >>> >> >>>> >>> > */ >>> >> >>>> >>> > >>> >> >>>> >>> > >>> >> >>>> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer < >>> >> rtref...@gmail.com> >>> >> >>>> >>> > wrote: >>> >> >>>> >>> > >>> >> >>>> >>> >> Hi, >>> >> >>>> >>> >> >>> >> >>>> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ => >>> >> >>>> >>> >> 1).aggregate(0)(_+_,_+_) on an interactive shell (where >>> >> >>>> >>> >> "where" >>> >> >>>> >>> >> is >>> >> >>>> an >>> >> >>>> >>> >> Array[String] of 32 to 48 elements). 
(The code is >>> tailored >>> >> >>>> >>> >> to >>> >> >>>> >>> >> your >>> >> >>>> >>> >> db, >>> >> >>>> >>> >> specifically through the where conditions, I'd have >>> otherwise >>> >> >>>> >>> >> post >>> >> >>>> >>> >> it) >>> >> >>>> >>> >> That should be the DataFrame API, but I'm just trying to >>> load >>> >> >>>> >>> >> everything >>> >> >>>> >>> >> and discard it as soon as possible :-) >>> >> >>>> >>> >> >>> >> >>>> >>> >> (1) Never do a silent drop of the values by default: it >>> kills >>> >> >>>> >>> >> confidence. >>> >> >>>> >>> >> An option sounds reasonable. Some sort of insight / log >>> >> >>>> >>> >> would >>> >> >>>> >>> >> be >>> >> >>>> >>> >> great. >>> >> >>>> >>> >> (How many columns of what type were truncated? why?) >>> >> >>>> >>> >> Note that I could declare the field as string via >>> >> >>>> >>> >> JdbcDialects >>> >> >>>> (thank >>> >> >>>> >>> >> you >>> >> >>>> >>> >> guys for merging that :-) ). >>> >> >>>> >>> >> I have quite bad experiences with silent drops / >>> truncates of >>> >> >>>> columns >>> >> >>>> >>> >> and >>> >> >>>> >>> >> thus _like_ the strict way of spark. It causes trouble but >>> >> >>>> >>> >> noticing >>> >> >>>> >>> >> later >>> >> >>>> >>> >> that your data was corrupted during conversion is even >>> worse. >>> >> >>>> >>> >> >>> >> >>>> >>> >> (2) SPARK-8004 >>> >> https://issues.apache.org/jira/browse/SPARK-8004 >>> >> >>>> >>> >> >>> >> >>>> >>> >> (3) One option would be to make it safe to use, the other >>> >> option >>> >> >>>> >>> >> would >>> >> >>>> >>> >> be >>> >> >>>> >>> >> to document the behavior (s.th. like "WARNING: this >>> method >>> >> tries >>> >> >>>> >>> >> to >>> >> >>>> >>> >> load >>> >> >>>> >>> >> as many partitions as possible, make sure your database >>> can >>> >> >>>> >>> >> handle >>> >> >>>> >>> >> the >>> >> >>>> >>> >> load >>> >> >>>> >>> >> or load them in chunks and use union"). SPARK-8008 >>> >> >>>> >>> >> https://issues.apache.org/jira/browse/SPARK-8008 >>> >> >>>> >>> >> >>> >> >>>> >>> >> Regards, >>> >> >>>> >>> >> Rene Treffer >>> >> >>>> >>> >> >>> >> >>>> >>> > >>> >> >>>> >>> >>> >> >>>> >>> >>> >> >>>> >>> -- >>> >> >>>> >>> Thanks, >>> >> >>>> >>> Mike >>> >> >>>> >>> >>> >> >>>> >>> >>> >> --------------------------------------------------------------------- >>> >> >>>> >>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org >>> >> >>>> >>> For additional commands, e-mail: dev-h...@spark.apache.org >>> >> >>>> >>> >>> >> >>>> >> >>> >> >>>> > >>> >> >>>> >>> >> >>>> >>> >> >>>> -- >>> >> >>>> Thanks, >>> >> >>>> Mike >>> >> >>>> >>> >> >>>> >>> >> >>>> >>> --------------------------------------------------------------------- >>> >> >>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org >>> >> >>>> For additional commands, e-mail: dev-h...@spark.apache.org >>> >> >>>> >>> >> >>> >>> >> >> >>> >> >> >>> >> >> -- >>> >> >> Thanks, >>> >> >> Mike >>> >> >> >>> >> > >>> >> > >>> >> > -- >>> >> > Thanks, >>> >> > Mike >>> >> > >>> >> >>> >> >>> >> -- >>> >> Thanks, >>> >> Mike >>> >> >>> > >>> >>> >>> -- >>> Thanks, >>> Mike >>> >> >> >