Apologies for not leaving feedback yet. I'm a little swamped this week with the Spark Summit, but this is at the top of my list to get to for next week.
Bryan

On Thu, Apr 18, 2019 at 4:18 AM Chris Martin <ch...@cmartinit.co.uk> wrote:

> Yes, totally agreed with Li here.
>
> For clarity, I'm happy to do the work to implement this, but it would be good to get feedback from the community in general and some of the Spark committers in particular.
>
> thanks,
>
> Chris
>
> On Wed, Apr 17, 2019 at 9:17 PM Li Jin <ice.xell...@gmail.com> wrote:
>
>> I have left some comments. This looks like a good proposal to me.
>>
>> As a heavy pyspark user, this is a pattern that we see over and over again, and I think it could be of pretty high value to other pyspark users as well. The fact that Chris and I came to the same ideas sort of verifies my intuition. Also, this isn't really something new; RDD has had a cogroup function from very early on.
>>
>> With that being said, I'd like to call out again for the community's feedback on the proposal.
>>
>> On Mon, Apr 15, 2019 at 4:57 PM Chris Martin <ch...@cmartinit.co.uk> wrote:
>>
>>> Ah sorry - I've updated the link, which should give you access. Can you try again now?
>>>
>>> thanks,
>>>
>>> Chris
>>>
>>> On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>>> Hi Chris,
>>>>
>>>> Thanks! The permissions on the google doc are maybe not set up properly. I cannot view the doc by default.
>>>>
>>>> Li
>>>>
>>>> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>>
>>>>> I've updated the jira so that the main body is now inside a google doc. Anyone should be able to comment - if you want/need write access please drop me a mail and I can add you.
>>>>>
>>>>> Ryan - regarding your specific point about why I'm not proposing to add this to the Scala API, I think the main point is that Scala users can already use Cogroup for Datasets. For Scala this is probably a better solution as (as far as I know) there is no Scala DataFrame library that could be used in place of Pandas for manipulating local DataFrames. As a result you'd probably be left dealing with Iterators of Row objects, which almost certainly isn't what you'd want. This is similar to the existing grouped map Pandas UDFs, for which there is no equivalent Scala API.
>>>>>
>>>>> I do think there might be a place for allowing a (Scala) Dataset Cogroup to take some sort of grouping expression as the grouping key (this would mean that you wouldn't have to marshal the key into a JVM object and could possibly lend itself to some catalyst optimisations), but I don't think that this should be done as part of this SPIP.
>>>>>
>>>>> thanks,
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> I agree, it would be great to have a document to comment on.
>>>>>>
>>>>>> The main thing that stands out right now is that this is only for PySpark and states that it will not be added to the Scala API. Why not make this available since most of the work would be done?
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you Chris, this looks great.
>>>>>>>
>>>>>>> Would you mind sharing a google doc version of the proposal? I believe that's the preferred way of discussing proposals (other people please correct me if I am wrong).
>>>>>>>
>>>>>>> Li
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> As promised I've raised SPARK-27463 for this.
>>>>>>>>
>>>>>>>> All feedback welcome!
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>>>>>>
>>>>>>>> Thanks Bryan and Li, that is much appreciated. Hopefully I should have the SPIP ready in the next couple of days.
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too difficult to extend the current functionality to transfer multiple DataFrames. For the SPIP, I would keep it more high-level, and I don't think it's necessary to include details of the Python worker; we can hash that out after the SPIP is approved.
>>>>>>>>>
>>>>>>>>> Bryan
>>>>>>>>>
>>>>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Chris, I look forward to it.
>>>>>>>>>>
>>>>>>>>>> I think sending multiple dataframes to the python worker requires some changes but shouldn't be too difficult. We could probably do something like:
>>>>>>>>>>
>>>>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>>>>
>>>>>>>>>> In:
>>>>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>>>>
>>>>>>>>>> And have ArrowPythonRunner take multiple input iterators/schemas.
>>>>>>>>>>
>>>>>>>>>> Li
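A minimal Python-side sketch of that count-prefixed framing, assuming pyarrow is available; the helper names are hypothetical, a per-frame length prefix is added so each Arrow stream is self-delimiting, and this is not the actual ArrowPythonRunner/worker protocol:

    # Sketch only: serialize N pandas DataFrames as
    # [numberOfDataFrames][length][ArrowStream][length][ArrowStream]...
    import io
    import struct

    import pandas as pd
    import pyarrow as pa

    def write_dataframes(dfs, sink):
        sink.write(struct.pack("!i", len(dfs)))          # [numberOfDataFrames]
        for df in dfs:
            buf = io.BytesIO()
            table = pa.Table.from_pandas(df)
            with pa.ipc.new_stream(buf, table.schema) as writer:
                writer.write_table(table)                # one Arrow IPC stream per DataFrame
            payload = buf.getvalue()
            sink.write(struct.pack("!i", len(payload)))  # per-frame length prefix (sketch addition)
            sink.write(payload)

    def read_dataframes(source):
        n = struct.unpack("!i", source.read(4))[0]
        frames = []
        for _ in range(n):
            size = struct.unpack("!i", source.read(4))[0]
            frames.append(pa.ipc.open_stream(source.read(size)).read_all().to_pandas())
        return frames

    # round-trip two small frames standing in for the two cogrouped inputs
    buf = io.BytesIO()
    write_dataframes([pd.DataFrame({"x": [1, 2]}), pd.DataFrame({"y": [3.0]})], buf)
    buf.seek(0)
    pdf1, pdf2 = read_dataframes(buf)

A real implementation would presumably stream record batches from the JVM rather than buffering whole tables, but the framing idea is the same.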
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Just to say, I really do think this is useful and am currently working on a SPIP to formally propose this. One concern I do have, however, is that the current arrow serialization code is tied to passing through a single dataframe as the udf parameter, and so any modification to allow multiple dataframes may not be straightforward. If anyone has any ideas as to how this might be achieved in an elegant manner I'd be happy to hear them!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Chris
>>>>>>>>>>>
>>>>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thank you both for the reply. Chris and I have very similar use cases for cogroup.
>>>>>>>>>>>
>>>>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid things like collect list and reshaping data between Spark and Pandas. Cogroup feels very similar and can be an extension to the groupby apply + pandas UDF functionality.
>>>>>>>>>>>
>>>>>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just to add to this, I've also implemented my own cogroup previously and would welcome a cogroup for dataframes.
>>>>>>>>>>>>
>>>>>>>>>>>> My specific use case was that I had a large amount of time series data. Spark has very limited support for time series (specifically as-of joins), but pandas has good support.
>>>>>>>>>>>>
>>>>>>>>>>>> My solution was to take my two dataframes and perform a group by and collect list on each. The resulting arrays could be passed into a udf where they could be marshalled into a couple of pandas dataframes and processed using pandas' excellent time series functionality.
>>>>>>>>>>>>
>>>>>>>>>>>> If cogroup was available natively on dataframes this would have been a bit nicer. The ideal would have been some pandas udf version of cogroup that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>>>>>>>>
>>>>>>>>>>>> Chris
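For concreteness, a rough PySpark sketch of that workaround; the column names (id, time, v1, v2), the toy data, and the per-key statistic are all made up for illustration:

    # Sketch of the groupby + collect_list workaround described above.
    import pandas as pd
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.getOrCreate()

    # toy stand-ins for the two large time-series dataframes
    df1 = spark.createDataFrame([(1, 1.0, 10.0), (1, 3.0, 11.0)], ["id", "time", "v1"])
    df2 = spark.createDataFrame([(1, 2.0, 0.5), (1, 4.0, 0.6)], ["id", "time", "v2"])

    # group each side by the key and collapse its rows into an array of structs
    left = df1.groupBy("id").agg(F.collect_list(F.struct("time", "v1")).alias("left_rows"))
    right = df2.groupBy("id").agg(F.collect_list(F.struct("time", "v2")).alias("right_rows"))

    @F.udf(DoubleType())
    def asof_stat(left_rows, right_rows):
        # marshal the two collected arrays back into pandas dataframes for this key
        pdf1 = pd.DataFrame([r.asDict() for r in left_rows]).sort_values("time")
        pdf2 = pd.DataFrame([r.asDict() for r in right_rows]).sort_values("time")
        merged = pd.merge_asof(pdf1, pdf2, on="time")  # pandas as-of join
        return float((merged["v1"] * merged["v2"]).sum())

    result = left.join(right, "id").withColumn("stat", asof_stat("left_rows", "right_rows"))

A native cogroup plus pandas udf would remove the collect_list/join scaffolding and hand the udf the two per-key pandas dataframes directly.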
>>>>>>>>>>>>
>>>>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> For info, in our team we have defined our own cogroup on dataframes in the past, on different projects and using different methods (rdd[row] based or union all + collect list based).
>>>>>>>>>>>>
>>>>>>>>>>>> I might be biased, but I find the approach very useful in projects to simplify and speed up transformations, and to remove a lot of intermediate stages (distinct + join => just cogroup).
>>>>>>>>>>>>
>>>>>>>>>>>> Plus spark 2.4 introduced a lot of new operators for nested data. That's a win!
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am wondering, do other people have opinions/use cases on cogroup?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Alessandro,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality full outer join".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>>>>>> (1) an equality outer join will give n * m rows for each key (n and m being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>>>>>>> (2) the user needs to do some extra processing to transform the n * m rows back to the desired shape (two sub-dataframes with n and m rows)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think a full outer join is an inefficient way to implement cogroup. If the end goal is to have two separate dataframes for each key, why join them first and then unjoin them?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <alessandro.solima...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>> I fail to see how an equi-join on the key columns is different from the cogroup you propose.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now you apply a udf on each iterable, one per key value (obtained with cogroup).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>>>>> 1) joining df1 and df2 on the key you want,
>>>>>>>>>>>>>>> 2) applying "groupby" on that key,
>>>>>>>>>>>>>>> 3) finally applying a udaf (you can have a look here if you are not familiar with them: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) that will process each group "in isolation".
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> HTH,
>>>>>>>>>>>>>>> Alessandro
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We have been using PySpark's groupby().apply() quite a bit and it has been very helpful in integrating Spark with our existing pandas-heavy libraries.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Recently, we have found more and more cases where groupby().apply() is not sufficient - in some cases, we want to group two dataframes by the same key and apply a function which takes two pd.DataFrames (and returns a pd.DataFrame) for each key. This feels very much like the "cogroup" operation in the RDD API.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It would be great to be able to do something like this (not actual API, just to explain the use case):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes that are associated with a particular key
>>>>>>>>>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>>>>     return result
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have searched around the problem and some people have suggested joining the tables first. However, it's often not the same pattern, and it's hard to get it to work using joins.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Li
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>