Apologies for not leaving feedback yet. I'm a little swamped this week with
the Spark Summit, but this is at the top of my list to get to for next week.

Bryan

On Thu, Apr 18, 2019 at 4:18 AM Chris Martin <ch...@cmartinit.co.uk> wrote:

> Yes, totally agreed with Li here.
>
> For clarity, I'm happy to do the work to implement this, but it would be
> good to get feedback from the community in general and some of the Spark
> committers in particular.
>
> thanks,
>
> Chris
>
> On Wed, Apr 17, 2019 at 9:17 PM Li Jin <ice.xell...@gmail.com> wrote:
>
>> I have left some comments. This looks like a good proposal to me.
>>
>> As a heavy pyspark user, this is a pattern that we see over and over
>> again and I think could be pretty high value to other pyspark users as
>> well. The fact that Chris and I came to the same ideas sort of verifies my
>> intuition. Also, this isn't really something new; RDD has had a cogroup
>> function from very early on.
>>
>> With that being said, I'd like to call again for the community's feedback
>> on the proposal.
>>
>> On Mon, Apr 15, 2019 at 4:57 PM Chris Martin <ch...@cmartinit.co.uk>
>> wrote:
>>
>>> Ah sorry- I've updated the link which should give you access.  Can you
>>> try again now?
>>>
>>> thanks,
>>>
>>> Chris
>>>
>>>
>>>
>>> On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>>> Hi Chris,
>>>>
>>>> Thanks! The permission to the google doc is maybe not set up properly.
>>>> I cannot view the doc by default.
>>>>
>>>> Li
>>>>
>>>> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk>
>>>> wrote:
>>>>
>>>>> I've updated the jira so that the main body is now inside a google
>>>>> doc.  Anyone should be able to comment- if you want/need write access
>>>>> please drop me a mail and I can add you.
>>>>>
>>>>> Ryan- regarding your specific point about why I'm not proposing to
>>>>> add this to the Scala API, I think the main point is that Scala users can
>>>>> already use Cogroup for Datasets.  For Scala this is probably a better
>>>>> solution as (as far as I know) there is no Scala DataFrame library that
>>>>> could be used in place of Pandas for manipulating local DataFrames. As a
>>>>> result you'd probably be left with dealing with Iterators of Row objects,
>>>>> which almost certainly isn't what you'd want. This is similar to the
>>>>> existing grouped map Pandas Udfs for which there is no equivalent Scala
>>>>> Api.
>>>>>
>>>>> I do think there might be a place for allowing a (Scala) DataSet
>>>>> Cogroup to take some sort of grouping expression as the grouping key
>>>>> (this would mean that you wouldn't have to marshal the key into a JVM
>>>>> object and could possibly lend itself to some catalyst optimisations) but
>>>>> I don't think that this should be done as part of this SPIP.
>>>>>
>>>>> thanks,
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> I agree, it would be great to have a document to comment on.
>>>>>>
>>>>>> The main thing that stands out right now is that this is only for
>>>>>> PySpark and states that it will not be added to the Scala API. Why not
>>>>>> make this available since most of the work would be done?
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you Chris, this looks great.
>>>>>>>
>>>>>>> Would you mind sharing a google doc version of the proposal? I believe
>>>>>>> that's the preferred way of discussing proposals (other people, please
>>>>>>> correct me if I am wrong).
>>>>>>>
>>>>>>> Li
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>>>>>
>>>>>>>> All feedback welcome!
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thanks Bryan and Li, that is much appreciated.  Hopefully should
>>>>>>>> have the SPIP ready in the next couple of days.
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't
>>>>>>>>> be too difficult to extend the current functionality to transfer
>>>>>>>>> multiple DataFrames.  For the SPIP, I would keep it more high-level and
>>>>>>>>> I don't think it's necessary to include details of the Python worker; we
>>>>>>>>> can hash that out after the SPIP is approved.
>>>>>>>>>
>>>>>>>>> Bryan
>>>>>>>>>
>>>>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Chris, look forward to it.
>>>>>>>>>>
>>>>>>>>>> I think sending multiple dataframes to the python worker requires
>>>>>>>>>> some changes but shouldn't be too difficult. We can probably do
>>>>>>>>>> something like:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>>>>
>>>>>>>>>> In:
>>>>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>>>>
>>>>>>>>>> And have ArrowPythonRunner take multiple input iterator/schema.
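
A minimal sketch of the framing idea above, for illustration only. It
assumes each DataFrame has already been serialized to bytes (placeholder
byte strings stand in for Arrow data here), and it adds a length prefix per
payload, which is an assumption and not part of the layout sketched above.
The helper names write_frames and read_frames are hypothetical and are not
Spark APIs.

```python
import io
import struct


def write_frames(stream, payloads):
    """Write [count][len][bytes][len][bytes]... to a binary stream."""
    # Big-endian 4-byte count of payloads (the "numberOfDataFrames" field).
    stream.write(struct.pack(">i", len(payloads)))
    for payload in payloads:
        # Assumed length prefix so the reader knows where each payload ends.
        stream.write(struct.pack(">i", len(payload)))
        stream.write(payload)


def read_frames(stream):
    """Read back the payloads written by write_frames, in order."""
    (count,) = struct.unpack(">i", stream.read(4))
    frames = []
    for _ in range(count):
        (size,) = struct.unpack(">i", stream.read(4))
        frames.append(stream.read(size))
    return frames


# Placeholder bytes; real payloads would be Arrow-serialized DataFrames.
buf = io.BytesIO()
write_frames(buf, [b"first-dataframe-bytes", b"second-dataframe-bytes"])
buf.seek(0)
decoded = read_frames(buf)
```

The reader only needs the leading count to know how many per-key inputs to
hand to the UDF, which is why the count comes first in the layout.
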
>>>>>>>>>>
>>>>>>>>>> Li
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>>>>>> working on a SPIP to formally propose this. One concern I do have,
>>>>>>>>>>> however, is that the current arrow serialization code is tied to
>>>>>>>>>>> passing through a single dataframe as the udf parameter and so any
>>>>>>>>>>> modification to allow multiple dataframes may not be straightforward.
>>>>>>>>>>> If anyone has any ideas as to how this might be achieved in an elegant
>>>>>>>>>>> manner I’d be happy to hear them!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Chris
>>>>>>>>>>>
>>>>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>>>>>> cases for cogroup.
>>>>>>>>>>>
>>>>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid
>>>>>>>>>>> things like collect list and reshaping data between Spark and Pandas.
>>>>>>>>>>> Cogroup feels very similar and can be an extension to the groupby
>>>>>>>>>>> apply + pandas UDF functionality.
>>>>>>>>>>>
>>>>>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on
>>>>>>>>>>> this?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just to add to this I’ve also implemented my own cogroup
>>>>>>>>>>>> previously and would welcome a cogroup for dataframe.
>>>>>>>>>>>>
>>>>>>>>>>>> My specific use case was that I had a large amount of time
>>>>>>>>>>>> series data. Spark has very limited support for time series
>>>>>>>>>>>> (specifically as-of joins), but pandas has good support.
>>>>>>>>>>>>
>>>>>>>>>>>> My solution was to take my two dataframes and perform a group
>>>>>>>>>>>> by and collect list on each. The resulting arrays could be passed
>>>>>>>>>>>> into a udf where they could be marshaled into a couple of pandas
>>>>>>>>>>>> dataframes and processed using pandas' excellent time series
>>>>>>>>>>>> functionality.
>>>>>>>>>>>>
>>>>>>>>>>>> If cogroup was available natively on dataframes this would have
>>>>>>>>>>>> been a bit nicer. The ideal would have been some pandas udf version
>>>>>>>>>>>> of cogroup that gave me a pandas dataframe for each spark dataframe
>>>>>>>>>>>> in the cogroup!
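
To make the use case above concrete, here is a small local sketch in plain
pandas (no Spark involved) of the kind of per-key function such a cogroup
would call. The frames trades and quotes, their column names, and the helper
asof_join are all hypothetical; the per-key loop only emulates what a
cogroup would do and is not a proposed API.

```python
import pandas as pd

# Hypothetical time series data, keyed by "id".
trades = pd.DataFrame({
    "id": ["x", "x", "y"],
    "time": pd.to_datetime([
        "2019-01-01 10:00:05", "2019-01-01 10:00:20", "2019-01-01 10:00:10",
    ]),
    "qty": [100, 200, 300],
})
quotes = pd.DataFrame({
    "id": ["x", "x", "y"],
    "time": pd.to_datetime([
        "2019-01-01 10:00:00", "2019-01-01 10:00:15", "2019-01-01 10:00:00",
    ]),
    "price": [10.0, 11.0, 20.0],
})


def asof_join(left, right):
    """Per-key function: attach the latest quote at or before each trade."""
    return pd.merge_asof(
        left.sort_values("time"),
        right.sort_values("time").drop(columns="id"),
        on="time",
    )


# Local emulation of a cogroup: pair up the sub-frames that share a key
# and apply the function to each pair.
quote_groups = {key: group for key, group in quotes.groupby("id")}
result = pd.concat(
    [
        asof_join(group, quote_groups[key])
        for key, group in trades.groupby("id")
        if key in quote_groups
    ],
    ignore_index=True,
)
```

Each call to asof_join sees two ordinary pandas DataFrames for one key, so
no collect list or manual marshaling of arrays is needed inside the function.
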
>>>>>>>>>>>>
>>>>>>>>>>>> Chris
>>>>>>>>>>>>
>>>>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>>>>>>> jonathan.wina...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> For info, in our team we have defined our own cogroup on dataframe
>>>>>>>>>>>> in the past on different projects using different methods
>>>>>>>>>>>> (rdd[row] based or union all collect list based).
>>>>>>>>>>>>
>>>>>>>>>>>> I might be biased, but I find the approach very useful in projects
>>>>>>>>>>>> to simplify and speed up transformations, and remove a lot of
>>>>>>>>>>>> intermediate stages (distinct + join => just cogroup).
>>>>>>>>>>>>
>>>>>>>>>>>> Plus spark 2.4 introduced a lot of new operators for nested
>>>>>>>>>>>> data. That's a win!
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am wondering, do other people have opinions/use cases on
>>>>>>>>>>>>> cogroup?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Alessandro,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean
>>>>>>>>>>>>>> "equality full outer join".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>>>>>> (1) an equality outer join will give n * m rows for each key (n
>>>>>>>>>>>>>> and m being the corresponding number of rows in df1 and df2 for
>>>>>>>>>>>>>> each key)
>>>>>>>>>>>>>> (2) the user needs to do some extra processing to transform n * m
>>>>>>>>>>>>>> back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think full outer join is an inefficient way to implement
>>>>>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for
>>>>>>>>>>>>>> each key, why join them first and then unjoin them?
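
The n * m point above can be checked with a toy example. This is a sketch in
plain Python, with lists of (key, value) tuples standing in for df1 and df2;
the data and the helper group_by_key are made up for illustration.

```python
from collections import defaultdict
from itertools import product

# Toy stand-ins for df1 and df2: (key, value) rows.
df1 = [("a", 1), ("a", 2), ("b", 3)]
df2 = [("a", 10), ("a", 20), ("a", 30), ("c", 40)]


def group_by_key(rows):
    """Collect the values of each key into a list."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    return groups


g1, g2 = group_by_key(df1), group_by_key(df2)
keys = sorted(set(g1) | set(g2))

# Full outer equality join: every left row pairs with every right row of
# the same key; a side with no rows contributes a single None.
joined = []
for k in keys:
    left = g1.get(k) or [None]
    right = g2.get(k) or [None]
    joined.extend((k, l, r) for l, r in product(left, right))

join_rows_per_key = {k: sum(1 for row in joined if row[0] == k) for k in keys}

# Cogroup: each key keeps its two groups separate, with n and m rows.
cogroup_sizes = {k: (len(g1.get(k, [])), len(g2.get(k, []))) for k in keys}
```

For key "a" the join produces 2 * 3 = 6 rows, while the cogroup view keeps a
group of 2 rows and a group of 3 rows, which is already the shape a two-frame
function wants.
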
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>>>>>>> alessandro.solima...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>> I fail to see how an equi-join on the key columns is
>>>>>>>>>>>>>>> different than the cogroup you propose.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now you apply a udf on each iterable, one per key value
>>>>>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are
>>>>>>>>>>>>>>> not familiar with them
>>>>>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> HTH,
>>>>>>>>>>>>>>> Alessandro
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit
>>>>>>>>>>>>>>>> and it has been very helpful in integrating Spark with our
>>>>>>>>>>>>>>>> existing pandas-heavy libraries.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to
>>>>>>>>>>>>>>>> group two dataframes by the same key, and apply a function which
>>>>>>>>>>>>>>>> takes two pd.DataFrame (also returns a pd.DataFrame) for each key.
>>>>>>>>>>>>>>>> This feels very much like the "cogroup" operation in the RDD API.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It would be great to be able to do something like this (not
>>>>>>>>>>>>>>>> actual API, just to explain the use case):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>>>>      # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>>>>>>>      # that are associated with a particular key
>>>>>>>>>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>>>>      return result
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>>>>>>> suggested joining the tables first. However, it's often not the
>>>>>>>>>>>>>>>> same pattern and it's hard to get it to work by using joins.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Li
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
