Hi,

 As promised I’ve raised SPARK-27463 for this.

All feedback welcome!

Chris 

> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
> 
> Thanks Bryan and Li, that is much appreciated.  Hopefully should have the 
> SPIP ready in the next couple of days.
> 
> thanks,
> 
> Chris
> 
> 
> 
> 
>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com> wrote:
>> Chirs, an SPIP sounds good to me. I agree with Li that it wouldn't be too 
>> difficult to extend the currently functionality to transfer multiple 
>> DataFrames.  For the SPIP, I would keep it more high-level and I don't think 
>> it's necessary to include details of the Python worker, we can hash that out 
>> after the SPIP is approved.
>> 
>> Bryan
>> 
>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com> wrote:
>>> Thanks Chris, look forward to it.
>>> 
>>> I think sending multiple dataframes to the python worker requires some 
>>> changes but shouldn't be too difficult. We can probably sth like:
>>> 
>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>> 
>>> In: 
>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>> 
>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>> 
>>> Li
>>> 
>>> 
>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>> Hi,
>>>> 
>>>> Just to say, I really do think this is useful and am currently working on 
>>>> a SPIP to formally propose this. One concern I do have, however, is that 
>>>> the current arrow serialization code is tied to passing through a single 
>>>> dataframe as the udf parameter and so any modification to allow multiple 
>>>> dataframes may not be straightforward.  If anyone has any ideas as to how 
>>>> this might be achieved in an elegant manner I’d be happy to hear them!
>>>> 
>>>> Thanks,
>>>> 
>>>> Chris 
>>>> 
>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>> 
>>>>> Thank you both for the reply. Chris and I have very similar use cases for 
>>>>> cogroup. 
>>>>> 
>>>>> One of the goals for groupby apply + pandas UDF was to avoid things like 
>>>>> collect list and reshaping data between Spark and Pandas. Cogroup feels 
>>>>> very similar and can be an extension to the groupby apply + pandas UDF 
>>>>> functionality.
>>>>> 
>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>> 
>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>> Just to add to this I’ve also implemented my own cogroup previously and 
>>>>>> would welcome a cogroup for datafame.
>>>>>> 
>>>>>> My specific use case was that I had a large amount of time series data. 
>>>>>> Spark has very limited support for time series (specifically as-of 
>>>>>> joins), but pandas has good support.
>>>>>> 
>>>>>> My solution was to take my two dataframes and perform a group by and 
>>>>>> collect list on each. The resulting arrays could be passed into a udf 
>>>>>> where they could be marshaled into a couple of pandas dataframes and 
>>>>>> processed using pandas excellent time series functionality.
>>>>>> 
>>>>>> If cogroup was available natively on dataframes this would have been a 
>>>>>> bit nicer. The ideal would have been some pandas udf version of cogroup 
>>>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>> 
>>>>>> Chris 
>>>>>> 
>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>> For info, in our team have defined our own cogroup on dataframe in the 
>>>>>>> past on different projects using different methods (rdd[row] based or 
>>>>>>> union all collect list based). 
>>>>>>> 
>>>>>>> I might be biased, but find the approach very useful in project to 
>>>>>>> simplify and speed up transformations, and remove a lot of intermediate 
>>>>>>> stages (distinct + join => just cogroup). 
>>>>>>> 
>>>>>>> Plus spark 2.4 introduced a lot of new operator for nested data. That's 
>>>>>>> a win! 
>>>>>>> 
>>>>>>> 
>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>>>>>>> I am wondering do other people have opinion/use case on cogroup?
>>>>>>>> 
>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>> Alessandro,
>>>>>>>>> 
>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality  
>>>>>>>>> full outer join" .
>>>>>>>>> 
>>>>>>>>> Two issues I see with equity outer join is:
>>>>>>>>> (1) equity outer join will give n * m rows for each key (n and m 
>>>>>>>>> being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>> (2) User needs to do some extra processing to transform n * m back to 
>>>>>>>>> the desired shape (two sub dataframes with n and m rows) 
>>>>>>>>> 
>>>>>>>>> I think full outer join is an inefficient way to implement cogroup. 
>>>>>>>>> If the end goal is to have two separate dataframes for each key, why 
>>>>>>>>> joining them first and then unjoin them?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando 
>>>>>>>>>> <alessandro.solima...@gmail.com> wrote:
>>>>>>>>>> Hello,
>>>>>>>>>> I fail to see how an equi-join on the key columns is different than 
>>>>>>>>>> the cogroup you propose.
>>>>>>>>>> 
>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>> 
>>>>>>>>>> Now you apply an udf on each iterable, one per key value (obtained 
>>>>>>>>>> with cogroup).
>>>>>>>>>> 
>>>>>>>>>> You can achieve the same by: 
>>>>>>>>>> 1) join df1 and df2 on the key you want, 
>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not 
>>>>>>>>>> familiar with them 
>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html), 
>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>> 
>>>>>>>>>> HTH,
>>>>>>>>>> Alessandro
>>>>>>>>>> 
>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it 
>>>>>>>>>>> has been very helpful in integrating Spark with our existing 
>>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>> 
>>>>>>>>>>> Recently, we have found more and more cases where groupby().apply() 
>>>>>>>>>>> is not sufficient - In some cases, we want to group two dataframes 
>>>>>>>>>>> by the same key, and apply a function which takes two pd.DataFrame 
>>>>>>>>>>> (also returns a pd.DataFrame) for each key. This feels very much 
>>>>>>>>>>> like the "cogroup" operation in the RDD API.
>>>>>>>>>>> 
>>>>>>>>>>> It would be great to be able to do sth like this: (not actual API, 
>>>>>>>>>>> just to explain the use case):
>>>>>>>>>>> 
>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>> def my_udf(pdf1, pdf2)
>>>>>>>>>>>       # pdf1 and pdf2 are the subset of the original dataframes 
>>>>>>>>>>> that is associated with a particular key
>>>>>>>>>>>       result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>>>       return result   
>>>>>>>>>>> 
>>>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>> 
>>>>>>>>>>> I have searched around the problem and some people have suggested 
>>>>>>>>>>> to join the tables first. However, it's often not the same pattern 
>>>>>>>>>>> and hard to get it to work by using joins.
>>>>>>>>>>> 
>>>>>>>>>>> I wonder what are people's thought on this? 
>>>>>>>>>>> 
>>>>>>>>>>> Li

Reply via email to