For info, our team has defined its own cogroup on DataFrames in the past,
on different projects and using different methods (RDD[Row] based, or
union-all + collect_list based).
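
For illustration, a rough PySpark sketch of the idea (not our actual code;
'some_key' and the column names are placeholders, and our union-all variant
differed in the details):

from pyspark.sql import functions as F

# Gather each side's rows per key into an array of structs, then join the
# two aggregated frames so every key carries both row sets side by side.
left_cols = [c for c in df1.columns if c != 'some_key']
right_cols = [c for c in df2.columns if c != 'some_key']
left = df1.groupBy('some_key').agg(
    F.collect_list(F.struct(*left_cols)).alias('left_rows'))
right = df2.groupBy('some_key').agg(
    F.collect_list(F.struct(*right_cols)).alias('right_rows'))
cogrouped = left.join(right, on='some_key', how='full_outer')

# The RDD[Row] flavour instead keys each frame's rows and uses RDD.cogroup:
keyed1 = df1.rdd.map(lambda row: (row['some_key'], row))
keyed2 = df2.rdd.map(lambda row: (row['some_key'], row))
cogrouped_rdd = keyed1.cogroup(keyed2)  # (key, (rows of df1, rows of df2))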

I might be biased, but I find the approach very useful in projects to
simplify and speed up transformations, and to remove a lot of intermediate
stages (distinct + join => just cogroup).

Plus, Spark 2.4 introduced a lot of new operators for nested data. That's a
win!
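
For example (building on the hypothetical 'cogrouped' frame sketched above;
the 'value' field is purely illustrative), those operators work directly on
the collected arrays without exploding them first:

per_key = cogrouped.select(
    'some_key',
    F.size('left_rows').alias('n_left'),
    # SQL higher-order function, new in 2.4
    F.expr('transform(left_rows, r -> r.value)').alias('left_values'),
    # arrays_zip, also new in 2.4
    F.arrays_zip('left_rows', 'right_rows').alias('paired'))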


On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:

> I am wondering, do other people have opinions/use cases on cogroup?
>
> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>
>> Alessandro,
>>
>> Thanks for the reply. I assume by "equi-join" you mean an "equality full
>> outer join".
>>
>> Two issues I see with an equality outer join are:
>> (1) an equality outer join will give n * m rows for each key (n and m
>> being the number of rows in df1 and df2, respectively, for that key)
>> (2) the user needs to do some extra processing to transform the n * m
>> rows back into the desired shape (two sub-dataframes with n and m rows)
>>
>> I think a full outer join is an inefficient way to implement cogroup. If
>> the end goal is to have two separate dataframes for each key, why join
>> them first and then un-join them?
>>
>>
>>
>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>> alessandro.solima...@gmail.com> wrote:
>>
>>> Hello,
>>> I fail to see how an equi-join on the key columns is different from the
>>> cogroup you propose.
>>>
>>> I think the accepted answer can shed some light:
>>>
>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>
>>> Now, you apply a UDF on each iterable, one per key value (obtained with
>>> cogroup).
>>>
>>> You can achieve the same by:
>>> 1) joining df1 and df2 on the key you want,
>>> 2) applying "groupby" on that key,
>>> 3) finally applying a UDAF (you can have a look here if you are not
>>> familiar with them:
>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)
>>> that will process each group "in isolation", as sketched below.
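>>>
>>> A rough PySpark sketch of those three steps (illustrative only; it uses
>>> a grouped-map pandas_udf in place of the Scala UDAF from the link, and
>>> return_schema is the output schema from your example):
>>>
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>>
>>> joined = df1.join(df2, on='some_key')  # step 1: n * m rows per key
>>>
>>> @pandas_udf(return_schema, PandasUDFType.GROUPED_MAP)
>>> def process_group(pdf):
>>>     # pdf holds every joined row for one key value
>>>     return ...  # reduce/transform the group here
>>>
>>> df3 = joined.groupby('some_key').apply(process_group)  # steps 2 and 3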
>>>
>>> HTH,
>>> Alessandro
>>>
>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We have been using PySpark's groupby().apply() quite a bit and it has
>>>> been very helpful in integrating Spark with our existing pandas-heavy
>>>> libraries.
>>>>
>>>> Recently, we have found more and more cases where groupby().apply() is
>>>> not sufficient. In some cases, we want to group two dataframes by the
>>>> same key and apply a function which takes two pd.DataFrames (and also
>>>> returns a pd.DataFrame) for each key. This feels very much like the
>>>> "cogroup" operation in the RDD API.
>>>>
>>>> It would be great to be able to do something like this (not an actual
>>>> API, just to explain the use case):
>>>>
>>>> @pandas_udf(return_schema, ...)
>>>> def my_udf(pdf1, pdf2):
>>>>     # pdf1 and pdf2 are the subsets of the original dataframes that are
>>>>     # associated with a particular key
>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>     return result
>>>>
>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>
>>>> I have searched around this problem and some people have suggested
>>>> joining the tables first. However, it's often not the same pattern, and
>>>> it is hard to get it to work using joins.
>>>>
>>>> I wonder what people's thoughts are on this?
>>>>
>>>> Li
>>>>
>>>>
