Hi Chris,

Thanks! The permissions on the Google doc may not be set up properly - I cannot view the doc by default.
Li

On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk> wrote:

> I've updated the jira so that the main body is now inside a google doc. Anyone should be able to comment - if you want/need write access please drop me a mail and I can add you.
>
> Ryan - regarding your specific point about why I'm not proposing to add this to the Scala API, I think the main point is that Scala users can already use cogroup for Datasets. For Scala this is probably a better solution as (as far as I know) there is no Scala DataFrame library that could be used in place of Pandas for manipulating local DataFrames. As a result you'd probably be left dealing with Iterators of Row objects, which almost certainly isn't what you'd want. This is similar to the existing grouped map Pandas UDFs, for which there is no equivalent Scala API.
>
> I do think there might be a place for allowing a (Scala) Dataset cogroup to take some sort of grouping expression as the grouping key (this would mean that you wouldn't have to marshal the key into a JVM object and could possibly lend itself to some Catalyst optimisations), but I don't think that this should be done as part of this SPIP.
>
> thanks,
>
> Chris
>
> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> I agree, it would be great to have a document to comment on.
>>
>> The main thing that stands out right now is that this is only for PySpark and states that it will not be added to the Scala API. Why not make this available since most of the work would be done?
>>
>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xell...@gmail.com> wrote:
>>
>>> Thank you Chris, this looks great.
>>>
>>> Would you mind sharing a google doc version of the proposal? I believe that's the preferred way of discussing proposals (other people please correct me if I am wrong).
>>>
>>> Li
>>>
>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>
>>>> Hi,
>>>>
>>>> As promised I've raised SPARK-27463 for this.
>>>>
>>>> All feedback welcome!
>>>>
>>>> Chris
>>>>
>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>>
>>>> Thanks Bryan and Li, that is much appreciated. Hopefully I should have the SPIP ready in the next couple of days.
>>>>
>>>> thanks,
>>>>
>>>> Chris
>>>>
>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com> wrote:
>>>>
>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too difficult to extend the current functionality to transfer multiple DataFrames. For the SPIP, I would keep it more high-level, and I don't think it's necessary to include details of the Python worker; we can hash that out after the SPIP is approved.
>>>>>
>>>>> Bryan
>>>>>
>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Chris, I look forward to it.
>>>>>>
>>>>>> I think sending multiple dataframes to the python worker requires some changes but shouldn't be too difficult. We could probably do something like:
>>>>>>
>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>
>>>>>> in:
>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>
>>>>>> and have ArrowPythonRunner take multiple input iterators/schemas.
>>>>>>
>>>>>> Li
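As a rough illustration of the framing Li sketches above, a toy Python/pyarrow example along these lines might look as follows. This is not Spark's actual wire protocol; the per-stream length prefixes, function names and sample data are invented for this sketch only.

import io
import struct

import pandas as pd
import pyarrow as pa


def write_frames(dfs):
    """Pack pandas DataFrames as [numberOfDataFrames][ArrowStream1][ArrowStream2]..."""
    out = io.BytesIO()
    out.write(struct.pack("!i", len(dfs)))           # [numberOfDataFrames]
    for df in dfs:
        table = pa.Table.from_pandas(df, preserve_index=False)
        sink = pa.BufferOutputStream()
        writer = pa.ipc.new_stream(sink, table.schema)
        writer.write_table(table)
        writer.close()
        buf = sink.getvalue()
        out.write(struct.pack("!i", buf.size))       # length prefix (sketch only, for easy parsing)
        out.write(buf.to_pybytes())                  # [DataFrameInArrowFormat]
    return out.getvalue()


def read_frames(payload):
    """Read the framed payload back into a list of pandas DataFrames."""
    buf = io.BytesIO(payload)
    (n,) = struct.unpack("!i", buf.read(4))
    frames = []
    for _ in range(n):
        (length,) = struct.unpack("!i", buf.read(4))
        reader = pa.ipc.open_stream(buf.read(length))
        frames.append(reader.read_all().to_pandas())
    return frames


left = pd.DataFrame({"id": [1, 2], "v": [1.0, 2.0]})
right = pd.DataFrame({"id": [1, 2], "w": ["a", "b"]})
print(read_frames(write_frames([left, right])))

In ArrowPythonRunner the streams would presumably be written to the Python worker's data stream rather than an in-memory buffer, but the framing idea is the same.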
>>>>>>
>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Just to say, I really do think this is useful and am currently working on a SPIP to formally propose this. One concern I do have, however, is that the current arrow serialization code is tied to passing through a single dataframe as the udf parameter, and so any modification to allow multiple dataframes may not be straightforward. If anyone has any ideas as to how this might be achieved in an elegant manner I'd be happy to hear them!
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thank you both for the reply. Chris and I have very similar use cases for cogroup.
>>>>>>>
>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid things like collect list and reshaping data between Spark and Pandas. Cogroup feels very similar and can be an extension to the groupby apply + pandas UDF functionality.
>>>>>>>
>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>>>
>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>>> Just to add to this, I've also implemented my own cogroup previously and would welcome a cogroup for dataframes.
>>>>>>>>
>>>>>>>> My specific use case was that I had a large amount of time series data. Spark has very limited support for time series (specifically as-of joins), but pandas has good support.
>>>>>>>>
>>>>>>>> My solution was to take my two dataframes and perform a group by and collect list on each. The resulting arrays could be passed into a udf where they could be marshaled into a couple of pandas dataframes and processed using pandas' excellent time series functionality.
>>>>>>>>
>>>>>>>> If cogroup was available natively on dataframes this would have been a bit nicer. The ideal would have been some pandas udf version of cogroup that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>>>>
>>>>>>>> Chris
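For concreteness, a rough PySpark sketch of the workaround Chris describes (group each side, collect the rows, join the collected sides on the key, then do the as-of join in pandas inside a udf) might look like the following. The column names (id, ts, x, y), the asof_join name and the sample data are invented for illustration and are not from his code.

import pandas as pd
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, 10, 1.0), (1, 20, 2.0)], ["id", "ts", "x"])
df2 = spark.createDataFrame([(1, 15, 5.0)], ["id", "ts", "y"])

# Group each side by the key and collect its rows into an array of structs.
g1 = df1.groupBy("id").agg(F.collect_list(F.struct("ts", "x")).alias("rows1"))
g2 = df2.groupBy("id").agg(F.collect_list(F.struct("ts", "y")).alias("rows2"))

result_type = T.ArrayType(T.StructType([
    T.StructField("ts", T.LongType()),
    T.StructField("x", T.DoubleType()),
    T.StructField("y", T.DoubleType()),
]))

# Marshal both arrays into pandas DataFrames and use pandas' as-of join.
@F.udf(returnType=result_type)
def asof_join(rows1, rows2):
    pdf1 = pd.DataFrame([r.asDict() for r in rows1]).sort_values("ts")
    pdf2 = pd.DataFrame([r.asDict() for r in rows2]).sort_values("ts")
    merged = pd.merge_asof(pdf1, pdf2, on="ts")
    return [(int(r.ts), float(r.x), float(r.y)) for r in merged.itertuples(index=False)]

result = (g1.join(g2, "id")
            .select("id", F.explode(asof_join("rows1", "rows2")).alias("r"))
            .select("id", "r.*"))
result.show()

It works, but every row is reshaped through collect_list and explode on the way in and out, which is exactly the overhead a native cogroup pandas udf would avoid.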
>>>>>>>>
>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> For info, our team has defined its own cogroup on dataframes in the past, on different projects and using different methods (rdd[row] based or union all + collect list based).
>>>>>>>>
>>>>>>>> I might be biased, but I find the approach very useful in projects to simplify and speed up transformations, and to remove a lot of intermediate stages (distinct + join => just cogroup).
>>>>>>>>
>>>>>>>> Plus spark 2.4 introduced a lot of new operators for nested data. That's a win!
>>>>>>>>
>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am wondering, do other people have opinions/use cases on cogroup?
>>>>>>>>>
>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Alessandro,
>>>>>>>>>>
>>>>>>>>>> Thanks for the reply. I assume by "equi-join" you mean "equality full outer join".
>>>>>>>>>>
>>>>>>>>>> Two issues I see with an equi outer join are:
>>>>>>>>>> (1) an equi outer join will give n * m rows for each key (n and m being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>>> (2) the user needs to do some extra processing to transform the n * m rows back to the desired shape (two sub-dataframes with n and m rows)
>>>>>>>>>>
>>>>>>>>>> I think a full outer join is an inefficient way to implement cogroup. If the end goal is to have two separate dataframes for each key, why join them first and then un-join them?
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <alessandro.solima...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I fail to see how an equi-join on the key columns is different from the cogroup you propose.
>>>>>>>>>>>
>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>
>>>>>>>>>>> Now you apply a udf on each iterable, one per key value (obtained with cogroup).
>>>>>>>>>>>
>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>> 1) joining df1 and df2 on the key you want,
>>>>>>>>>>> 2) applying "groupby" on such key,
>>>>>>>>>>> 3) finally applying a udaf (you can have a look here if you are not familiar with them: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) that will process each group "in isolation".
>>>>>>>>>>>
>>>>>>>>>>> HTH,
>>>>>>>>>>> Alessandro
>>>>>>>>>>>
>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it has been very helpful in integrating Spark with our existing pandas-heavy libraries.
>>>>>>>>>>>>
>>>>>>>>>>>> Recently, we have found more and more cases where groupby().apply() is not sufficient - in some cases, we want to group two dataframes by the same key, and apply a function which takes two pd.DataFrames (and also returns a pd.DataFrame) for each key. This feels very much like the "cogroup" operation in the RDD API.
>>>>>>>>>>>>
>>>>>>>>>>>> It would be great to be able to do something like this (not the actual API, just to explain the use case):
>>>>>>>>>>>>
>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>>>     # that are associated with a particular key
>>>>>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>>>>     return result
>>>>>>>>>>>>
>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>
>>>>>>>>>>>> I have searched around the problem and some people have suggested joining the tables first. However, it's often not the same pattern and it is hard to get it to work using joins.
>>>>>>>>>>>>
>>>>>>>>>>>> I wonder what people's thoughts on this are?
>>>>>>>>>>>>
>>>>>>>>>>>> Li
>>>>>>>>>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>
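As a closing illustration of the join-versus-cogroup point made by Alessandro and Li above, a small pandas-only sketch with toy data shows the n * m blow-up and the reshaping step that a cogroup-style API would avoid. The data and names are made up for illustration.

import pandas as pd

# For one key: n = 3 rows on one side, m = 2 on the other.
df1 = pd.DataFrame({"key": ["a", "a", "a"], "x": [1, 2, 3]})
df2 = pd.DataFrame({"key": ["a", "a"], "y": [10, 20]})

# (1) A full outer equi-join on the key gives n * m rows for that key.
joined = df1.merge(df2, on="key", how="outer")
assert len(joined) == 6   # 3 * 2

# (2) Getting back the two per-key sub-frames requires extra reshaping,
#     e.g. splitting the column sets and de-duplicating the repeats.
pdf1 = joined[["key", "x"]].drop_duplicates()
pdf2 = joined[["key", "y"]].drop_duplicates()

# A cogroup-style pandas udf would receive pdf1 and pdf2 directly for each key,
# with no n * m blow-up and no reshaping step.

Note that the de-duplication step would also silently collapse rows that are genuine duplicates on one side, which is another reason the join emulation is awkward compared with a native cogroup.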