Thank you both for the reply. Chris and I have very similar use cases for
cogroup.

One of the goals for groupby apply + pandas UDF was to avoid things like
collect list and reshaping data between Spark and Pandas. Cogroup feels
very similar and can be an extension to the groupby apply + pandas UDF
functionality.
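
To make the comparison with joins concrete, here is a minimal pure-Python sketch of the semantics I have in mind. It is not Spark or pandas code: the `cogroup` and `equi_join` helpers and the sample rows are hypothetical, made up only for illustration, and rows are plain dicts rather than dataframes.

```python
from collections import defaultdict


def cogroup(left, right, key):
    """Group two lists of row dicts by `key`.

    Returns {key_value: (rows_from_left, rows_from_right)}.
    A key present on only one side gets an empty list for the other side.
    """
    groups = defaultdict(lambda: ([], []))
    for row in left:
        groups[row[key]][0].append(row)
    for row in right:
        groups[row[key]][1].append(row)
    return dict(groups)


def equi_join(left, right, key):
    """Naive inner equi-join: one output pair per matching (left, right) row."""
    return [(l, r) for l in left for r in right if l[key] == r[key]]


# Hypothetical sample data: key "a" has 3 rows on the left and 2 on the right.
df1 = [
    {"id": "a", "t": 1},
    {"id": "a", "t": 2},
    {"id": "a", "t": 3},
    {"id": "b", "t": 1},
]
df2 = [
    {"id": "a", "px": 10.0},
    {"id": "a", "px": 11.0},
    {"id": "c", "px": 5.0},
]

grouped = cogroup(df1, df2, "id")
joined = equi_join(df1, df2, "id")

# cogroup keeps the two sides separate: 3 left rows and 2 right rows for "a".
left_a, right_a = grouped["a"]
# The join produces 3 * 2 pairs for "a", which then have to be "unjoined".
pairs_a = [(l, r) for l, r in joined if l["id"] == "a"]
```

With cogroup, the function applied per key would receive the two sides as they are, so there is nothing to reshape afterwards.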

I wonder if any PMC/committers have any thoughts/opinions on this?

On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:

> Just to add to this I’ve also implemented my own cogroup previously and
> would welcome a cogroup for dataframes.
>
> My specific use case was that I had a large amount of time series data.
> Spark has very limited support for time series (specifically as-of joins),
> but pandas has good support.
>
> My solution was to take my two dataframes and perform a group by and
> collect list on each. The resulting arrays could be passed into a udf where
> they could be marshaled into a couple of pandas dataframes and processed
> using pandas' excellent time series functionality.
>
> If cogroup was available natively on dataframes this would have been a bit
> nicer. The ideal would have been some pandas udf version of cogroup that
> gave me a pandas dataframe for each spark dataframe in the cogroup!
>
> Chris
>
> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com>
> wrote:
>
> For info, in our team we have defined our own cogroup on dataframes in the
> past on different projects using different methods (rdd[row] based or union
> all collect list based).
>
> I might be biased, but I find the approach very useful in projects to
> simplify and speed up transformations, and remove a lot of intermediate
> stages (distinct + join => just cogroup).
>
> Plus Spark 2.4 introduced a lot of new operators for nested data. That's a
> win!
>
>
> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>
>> I am wondering whether other people have opinions or use cases for cogroup?
>>
>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>
>>> Alessandro,
>>>
>>> Thanks for the reply. I assume that by "equi-join" you mean "equality full
>>> outer join".
>>>
>>> Two issues I see with an equality outer join are:
>>> (1) An equality outer join will give n * m rows for each key (n and m being
>>> the corresponding number of rows in df1 and df2 for each key)
>>> (2) The user needs to do some extra processing to transform the n * m rows
>>> back to the desired shape (two sub dataframes with n and m rows)
>>>
>>> I think full outer join is an inefficient way to implement cogroup. If
>>> the end goal is to have two separate dataframes for each key, why join
>>> them first and then unjoin them?
>>>
>>>
>>>
>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>> alessandro.solima...@gmail.com> wrote:
>>>
>>>> Hello,
>>>> I fail to see how an equi-join on the key columns is different from the
>>>> cogroup you propose.
>>>>
>>>> I think the accepted answer can shed some light:
>>>>
>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>
>>>> Now you apply a udf on each iterable, one per key value (obtained with
>>>> cogroup).
>>>>
>>>> You can achieve the same by:
>>>> 1) join df1 and df2 on the key you want,
>>>> 2) apply "groupby" on such key
>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>> familiar with them
>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>> that will process each group "in isolation".
>>>>
>>>> HTH,
>>>> Alessandro
>>>>
>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have been using PySpark's groupby().apply() quite a bit and it has
>>>>> been very helpful in integrating Spark with our existing pandas-heavy
>>>>> libraries.
>>>>>
>>>>> Recently, we have found more and more cases where groupby().apply() is
>>>>> not sufficient. In some cases, we want to group two dataframes by the
>>>>> same key, and apply a function which takes two pd.DataFrames (and also
>>>>> returns a pd.DataFrame) for each key. This feels very much like the
>>>>> "cogroup" operation in the RDD API.
>>>>>
>>>>> It would be great to be able to do something like this (not an actual
>>>>> API, just to explain the use case):
>>>>>
>>>>> @pandas_udf(return_schema, ...)
>>>>> def my_udf(pdf1, pdf2):
>>>>>      # pdf1 and pdf2 are the subsets of the original dataframes that are
>>>>>      # associated with a particular key
>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>      return result
>>>>>
>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>
>>>>> I have searched around the problem and some people have suggested
>>>>> joining the tables first. However, it's often not the same pattern and
>>>>> it is hard to get it to work using joins.
>>>>>
>>>>> I wonder what people's thoughts are on this?
>>>>>
>>>>> Li
>>>>>
>>>>>
