Thank you Chris, this looks great. Would you mind sharing a Google Doc version of the proposal? I believe that's the preferred way of discussing proposals (other people, please correct me if I am wrong).
Li

On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
> Hi,
>
> As promised I’ve raised SPARK-27463 for this.
>
> All feedback welcome!
>
> Chris
>
> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>
> Thanks Bryan and Li, that is much appreciated. Hopefully I should have the SPIP ready in the next couple of days.
>
> thanks,
>
> Chris
>
> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be too difficult to extend the current functionality to transfer multiple DataFrames. For the SPIP, I would keep it more high-level; I don't think it's necessary to include details of the Python worker, we can hash that out after the SPIP is approved.
>>
>> Bryan
>>
>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com> wrote:
>>
>>> Thanks Chris, I look forward to it.
>>>
>>> I think sending multiple dataframes to the python worker requires some changes but shouldn't be too difficult. We can probably do something like:
>>>
>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>
>>> in:
>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>
>>> and have ArrowPythonRunner take multiple input iterators/schemas.
>>>
>>> Li
>>>
>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>
>>>> Hi,
>>>>
>>>> Just to say, I really do think this is useful and am currently working on a SPIP to formally propose this. One concern I do have, however, is that the current Arrow serialization code is tied to passing through a single dataframe as the udf parameter, so any modification to allow multiple dataframes may not be straightforward. If anyone has any ideas as to how this might be achieved in an elegant manner I’d be happy to hear them!
>>>>
>>>> Thanks,
>>>>
>>>> Chris
>>>>
>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>
>>>> Thank you both for the reply. Chris and I have very similar use cases for cogroup.
>>>>
>>>> One of the goals for groupby apply + pandas UDF was to avoid things like collect list and reshaping data between Spark and Pandas. Cogroup feels very similar and can be an extension to the groupby apply + pandas UDF functionality.
>>>>
>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>
>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>
>>>>> Just to add to this, I’ve also implemented my own cogroup previously and would welcome a cogroup for dataframes.
>>>>>
>>>>> My specific use case was that I had a large amount of time series data. Spark has very limited support for time series (specifically as-of joins), but pandas has good support.
>>>>>
>>>>> My solution was to take my two dataframes and perform a group by and collect list on each. The resulting arrays could be passed into a udf where they could be marshalled into a couple of pandas dataframes and processed using pandas' excellent time series functionality.
>>>>>
>>>>> If cogroup was available natively on dataframes this would have been a bit nicer. The ideal would have been some pandas udf version of cogroup that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>
>>>>> Chris
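(For illustration, the workaround described above looks roughly like this in PySpark. This is only a sketch of the pattern, not Chris's actual code: the dataframes df1/df2, the key column "id", and the columns "time", "v1", "v2" are made up, and the UDF body is a stand-in for the real pandas time-series logic.)

    import pandas as pd
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.getOrCreate()
    # Toy stand-ins for the two real dataframes.
    df1 = spark.createDataFrame([(1, 1, 10.0), (1, 5, 11.0)], ["id", "time", "v1"])
    df2 = spark.createDataFrame([(1, 2, 0.5), (1, 6, 0.7)], ["id", "time", "v2"])

    # Collapse each dataframe to one row per key, collecting its rows as an array of structs.
    g1 = df1.groupBy("id").agg(F.collect_list(F.struct("time", "v1")).alias("rows1"))
    g2 = df2.groupBy("id").agg(F.collect_list(F.struct("time", "v2")).alias("rows2"))
    joined = g1.join(g2, "id")

    @udf(returnType=DoubleType())
    def process(rows1, rows2):
        # Re-assemble a pandas DataFrame per side from the collected arrays of structs.
        pdf1 = pd.DataFrame([r.asDict() for r in rows1])
        pdf2 = pd.DataFrame([r.asDict() for r in rows2])
        # ... pandas time-series processing would go here; a trivial stand-in:
        return float(pdf1["v1"].sum() + pdf2["v2"].sum())

    result = joined.withColumn("score", process("rows1", "rows2"))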
>>>>>
>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:
>>>>>
>>>>> For info, our team has defined its own cogroup on dataframes in the past, on different projects and using different methods (rdd[row] based, or union all + collect list based).
>>>>>
>>>>> I might be biased, but I find the approach very useful in projects to simplify and speed up transformations, and to remove a lot of intermediate stages (distinct + join => just cogroup).
>>>>>
>>>>> Plus, spark 2.4 introduced a lot of new operators for nested data. That's a win!
>>>>>
>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>>>>
>>>>>> I am wondering, do other people have opinions/use cases on cogroup?
>>>>>>
>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>
>>>>>>> Alessandro,
>>>>>>>
>>>>>>> Thanks for the reply. I assume by "equi-join" you mean "equality full outer join".
>>>>>>>
>>>>>>> Two issues I see with an equality outer join are:
>>>>>>> (1) an equality outer join will give n * m rows for each key (n and m being the corresponding number of rows in df1 and df2 for each key)
>>>>>>> (2) the user needs to do some extra processing to transform the n * m rows back into the desired shape (two sub-dataframes with n and m rows)
>>>>>>>
>>>>>>> I think a full outer join is an inefficient way to implement cogroup. If the end goal is to have two separate dataframes for each key, why join them first and then un-join them?
>>>>>>>
>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <alessandro.solima...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> I fail to see how an equi-join on the key columns is different from the cogroup you propose.
>>>>>>>>
>>>>>>>> I think the accepted answer here can shed some light:
>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>
>>>>>>>> Now you apply a udf on each iterable, one per key value (obtained with cogroup).
>>>>>>>>
>>>>>>>> You can achieve the same by:
>>>>>>>> 1) joining df1 and df2 on the key you want,
>>>>>>>> 2) applying "groupby" on that key,
>>>>>>>> 3) finally applying a udaf (you can have a look here if you are not familiar with them: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) that will process each group "in isolation".
>>>>>>>>
>>>>>>>> HTH,
>>>>>>>> Alessandro
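(Again purely for illustration, a minimal PySpark sketch of the join-then-groupby route suggested above. Alessandro points to a Scala UDAF; a grouped-map pandas UDF is used below as the per-group step instead, to stay in Python. The columns "id", "time", "v1", "v2" and the aggregation are invented, and note that each group here holds the n * m joined rows that Li mentions above.)

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = SparkSession.builder.getOrCreate()
    df1 = spark.createDataFrame([(1, 1, 10.0), (1, 5, 11.0)], ["id", "time", "v1"])
    df2 = spark.createDataFrame([(1, 2, 0.5), (1, 6, 0.7)], ["id", "time", "v2"])

    # 1) join df1 and df2 on the key (rename to avoid a duplicate "time" column)
    joined = df1.join(df2.withColumnRenamed("time", "time2"), on="id")

    # 2) group by the same key and 3) process each group "in isolation"
    @pandas_udf("id long, score double", PandasUDFType.GROUPED_MAP)
    def per_key(pdf):
        # pdf contains the n * m joined rows for one key; reshaping it back into two
        # separate sub-dataframes is the extra work a native cogroup would avoid
        return pd.DataFrame({"id": [pdf["id"].iloc[0]],
                             "score": [float((pdf["v1"] * pdf["v2"]).sum())]})

    result = joined.groupBy("id").apply(per_key)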
>>>>>>>>
>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it has been very helpful in integrating Spark with our existing pandas-heavy libraries.
>>>>>>>>>
>>>>>>>>> Recently, we have found more and more cases where groupby().apply() is not sufficient - in some cases, we want to group two dataframes by the same key, and apply a function which takes two pd.DataFrame (and also returns a pd.DataFrame) for each key. This feels very much like the "cogroup" operation in the RDD API.
>>>>>>>>>
>>>>>>>>> It would be great to be able to do something like this (not actual API, just to explain the use case):
>>>>>>>>>
>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes that are associated with a particular key
>>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>     return result
>>>>>>>>>
>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>
>>>>>>>>> I have searched around the problem and some people have suggested joining the tables first. However, it's often not the same pattern, and it is hard to get it to work using joins.
>>>>>>>>>
>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>
>>>>>>>>> Li
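(To make the proposed use case concrete: with an API along those lines, the per-key function for the as-of-join scenario Chris described earlier could be plain pandas. The column names below are invented and the cogroup call itself remains hypothetical; only the pandas part runs today.)

    import pandas as pd

    def asof_join(pdf1, pdf2):
        # pdf1/pdf2 would be the pandas slices of df1/df2 for one key;
        # pandas' as-of join covers what Spark lacks natively
        return pd.merge_asof(pdf1.sort_values("time"),
                             pdf2.sort_values("time"),
                             on="time")

    # Standalone illustration of what the function would see for a single key:
    trades = pd.DataFrame({"time": [1, 5, 10], "price": [100.0, 101.5, 102.0]})
    quotes = pd.DataFrame({"time": [2, 6], "quote": [100.2, 101.7]})
    print(asof_join(trades, quotes))

    # With the hypothetical API from the message above, this would be wired up as e.g.:
    # df3 = cogroup(df1, df2, key='some_key').apply(asof_join)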