Thank you both for the replies. Chris and I have very similar use cases for cogroup.
One of the goals of groupby apply + pandas UDF was to avoid things like collect_list and reshaping data between Spark and pandas. Cogroup feels very similar and could be an extension of the groupby apply + pandas UDF functionality. I wonder if any PMC members/committers have any thoughts/opinions on this?

On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:

> Just to add to this, I've also implemented my own cogroup previously and would welcome a cogroup for dataframes.
>
> My specific use case was that I had a large amount of time series data. Spark has very limited support for time series (specifically as-of joins), but pandas has good support.
>
> My solution was to take my two dataframes and perform a group by and collect_list on each. The resulting arrays could be passed into a UDF where they could be marshaled into a couple of pandas dataframes and processed using pandas' excellent time series functionality.
>
> If cogroup were available natively on dataframes, this would have been a bit nicer. The ideal would have been some pandas UDF version of cogroup that gave me a pandas dataframe for each Spark dataframe in the cogroup!
>
> Chris
>
> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:
>
> For info, our team has defined its own cogroup on dataframes in the past, on different projects, using different methods (RDD[Row] based or union all + collect_list based).
>
> I might be biased, but I find the approach very useful in projects to simplify and speed up transformations, and to remove a lot of intermediate stages (distinct + join => just cogroup).
>
> Plus, Spark 2.4 introduced a lot of new operators for nested data. That's a win!
>
>
> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>
>> I am wondering, do other people have opinions/use cases on cogroup?
>>
>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>
>>> Alessandro,
>>>
>>> Thanks for the reply. I assume by "equi-join" you mean "equality full outer join".
>>>
>>> Two issues I see with an equality outer join are:
>>> (1) an equality outer join will give n * m rows for each key (n and m being the corresponding numbers of rows in df1 and df2 for each key);
>>> (2) the user needs to do some extra processing to transform the n * m rows back into the desired shape (two sub-dataframes with n and m rows).
>>>
>>> I think a full outer join is an inefficient way to implement cogroup. If the end goal is to have two separate dataframes for each key, why join them first and then un-join them?
>>>
>>>
>>>
>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <alessandro.solima...@gmail.com> wrote:
>>>
>>>> Hello,
>>>> I fail to see how an equi-join on the key columns is different from the cogroup you propose.
>>>>
>>>> I think the accepted answer can shed some light:
>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>
>>>> Now you apply a UDF on each iterable, one per key value (obtained with cogroup).
>>>>
>>>> You can achieve the same by:
>>>> 1) joining df1 and df2 on the key you want,
>>>> 2) applying "groupby" on that key,
>>>> 3) finally applying a UDAF (you can have a look here if you are not familiar with them: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) that will process each group "in isolation".
>>>>
>>>> HTH,
>>>> Alessandro
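To make the join-then-groupby pattern described above concrete, here is a rough, untested sketch in PySpark. The column names ("key", "v1", "v2"), the output schema, and the per-key logic are made up for illustration. It also shows the reshaping problem raised earlier in the thread: the function receives the n * m joined rows for each key and has to pull the two original row sets back apart.

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf("key long, result double", PandasUDFType.GROUPED_MAP)
    def process_group(pdf):
        # pdf holds the n * m joined rows for one key; try to recover the two
        # original row sets (note: genuinely duplicated rows are lost here).
        pdf1 = pdf[["key", "v1"]].drop_duplicates()
        pdf2 = pdf[["key", "v2"]].drop_duplicates()
        # ... per-key logic that uses pdf1 and pdf2 ...
        return pd.DataFrame({"key": [pdf["key"].iloc[0]],
                             "result": [pdf1["v1"].sum() + pdf2["v2"].sum()]})

    df3 = (df1.join(df2, on="key", how="outer")
              .groupby("key")
              .apply(process_group))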
>>>>
>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have been using PySpark's groupby().apply() quite a bit and it has been very helpful in integrating Spark with our existing pandas-heavy libraries.
>>>>>
>>>>> Recently, we have found more and more cases where groupby().apply() is not sufficient: in some cases, we want to group two dataframes by the same key and apply a function which takes two pd.DataFrames (and also returns a pd.DataFrame) for each key. This feels very much like the "cogroup" operation in the RDD API.
>>>>>
>>>>> It would be great to be able to do something like this (not the actual API, just to explain the use case):
>>>>>
>>>>> @pandas_udf(return_schema, ...)
>>>>> def my_udf(pdf1, pdf2):
>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>     # that are associated with a particular key
>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>     return result
>>>>>
>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>
>>>>> I have searched around the problem and some people have suggested joining the tables first. However, it's often not the same pattern, and it is hard to get it to work using joins.
>>>>>
>>>>> I wonder what people's thoughts are on this?
>>>>>
>>>>> Li
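For reference, here is a rough, untested sketch of the collect_list workaround Chris describes above, using pandas' merge_asof for the as-of join. The names (df1, df2, and columns "key", "time", "v1", "v2") and the output schema are illustrative assumptions; this is exactly the reshaping boilerplate that a native cogroup + pandas UDF would remove.

    import pandas as pd
    from pyspark.sql import functions as F
    from pyspark.sql.types import (ArrayType, StructType, StructField,
                                   LongType, DoubleType)

    out_schema = ArrayType(StructType([
        StructField("time", LongType()),
        StructField("v1", DoubleType()),
        StructField("v2", DoubleType()),
    ]))

    @F.udf(out_schema)
    def asof_join(left_rows, right_rows):
        # Marshal the collected rows back into pandas dataframes for this key.
        pdf1 = pd.DataFrame([r.asDict() for r in (left_rows or [])])
        pdf2 = pd.DataFrame([r.asDict() for r in (right_rows or [])])
        if pdf1.empty or pdf2.empty:
            # Keys present on only one side are dropped by the explode below.
            return []
        out = pd.merge_asof(pdf1.sort_values("time"),
                            pdf2.sort_values("time"), on="time")
        # Convert back to plain Python types for Spark.
        return [(int(t), float(a), float(b))
                for t, a, b in out[["time", "v1", "v2"]].itertuples(index=False)]

    # Collapse each dataframe to one row per key, carrying its rows as an array.
    left = df1.groupBy("key").agg(
        F.collect_list(F.struct("time", "v1")).alias("left_rows"))
    right = df2.groupBy("key").agg(
        F.collect_list(F.struct("time", "v2")).alias("right_rows"))

    result = (left.join(right, on="key", how="outer")
                  .withColumn("joined", asof_join("left_rows", "right_rows"))
                  .select("key", F.explode("joined").alias("row"))
                  .select("key", "row.*"))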