For info, in our team we have defined our own cogroup on DataFrames in the past, on different projects, using different methods (RDD[Row]-based or union-all + collect_list-based).
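
To give an idea, here is a rough sketch of the union-all + collect_list flavour. This is a toy example only, not our production code: it assumes both DataFrames share a key column "id", and the column names and the shape of the collected structs are purely illustrative.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ["id", "v1"])
df2 = spark.createDataFrame([(1, 10.0), (2, 20.0), (2, 30.0)], ["id", "v2"])

# Pad each side with the other side's value columns (as nulls) so the two
# schemas line up for unionByName, and tag which side each row came from.
left = df1.withColumn("side", F.lit("l")).withColumn("v2", F.lit(None).cast("double"))
right = df2.withColumn("side", F.lit("r")).withColumn("v1", F.lit(None).cast("string"))
unioned = left.unionByName(right)

# collect_list skips nulls, so a WHEN without an OTHERWISE keeps only the
# rows belonging to each side; every key ends up with two arrays of structs,
# which is essentially a cogroup.
cogrouped = unioned.groupBy("id").agg(
    F.collect_list(F.when(F.col("side") == "l", F.struct("v1"))).alias("df1_rows"),
    F.collect_list(F.when(F.col("side") == "r", F.struct("v2"))).alias("df2_rows"),
)
cogrouped.show(truncate=False)

The appeal is that it boils down to a single shuffle of the unioned data rather than a join.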
I might be biased, but I find the approach very useful in projects to simplify and speed up transformations, and to remove a lot of intermediate stages (distinct + join => just cogroup). Plus, Spark 2.4 introduced a lot of new operators for nested data. That's a win!

On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:

> I am wondering whether other people have opinions/use cases on cogroup?
>
> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>
>> Alessandro,
>>
>> Thanks for the reply. I assume by "equi-join", you mean "equality full
>> outer join".
>>
>> Two issues I see with an equality outer join are:
>> (1) an equality outer join will give n * m rows for each key (n and m being
>> the corresponding number of rows in df1 and df2 for each key)
>> (2) the user needs to do some extra processing to transform n * m back to
>> the desired shape (two sub-dataframes with n and m rows)
>>
>> I think a full outer join is an inefficient way to implement cogroup. If
>> the end goal is to have two separate dataframes for each key, why join
>> them first and then un-join them?
>>
>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>> alessandro.solima...@gmail.com> wrote:
>>
>>> Hello,
>>> I fail to see how an equi-join on the key columns is different from the
>>> cogroup you propose.
>>>
>>> I think the accepted answer can shed some light:
>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>
>>> Then you apply a UDF on each iterable, one per key value (obtained with
>>> cogroup).
>>>
>>> You can achieve the same by:
>>> 1) joining df1 and df2 on the key you want,
>>> 2) applying "groupby" on that key,
>>> 3) finally applying a UDAF (you can have a look here if you are not
>>> familiar with them:
>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)
>>> that will process each group "in isolation".
>>>
>>> HTH,
>>> Alessandro
>>>
>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We have been using PySpark's groupby().apply() quite a bit and it has
>>>> been very helpful in integrating Spark with our existing pandas-heavy
>>>> libraries.
>>>>
>>>> Recently, we have found more and more cases where groupby().apply() is
>>>> not sufficient. In some cases, we want to group two dataframes by the same
>>>> key, and apply a function which takes two pd.DataFrames (and also returns
>>>> a pd.DataFrame) for each key. This feels very much like the "cogroup"
>>>> operation in the RDD API.
>>>>
>>>> It would be great to be able to do something like this (not the actual
>>>> API, just to explain the use case):
>>>>
>>>> @pandas_udf(return_schema, ...)
>>>> def my_udf(pdf1, pdf2):
>>>>     # pdf1 and pdf2 are the subsets of the original dataframes that are
>>>>     # associated with a particular key
>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>     return result
>>>>
>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>
>>>> I have searched around this problem and some people have suggested
>>>> joining the tables first. However, it is often not the same pattern, and
>>>> it is hard to get it to work using joins.
>>>>
>>>> I wonder what people's thoughts are on this?
>>>>
>>>> Li
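
For completeness, here is a rough sketch of the join-then-groupby route discussed above (Spark 2.4-era syntax; df1/df2, the "id" key and all column names are just toy assumptions). It also shows the n * m blow-up per key and the extra work needed inside the UDF to split the joined group back into the two original sub-frames:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "a"), (1, "b")], ["id", "v1"])                # n = 2 rows for key 1
df2 = spark.createDataFrame([(1, 10.0), (1, 20.0), (1, 30.0)], ["id", "v2"])  # m = 3 rows for key 1

# Step 1: full outer join on the key -> n * m = 6 rows for key 1.
joined = df1.join(df2, on="id", how="full_outer")

# Steps 2-3: group by the key and hand each group to a grouped-map pandas UDF,
# which first has to undo the cross product (here with drop_duplicates, which
# is only safe if the original sub-frames had no duplicate rows).
@pandas_udf("id long, result double", PandasUDFType.GROUPED_MAP)
def my_udf(joined_pdf):
    pdf1 = joined_pdf[["id", "v1"]].drop_duplicates()
    pdf2 = joined_pdf[["id", "v2"]].drop_duplicates()
    # ... some code that uses pdf1 and pdf2 ...
    return pd.DataFrame({"id": [joined_pdf["id"].iloc[0]],
                         "result": [pdf2["v2"].sum()]})

df3 = joined.groupby("id").apply(my_udf)

A first-class cogroup would instead hand pdf1 and pdf2 to the UDF directly, as in the API sketched at the bottom of the quoted thread.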