If the UDFs are computationally expensive, I wouldn't solve this problem with UDFs at all. If they are working in an iterative manner, and assuming each iteration is independent of the others (yes, I know that's a big assumption), I would think about exploding your dataframe to have a row per iteration, working on each row separately, and then aggregating at the end. This allows you to scale your computation much better.
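As a rough illustration of the explode-then-aggregate idea, here is a plain-Python model of the data shape at each step. It is not Spark code. The `step` function, the `n_iter` field and the sample records are hypothetical, and the sketch assumes, as above, that iterations are independent of each other.

```python
from collections import defaultdict

# Hypothetical input: each record would otherwise loop over `n_iter`
# iterations inside one expensive UDF call.
records = [
    {"id": "a", "x": 2.0, "n_iter": 3},
    {"id": "b", "x": 5.0, "n_iter": 2},
]


def step(x, i):
    """Hypothetical per-iteration work; must not depend on other iterations."""
    return x * (i + 1)


# 1. Explode: one row per (record, iteration) pair.
exploded = [
    {"id": r["id"], "x": r["x"], "i": i}
    for r in records
    for i in range(r["n_iter"])
]

# 2. Map: each exploded row is processed on its own, with no shared state.
mapped = [{"id": row["id"], "value": step(row["x"], row["i"])} for row in exploded]

# 3. Reduce: aggregate the per-iteration results back to one value per record.
totals = defaultdict(float)
for row in mapped:
    totals[row["id"]] += row["value"]

print(dict(totals))  # {'a': 12.0, 'b': 15.0}
```

In PySpark, the explode step would typically be `F.explode` over an array of iteration indices (for example one built with `F.sequence`), the map step an ordinary column expression or UDF applied per row, and the reduce step a `groupBy(...).agg(...)`.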
I know not all computations can be expressed as map-reduce like that. However, most can. Split-and-merge data workflows in Spark don't execute the way their DAG representations suggest, unless you add costly caches. Without caching, each split results in Spark rereading the data from the source, even if the splits are later merged back together. The only way to avoid that is to cache at the split point, which, depending on the amount of data, can become costly. Also, joins result in shuffles. Avoiding splits and merges is better.

To give you an example, we had an application that applied a series of rules to rows. The required output was a dataframe with an additional column indicating which rule the row satisfied. In our initial implementation, we applied the rules one at a time: for N rules, we created N dataframes containing the rows that satisfied each rule, and then we unioned the N dataframes. Performance was horrible and didn't scale with N. We reimplemented it to add N Boolean columns, one per rule, each indicating whether that rule was satisfied. We just kept adding the Boolean columns to the dataframe. After iterating over the rules, we added another column indicating which rule was satisfied, and then dropped the Boolean columns. Performance was much better and scaled with N. Spark read from the data source just once, and since there were no joins/unions, there was no shuffle.

On 5/17/21, 2:56 PM, "Andrew Melo" wrote:

In our case, these UDFs are quite expensive and are worked on in an iterative manner, so being able to cache the two "sides" of the graph independently will speed up the development cycle. Otherwise, if you modify foo() here, then you have to recompute bar and baz, even though they're unchanged.
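The two designs in the rules example can be modeled in plain Python to make the difference in source scans visible. This is not Spark code. `Source`, the `amount` field and the three rules are hypothetical, and the rules are deliberately mutually exclusive so both designs produce the same rows. The read counter stands in for Spark re-scanning an uncached source once per split.

```python
class Source:
    """Stands in for an uncached data source; counts how often it is scanned."""

    def __init__(self, amounts):
        self.amounts = amounts
        self.reads = 0

    def read(self):
        self.reads += 1
        return [{"id": i, "amount": a} for i, a in enumerate(self.amounts)]


# Hypothetical rules, deliberately mutually exclusive so both designs agree.
RULES = [
    ("small", lambda r: r["amount"] < 10),
    ("medium", lambda r: 10 <= r["amount"] < 1000),
    ("large", lambda r: r["amount"] >= 1000),
]


def filter_and_union(source):
    """Initial design: one filtered copy of the source per rule, then a union."""
    result = []
    for name, pred in RULES:
        rows = source.read()  # each split re-reads the source
        result.extend({**r, "rule": name} for r in rows if pred(r))
    return result


def flag_columns(source):
    """Reworked design: one pass, one Boolean flag per rule, then one label."""
    result = []
    for r in source.read():  # the source is read once
        flags = {f"is_{name}": pred(r) for name, pred in RULES}
        row = {**r, **flags}
        # Label with the first rule whose flag is set (None if none match).
        row["rule"] = next((name for name, _ in RULES if row[f"is_{name}"]), None)
        for key in flags:  # drop the temporary flag "columns"
            del row[key]
        result.append(row)
    return result


union_src, flag_src = Source([5, 50, 500, 5000]), Source([5, 50, 500, 5000])
union_rows = filter_and_union(union_src)
flag_rows = flag_columns(flag_src)
print(union_src.reads, flag_src.reads)  # 3 1
```

In PySpark, the reworked design would roughly be one `withColumn` per rule holding the Boolean condition, a label column built with `F.coalesce` over `F.when(F.col(flag), F.lit(rule_name))` expressions (the first matching rule wins because `when` without `otherwise` yields null), and a final `drop` of the flag columns.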
df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', baz('x'))

Additionally, a longer-term goal would be to persist/cache these columns to disk so a downstream user could later mix and match several (tens) of these columns as their inputs without having to compute them explicitly themselves.

Cheers
Andrew

On Mon, May 17, 2021 at 1:10 PM Sean Owen wrote:
>
> Why join here - just add two columns to the DataFrame directly?
>
> On Mon, May 17, 2021 at 1:04 PM Andrew Melo wrote:
>>
>> Anyone have ideas about the below Q?
>>
>> It seems to me that, given that "diamond" DAG, Spark could see that the rows haven't been shuffled/filtered and could do some type of "zip join" to push them together, but I've not been able to get a plan that doesn't do a hash/sort merge join.
>>
>> Cheers
>> Andrew
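Andrew's goal of caching each derived column independently, so that editing foo() does not force bar and baz to be recomputed, can be sketched as a plain-Python model. This is not Spark code. `ColumnStore`, the version tags, the positional reassembly and the bodies of `foo`, `bar` and `baz` are all hypothetical illustrations, not a Spark API.

```python
class ColumnStore:
    """Hypothetical per-column cache keyed by (column name, version tag)."""

    def __init__(self, base):
        self.base = base    # input rows; order is assumed never to change
        self.cache = {}     # (name, version) -> list of computed values
        self.computed = []  # log of (name, version) pairs actually computed

    def column(self, name, func, version):
        key = (name, version)
        if key not in self.cache:  # only compute on a cache miss
            self.computed.append(key)
            self.cache[key] = [func(row["x"]) for row in self.base]
        return self.cache[key]

    def assemble(self, specs):
        """Attach the requested columns to the base rows by position."""
        cols = {name: self.column(name, func, ver) for name, func, ver in specs}
        return [
            {**row, **{name: values[i] for name, values in cols.items()}}
            for i, row in enumerate(self.base)
        ]


def foo(x):
    return x + 1


def foo_v2(x):
    # an edited foo(), tagged as version 2 below
    return x + 100


def bar(x):
    return x * 10


def baz(x):
    return -x


store = ColumnStore([{"x": 1}, {"x": 2}])
store.assemble([("a", foo, 1), ("b", bar, 1), ("c", baz, 1)])
rows = store.assemble([("a", foo_v2, 2), ("b", bar, 1), ("c", baz, 1)])
print(store.computed)  # [('a', 1), ('b', 1), ('c', 1), ('a', 2)]
print(rows[0])  # {'x': 1, 'a': 101, 'b': 10, 'c': -1}
```

The positional reassembly only works because the base list's order never changes. Spark does not guarantee row order across independently computed DataFrames, so reattaching separately persisted columns generally needs a stable row key and a join, which is the hash/sort merge join the thread is trying to avoid.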
