Alessandro,

Thanks for the reply. I assume by "equi-join" you mean an equality full
outer join.

Two issues I see with an equality outer join are:
(1) An equality outer join will give n * m rows for each key (n and m being
the corresponding number of rows in df1 and df2 for that key).
(2) The user needs to do some extra processing to transform the n * m rows
back into the desired shape (two sub-dataframes with n and m rows).
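
To make (1) and (2) concrete, here is a small pandas sketch (the key and
column names are made up for illustration):

import pandas as pd

# Hypothetical data: for key 'a', df1 has n = 2 rows and df2 has m = 3 rows.
df1 = pd.DataFrame({'key': ['a', 'a'], 'x': [1, 2]})
df2 = pd.DataFrame({'key': ['a', 'a', 'a'], 'y': [10, 20, 30]})

# (1) A full outer join on the key produces n * m = 6 rows for key 'a'.
joined = df1.merge(df2, on='key', how='outer')
print(len(joined))  # 6

# (2) To recover the two per-key sub-dataframes, the user has to "un-join",
# e.g. by de-duplicating each side's columns (which already breaks down if
# the original dataframes contained genuine duplicate rows).
pdf1 = joined[['key', 'x']].drop_duplicates()
pdf2 = joined[['key', 'y']].drop_duplicates()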

I think a full outer join is an inefficient way to implement cogroup. If the
end goal is to have two separate dataframes for each key, why join them
first and then un-join them?
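
For comparison, the RDD-level cogroup already produces the desired shape
directly. A rough, untested sketch (assuming df1 and df2 are Spark
DataFrames with a 'key' column, and my_pandas_func stands for the user's
function taking two pd.DataFrames):

import pandas as pd

rdd1 = df1.rdd.map(lambda row: (row['key'], row.asDict()))
rdd2 = df2.rdd.map(lambda row: (row['key'], row.asDict()))

# cogroup yields (key, (rows from df1, rows from df2)) per key --
# already the two groups, with no n * m blowup and no "un-join" step.
cogrouped = rdd1.cogroup(rdd2)

def apply_per_key(pair):
    key, (rows1, rows2) = pair
    pdf1 = pd.DataFrame(list(rows1))
    pdf2 = pd.DataFrame(list(rows2))
    return my_pandas_func(pdf1, pdf2)  # user's pandas function (placeholder)

result = cogrouped.map(apply_per_key)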



On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
alessandro.solima...@gmail.com> wrote:

> Hello,
> I fail to see how an equi-join on the key columns is different than the
> cogroup you propose.
>
> I think the accepted answer can shed some light:
>
> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>
> Now you apply a udf on each iterable, one per key value (obtained with
> cogroup).
>
> You can achieve the same by:
> 1) join df1 and df2 on the key you want,
> 2) apply "groupby" on that key,
> 3) finally apply a udaf (you can have a look here if you are not familiar
> with them
> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html), that
> will process each group "in isolation".
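>
> A rough sketch of that pipeline in PySpark (using a grouped-map pandas udf
> in place of a Scala udaf; return_schema, some_key and some_pandas_logic are
> placeholders):
>
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # 1) join df1 and df2 on the key
> joined = df1.join(df2, on='some_key', how='outer')
>
> # 2) + 3) group by the key and process each group "in isolation"
> @pandas_udf(return_schema, PandasUDFType.GROUPED_MAP)  # return_schema: output schema (placeholder)
> def process_group(pdf):
>     # pdf holds the joined rows for one key value
>     return some_pandas_logic(pdf)
>
> result = joined.groupby('some_key').apply(process_group)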
>
> HTH,
> Alessandro
>
> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>
>> Hi,
>>
>> We have been using Pyspark's groupby().apply() quite a bit and it has
>> been very helpful in integrating Spark with our existing pandas-heavy
>> libraries.
>>
>> Recently, we have found more and more cases where groupby().apply() is
>> not sufficient - In some cases, we want to group two dataframes by the same
>> key, and apply a function which takes two pd.DataFrame (also returns a
>> pd.DataFrame) for each key. This feels very much like the "cogroup"
>> operation in the RDD API.
>>
>> It would be great to be able to do something like this (not actual API,
>> just to explain the use case):
>>
>> @pandas_udf(return_schema, ...)
>> def my_udf(pdf1, pdf2):
>>      # pdf1 and pdf2 are the subsets of the original dataframes
>>      # associated with a particular key
>>      result = ... # some code that uses pdf1 and pdf2
>>      return result
>>
>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>
>> I have searched around this problem, and some people have suggested
>> joining the tables first. However, it's often not the same pattern, and it
>> is hard to get it to work using joins.
>>
>> I wonder what people's thoughts are on this?
>>
>> Li
>>
>>
