On Mon, May 17, 2021 at 2:31 PM Lalwani, Jayesh <jlalw...@amazon.com> wrote:
>
> If the UDFs are computationally expensive, I wouldn't solve this problem with
> UDFs at all. If they are working in an iterative manner, and assuming each
> iteration is independent of other iterations (yes, I know that's a big
> assumption), I would think about exploding your dataframe to have a row per
> iteration, working on each row separately, and then aggregating at the
> end. This allows you to scale your computation much better.

Ah, in this case, I mean "iterative" in the "code/run/examine" sense of the
word, not that the UDF itself is performing an iterative computation.

>
> I know not all computations can be map-reducible like that. However, most can.
>
> Split and merge data workflows in Spark don't work like their DAG
> representations, unless you add costly caches. Without caching, each split
> will result in Spark rereading data from the source, even if the splits are
> getting merged together. The only way to avoid it is by caching at the split
> point, which depending on the amount of data can become costly. Also, joins
> result in shuffles. Avoiding splits and merges is better.
>
> To give you an example, we had an application that applied a series of rules
> to rows. The output required was a dataframe with an additional column that
> indicated which rule the row satisfied. In our initial implementation, we had
> a series of r one per rule. For N rules, we created N dataframes that had the
> rows that satisfied the rules. Then we unioned the N dataframes. Horrible
> performance that didn't scale with N. We reimplemented to add N Boolean
> columns, one per rule, that indicated if the rule was satisfied. We just kept
> adding the Boolean columns to the dataframe. After iterating over the rules,
> we added another column that indicated which rule was satisfied, and then
> dropped the Boolean columns. Much better performance that scaled with N.
> Spark read from the datasource just once, and since there were no joins/unions,
> there was no shuffle.

The hitch in your example, and what we're trying to avoid, is that if you
need to change one of these Boolean columns, you end up needing to recompute
everything "afterwards" in the DAG (AFAICT), even if the "latter" stages
don't have a true dependency on the changed column.

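To make the quoted Boolean-column approach concrete, here is a minimal sketch in plain Python rather than Spark. Rows are dicts standing in for DataFrame rows. The rule names, the `label_rows` helper, and the first-match priority are assumptions made for illustration, not details of the original application.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical rules, invented for illustration. Dict order sets priority.
RULES: Dict[str, Callable[[Dict[str, int]], bool]] = {
    "negative": lambda row: row["x"] < 0,
    "small": lambda row: row["x"] < 10,
    "even": lambda row: row["x"] % 2 == 0,
}


def label_rows(rows: List[Dict[str, int]],
               rules: Dict[str, Callable[[Dict[str, int]], bool]]) -> List[dict]:
    """Single pass over the source: one Boolean flag per rule, then one label."""
    labelled = []
    for row in rows:
        # Analogue of adding N Boolean columns, one per rule.
        flags = {name: bool(rule(row)) for name, rule in rules.items()}
        # Analogue of the final column: the first rule whose flag is set.
        matched: Optional[str] = next(
            (name for name, hit in flags.items() if hit), None)
        # The flags are not copied into the output (the "drop" step).
        labelled.append({**row, "rule": matched})
    return labelled


rows = [{"x": -3}, {"x": 4}, {"x": 12}, {"x": 15}]
result = label_rows(rows, RULES)
```

Each source row is visited once and no per-rule subsets are built or unioned, which mirrors the property the quoted message credits for the better scaling. It also shows the hitch: editing any one rule means rerunning the whole pass.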
We do explorations of very large physics datasets, and one of the
disadvantages of our bespoke analysis software is that any change to the
analysis code involves recomputing everything from scratch. A big goal of
mine is to make it so that what was changed is recomputed, and no more,
which will speed up the rate at which we can find new physics.

Cheers
Andrew

>
> On 5/17/21, 2:56 PM, "Andrew Melo" <andrew.m...@gmail.com> wrote:
>
> In our case, these UDFs are quite expensive and worked on in an
> iterative manner, so being able to cache the two "sides" of the graph
> independently will speed up the development cycle. Otherwise, if you
> modify foo() here, then you have to recompute bar and baz, even though
> they're unchanged.
>
> df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', baz('x'))
>
> Additionally, a longer-term goal would be to be able to persist/cache these
> columns to disk so a downstream user could later mix and match several
> (10s) of these columns together as their inputs w/o having to
> explicitly compute them themselves.
>
> Cheers
> Andrew
>
> On Mon, May 17, 2021 at 1:10 PM Sean Owen <sro...@gmail.com> wrote:
> >
> > Why join here - just add two columns to the DataFrame directly?
> >
> > On Mon, May 17, 2021 at 1:04 PM Andrew Melo <andrew.m...@gmail.com> wrote:
> >>
> >> Anyone have ideas about the below Q?
> >>
> >> It seems to me that, given that "diamond" DAG, Spark could see
> >> that the rows haven't been shuffled/filtered and do some type of
> >> "zip join" to push them together, but I've not been able to get a plan
> >> that doesn't do a hash/sort merge join
> >>
> >> Cheers
> >> Andrew
> >>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
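The goal described in this thread (recompute only the column whose function changed, then combine columns positionally instead of with a keyed join) can be sketched in plain Python. This is not Spark code and not something Spark is claimed to do. The `ColumnCache` class and its bytecode-based fingerprint are hypothetical, and the sketch assumes the source data is fixed and row order is preserved.

```python
import hashlib
from typing import Callable, Dict, List, Tuple


class ColumnCache:
    """Recompute a derived column only when the function producing it changes."""

    def __init__(self) -> None:
        # column name -> (function fingerprint, cached values)
        self._store: Dict[str, Tuple[str, List[float]]] = {}
        self.recomputed: List[str] = []  # log of columns actually recomputed

    @staticmethod
    def _fingerprint(fn: Callable) -> str:
        # Assumption: bytecode, constants and referenced names are a good
        # enough identity for a function version. This is a crude heuristic.
        code = fn.__code__
        payload = code.co_code + repr((code.co_consts, code.co_names)).encode()
        return hashlib.sha256(payload).hexdigest()

    def column(self, name: str, fn: Callable[[float], float],
               source: List[float]) -> List[float]:
        key = self._fingerprint(fn)
        cached = self._store.get(name)
        if cached is None or cached[0] != key:
            self._store[name] = (key, [fn(v) for v in source])
            self.recomputed.append(name)
        return self._store[name][1]


x = [1.0, 2.0, 3.0]
cache = ColumnCache()

def foo(v): return v + 1
def bar(v): return v * 2

a = cache.column("a", foo, x)
b = cache.column("b", bar, x)

def foo(v): return v + 10  # edit foo only; bar is untouched

a = cache.column("a", foo, x)  # recomputed, fingerprint changed
b = cache.column("b", bar, x)  # served from the cache

# Row order is unchanged, so the columns line up positionally ("zip join").
rows = list(zip(x, a, b))
```

After the edit to `foo`, only column `a` is recomputed. A real implementation would also need to track changes to the source data and to anything the functions call, which this sketch ignores.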