Thanks Chris, look forward to it. I think sending multiple dataframes to the
python worker requires some changes but shouldn't be too difficult. We can
probably do something like:

[numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]

In:
https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70

And have ArrowPythonRunner take multiple input iterators/schemas.
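On the Python worker side, the deserialization could then look roughly like
this (a minimal sketch with a hypothetical helper, assuming the count is
written as a 4-byte int like the other control fields in the worker protocol):

    import struct

    import pyarrow as pa

    def read_dataframes(stream):
        # Hypothetical framing: a 4-byte count, then that many Arrow streams.
        (num_dataframes,) = struct.unpack("!i", stream.read(4))
        # Each Arrow stream ends with an end-of-stream marker, so they can
        # be read back to back from the same underlying stream.
        return [
            pa.ipc.open_stream(stream).read_all().to_pandas()
            for _ in range(num_dataframes)
        ]
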
Li

On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:

> Hi,
>
> Just to say, I really do think this is useful and am currently working on
> a SPIP to formally propose this. One concern I do have, however, is that
> the current arrow serialization code is tied to passing through a single
> dataframe as the udf parameter, so any modification to allow multiple
> dataframes may not be straightforward. If anyone has any ideas as to how
> this might be achieved in an elegant manner I'd be happy to hear them!
>
> Thanks,
>
> Chris
>
> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>
> Thank you both for the reply. Chris and I have very similar use cases for
> cogroup.
>
> One of the goals for groupby apply + pandas UDF was to avoid things like
> collect list and reshaping data between Spark and pandas. Cogroup feels
> very similar and can be an extension to the groupby apply + pandas UDF
> functionality.
>
> I wonder if any PMC members/committers have any thoughts/opinions on this?
>
> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>
>> Just to add to this, I've also implemented my own cogroup previously and
>> would welcome a cogroup for dataframes.
>>
>> My specific use case was that I had a large amount of time series data.
>> Spark has very limited support for time series (specifically as-of
>> joins), but pandas has good support.
>>
>> My solution was to take my two dataframes and perform a group by and
>> collect list on each. The resulting arrays could be passed into a udf,
>> where they could be marshaled into a couple of pandas dataframes and
>> processed using pandas' excellent time series functionality.
>>
>> If cogroup were available natively on dataframes this would have been a
>> bit nicer. The ideal would have been some pandas udf version of cogroup
>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
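>>
>> A simplified sketch of that pattern (the dataframes, key, and column
>> names are made up for illustration, as is the final as-of computation):
>>
>>     from pyspark.sql import functions as F
>>     from pyspark.sql.types import DoubleType
>>     import pandas as pd
>>
>>     # Collapse each dataframe to one array-of-structs row per key.
>>     g1 = df1.groupBy("id").agg(F.collect_list(F.struct("time", "v1")).alias("rows1"))
>>     g2 = df2.groupBy("id").agg(F.collect_list(F.struct("time", "v2")).alias("rows2"))
>>
>>     @F.udf(DoubleType())
>>     def process(rows1, rows2):
>>         # Marshal the collected arrays back into pandas dataframes.
>>         pdf1 = pd.DataFrame([r.asDict() for r in rows1]).sort_values("time")
>>         pdf2 = pd.DataFrame([r.asDict() for r in rows2]).sort_values("time")
>>         # The as-of join that spark lacks but pandas supports.
>>         merged = pd.merge_asof(pdf1, pdf2, on="time")
>>         return float(merged["v2"].mean())
>>
>>     result = g1.join(g2, "id").withColumn("out", process("rows1", "rows2"))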
>>
>> Chris
>>
>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com>
>> wrote:
>>
>> For info, our team has defined its own cogroup on dataframes in the past
>> on different projects, using different methods (rdd[row] based or union
>> all + collect list based).
>>
>> I might be biased, but I find the approach very useful in projects to
>> simplify and speed up transformations, and to remove a lot of
>> intermediate stages (distinct + join => just cogroup).
>>
>> Plus, spark 2.4 introduced a lot of new operators for nested data.
>> That's a win!
>>
>>
>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>
>>> I am wondering, do other people have opinions/use cases on cogroup?
>>>
>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>>> Alessandro,
>>>>
>>>> Thanks for the reply. I assume by "equi-join" you mean "equality full
>>>> outer join".
>>>>
>>>> Two issues I see with an equality outer join are:
>>>> (1) An equality outer join will give n * m rows for each key (n and m
>>>> being the corresponding numbers of rows in df1 and df2 for each key).
>>>> (2) The user needs to do some extra processing to transform the n * m
>>>> rows back to the desired shape (two sub-dataframes with n and m rows).
>>>>
>>>> I think a full outer join is an inefficient way to implement cogroup.
>>>> If the end goal is to have two separate dataframes for each key, why
>>>> join them first and then unjoin them?
>>>>
>>>>
>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>> alessandro.solima...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>> I fail to see how an equi-join on the key columns is different from
>>>>> the cogroup you propose.
>>>>>
>>>>> I think the accepted answer can shed some light:
>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>
>>>>> Now you apply a udf on each iterable, one per key value (obtained
>>>>> with cogroup).
>>>>>
>>>>> You can achieve the same by:
>>>>> 1) joining df1 and df2 on the key you want,
>>>>> 2) applying "groupby" on that key,
>>>>> 3) finally applying a udaf (you can have a look here if you are not
>>>>> familiar with them:
>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)
>>>>> that will process each group "in isolation".
>>>>>
>>>>> HTH,
>>>>> Alessandro
>>>>>
>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it
>>>>>> has been very helpful in integrating Spark with our existing
>>>>>> pandas-heavy libraries.
>>>>>>
>>>>>> Recently, we have found more and more cases where groupby().apply()
>>>>>> is not sufficient - in some cases, we want to group two dataframes
>>>>>> by the same key, and apply a function which takes two pd.DataFrames
>>>>>> (and also returns a pd.DataFrame) for each key. This feels very much
>>>>>> like the "cogroup" operation in the RDD API.
>>>>>>
>>>>>> It would be great to be able to do something like this (not actual
>>>>>> API, just to explain the use case):
>>>>>>
>>>>>> @pandas_udf(return_schema, ...)
>>>>>> def my_udf(pdf1, pdf2):
>>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>     # that are associated with a particular key
>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>     return result
>>>>>>
>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>
>>>>>> I have searched around the problem and some people have suggested
>>>>>> joining the tables first. However, it's often not the same pattern,
>>>>>> and it is hard to get it to work using joins.
>>>>>>
>>>>>> I wonder what people's thoughts are on this?
>>>>>>
>>>>>> Li