It would certainly be useful for our domain to have some sort of native cbind(). Is there a fundamental disapproval of adding that functionality, or is it just a matter of nobody implementing it?
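
To make the requested semantics concrete, here is a minimal sketch of a column bind in plain Python. It is not a Spark API and not a proposal for how Spark should implement it. It only mirrors what unix paste does for the data described in the quoted thread below: two-column inputs whose row ids are identical and in the same order. The function name `cbind` and the tab delimiter are assumptions made for illustration.

```python
import csv
import io
from itertools import zip_longest
from typing import Iterator, List, Sequence, TextIO


def cbind(sources: Sequence[TextIO], delimiter: str = "\t") -> Iterator[List[str]]:
    """Column-bind (row_id, value) streams that share the same row ids in the same order.

    Hypothetical helper for illustration only; the delimiter is an assumption.
    """
    readers = [csv.reader(src, delimiter=delimiter) for src in sources]
    # Walk all inputs in lockstep, one line from each per step.
    for line_no, rows in enumerate(zip_longest(*readers), start=1):
        if any(r is None for r in rows):
            raise ValueError(f"inputs differ in length at line {line_no}")
        if any(len(r) != 2 for r in rows):
            raise ValueError(f"expected 2 fields per input at line {line_no}")
        row_id = rows[0][0]
        # No join is needed, but the shared ordering must actually hold.
        if any(r[0] != row_id for r in rows):
            raise ValueError(f"row ids disagree at line {line_no}")
        yield [row_id] + [r[1] for r in rows]


if __name__ == "__main__":
    a = io.StringIO("g1\t1.0\ng2\t2.0\n")
    b = io.StringIO("g1\t3.0\ng2\t4.0\n")
    for row in cbind([a, b]):
        print("\t".join(row))
```

Because the inputs are consumed in lockstep, memory use is one row at a time and no shuffle or key lookup is involved, which is the property a join cannot exploit. With thousands of inputs, the open-file limit of the operating system may require binding in batches.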

On Wed, Apr 20, 2022 at 16:28 Sean Owen <sro...@gmail.com> wrote:

> Good lead, pandas on Spark concat() is worth trying. It looks like it uses
> a join, but not 100% sure from the source.
> The SQL concat() function is indeed a different thing.
>
> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen <bjornjorgen...@gmail.com>
> wrote:
>
>> Sorry for asking, but why doesn't concat work?
>>
>> Pandas on Spark has ps.concat
>> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>> which takes 2 dataframes and concatenates them into 1 dataframe.
>> It seems
>> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
>> like the pyspark version takes 2 columns and concatenates them into one
>> column.
>>
>> On Wed, Apr 20, 2022 at 21:04, Sean Owen <sro...@gmail.com> wrote:
>>
>>> cbind? yeah though the answer is typically a join. I don't know if
>>> there's a better option in a SQL engine, as SQL doesn't have anything to
>>> offer except join and pivot either (? right?)
>>> Certainly, the dominant data storage paradigm is wide tables, whereas
>>> you're starting with effectively a huge number of tiny slim tables, which
>>> is the impedance mismatch here.
>>>
>>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson <aedav...@ucsc.edu>
>>> wrote:
>>>
>>>> Thanks Sean
>>>>
>>>> I imagine this is a fairly common problem in data science. Any idea how
>>>> others solve it? For example, I wonder if running the join on something
>>>> like BigQuery might work better? I do not know much about the
>>>> implementation.
>>>>
>>>> No one tool will solve all problems. Once I get the matrix, I think
>>>> Spark will work well for our needs.
>>>>
>>>> Kind regards
>>>>
>>>> Andy
>>>>
>>>> *From: *Sean Owen <sro...@gmail.com>
>>>> *Date: *Monday, April 18, 2022 at 6:58 PM
>>>> *To: *Andrew Davidson <aedav...@ucsc.edu>
>>>> *Cc: *"user @spark" <user@spark.apache.org>
>>>> *Subject: *Re: How is union() implemented? Need to implement column
>>>> bind
>>>>
>>>> A join is the natural answer, but this is a 10114-way join, which
>>>> probably chokes readily just to even plan it, let alone all the
>>>> shuffling of huge data. You could tune your way out of it maybe, but
>>>> I'm not optimistic. It's just huge.
>>>>
>>>> You could go off-road and lower-level to take advantage of the
>>>> structure of the data. You effectively want "column bind". There is no
>>>> such operation in Spark. (union is 'row bind'.) You could do this with
>>>> zipPartitions, which is in the RDD API, and to my surprise, not in the
>>>> Python API but exists in Scala. And R (!). If you can read several RDDs
>>>> of data, you can use this method to pair all their corresponding values
>>>> and ultimately get rows of 10114 values out. In fact that is how
>>>> sparklyr implements cbind on Spark, FWIW:
>>>> https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>>>>
>>>> The issue I see is that you can only zip a few at a time; you don't
>>>> want to zip 10114 of them. Perhaps you have to do that iteratively, and
>>>> I don't know if that is going to face the same issues with huge huge
>>>> plans.
>>>>
>>>> I like the pivot idea. If you can read the individual files as data
>>>> rows (maybe list all the file names, parallelize with Spark, write a UDF
>>>> that reads the data for that file to generate the rows). If you can emit
>>>> (file, index, value) and groupBy index, pivot on file (I think?) that
>>>> should be about it? I think it doesn't need additional hashing or
>>>> whatever. Not sure how fast it is but that seems more direct than the
>>>> join, as well.
>>>>
>>>> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson
>>>> <aedav...@ucsc.edu.invalid> wrote:
>>>>
>>>> Hi, I have a hard problem
>>>>
>>>> I have 10114 column vectors, each in a separate file. Each file has 2
>>>> columns, the row id and numeric values. The row ids are identical and in
>>>> sort order. All the column vectors have the same number of rows. There
>>>> are over 5 million rows. I need to combine them into a single table. The
>>>> row ids are very long strings. The column names are about 20 chars long.
>>>>
>>>> My current implementation uses join. This takes a long time on a
>>>> cluster with 2 workers totaling 192 vcpu and 2.8 TB of memory. It often
>>>> crashes. I mean totally dead, start over. Checkpoints do not seem to
>>>> help. It still crashes and needs to be restarted from scratch. What is
>>>> really surprising is the final file size is only 213G! The way I got the
>>>> file was to copy all the column vectors to a single BIG IRON machine and
>>>> use unix cut and paste. Took about 44 min to run once I got all the data
>>>> moved around. It was very tedious and error prone. I had to move a lot
>>>> of data around. Not a particularly reproducible process. I will need to
>>>> rerun this three more times on different data sets of about the same
>>>> size.
>>>>
>>>> I noticed that Spark has a union() function. It implements row bind.
>>>> Any idea how it is implemented? Is it just map reduce under the covers?
>>>>
>>>> My thought was
>>>>
>>>> 1. load each column vector
>>>>
>>>> 2. maybe I need to replace the really long row id strings with
>>>> integers
>>>>
>>>> 3. convert column vectors into row vectors using pivot (i.e.,
>>>> matrix transpose)
>>>>
>>>> 4. union all the row vectors into a single table
>>>>
>>>> 5. pivot the table back so I have the correct column vectors
>>>>
>>>> I could replace the row ids and column names with integers if needed,
>>>> and restore them later
>>>>
>>>> Maybe I would be better off using many small machines? I assume memory
>>>> is the limiting resource, not cpu. I notice that memory usage will reach
>>>> 100%. I added several TBs of local ssd. I am not convinced that Spark is
>>>> using the local disk
>>>>
>>>> Will this perform better than join?
>>>>
>>>> · The rows before the final pivot will be very very wide (over 5
>>>> million columns)
>>>>
>>>> · There will only be 10114 rows before the pivot
>>>>
>>>> I assume the pivots will shuffle all the data. I assume the column
>>>> vectors are trivial. The final table pivot will be expensive, however it
>>>> will only need to be done once
>>>>
>>>> Comments and suggestions appreciated
>>>>
>>>> Andy
>>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>
--
It's dark in this basement.