Just to add to this I’ve also implemented my own cogroup previously and would 
welcome a cogroup for datafame.

My specific use case was that I had a large amount of time series data. Spark 
has very limited support for time series (specifically as-of joins), but pandas 
has good support.

My solution was to take my two dataframes and perform a group by and collect 
list on each. The resulting arrays could be passed into a udf where they could 
be marshaled into a couple of pandas dataframes and processed using pandas 
excellent time series functionality.

If cogroup was available natively on dataframes this would have been a bit 
nicer. The ideal would have been some pandas udf version of cogroup that gave 
me a pandas dataframe for each spark dataframe in the cogroup!

Chris 

> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:
> 
> For info, in our team have defined our own cogroup on dataframe in the past 
> on different projects using different methods (rdd[row] based or union all 
> collect list based). 
> 
> I might be biased, but find the approach very useful in project to simplify 
> and speed up transformations, and remove a lot of intermediate stages 
> (distinct + join => just cogroup). 
> 
> Plus spark 2.4 introduced a lot of new operator for nested data. That's a 
> win! 
> 
> 
>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>> I am wondering do other people have opinion/use case on cogroup?
>> 
>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>> Alessandro,
>>> 
>>> Thanks for the reply. I assume by "equi-join", you mean "equality  full 
>>> outer join" .
>>> 
>>> Two issues I see with equity outer join is:
>>> (1) equity outer join will give n * m rows for each key (n and m being the 
>>> corresponding number of rows in df1 and df2 for each key)
>>> (2) User needs to do some extra processing to transform n * m back to the 
>>> desired shape (two sub dataframes with n and m rows) 
>>> 
>>> I think full outer join is an inefficient way to implement cogroup. If the 
>>> end goal is to have two separate dataframes for each key, why joining them 
>>> first and then unjoin them?
>>> 
>>> 
>>> 
>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando 
>>>> <alessandro.solima...@gmail.com> wrote:
>>>> Hello,
>>>> I fail to see how an equi-join on the key columns is different than the 
>>>> cogroup you propose.
>>>> 
>>>> I think the accepted answer can shed some light:
>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>> 
>>>> Now you apply an udf on each iterable, one per key value (obtained with 
>>>> cogroup).
>>>> 
>>>> You can achieve the same by: 
>>>> 1) join df1 and df2 on the key you want, 
>>>> 2) apply "groupby" on such key
>>>> 3) finally apply a udaf (you can have a look here if you are not familiar 
>>>> with them 
>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html), that 
>>>> will process each group "in isolation".
>>>> 
>>>> HTH,
>>>> Alessandro
>>>> 
>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> We have been using Pyspark's groupby().apply() quite a bit and it has 
>>>>> been very helpful in integrating Spark with our existing pandas-heavy 
>>>>> libraries.
>>>>> 
>>>>> Recently, we have found more and more cases where groupby().apply() is 
>>>>> not sufficient - In some cases, we want to group two dataframes by the 
>>>>> same key, and apply a function which takes two pd.DataFrame (also returns 
>>>>> a pd.DataFrame) for each key. This feels very much like the "cogroup" 
>>>>> operation in the RDD API.
>>>>> 
>>>>> It would be great to be able to do sth like this: (not actual API, just 
>>>>> to explain the use case):
>>>>> 
>>>>> @pandas_udf(return_schema, ...)
>>>>> def my_udf(pdf1, pdf2)
>>>>>       # pdf1 and pdf2 are the subset of the original dataframes that is 
>>>>> associated with a particular key
>>>>>       result = ... # some code that uses pdf1 and pdf2
>>>>>       return result   
>>>>> 
>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>> 
>>>>> I have searched around the problem and some people have suggested to join 
>>>>> the tables first. However, it's often not the same pattern and hard to 
>>>>> get it to work by using joins.
>>>>> 
>>>>> I wonder what are people's thought on this? 
>>>>> 
>>>>> Li
>>>>> 

Reply via email to