I just tested, your observation in DataFrame API is correct. It behaves
weirdly in case of multiple column join.  (Maybe we should report a Jira?)

Solution: You can go back to our good old composite key field concatenation
method. Not ideal, but workaround. (Of course you can use realSQL as well,
as shown below)

set up Data:

    a =
[[1993,1,100],[1993,2,200],[1994,1,1000],[1994,3,3000],[2000,1,10000]]
    b = [[1993,1,"A"],[1994,1,"AA"],[2000,1,"AAA"]]
    YM1 = sc.parallelize(a).map(lambda tup: Row(yr=int(tup[0]),mn =
int(tup[1]), price = int(tup[2]),joiningKey=str(tup[0])+"~"+str(tup[1])))
    YM2 = sc.parallelize(b).map(lambda tup: Row(yr=int(tup[0]),mn =
int(tup[1]), name = str(tup[2]),joiningKey=str(tup[0])+"~"+str(tup[1])))
    print YM1.collect()
    print YM2.collect()

    YM1DF = ssc.createDataFrame(YM1)
    YM2DF = ssc.createDataFrame(YM2)

    print YM1DF.printSchema()
    print YM2DF.printSchema()
   ------------------------------------------------
This DOES NOT WORK ---

    YMJN = YM1DF.join(YM2DF,YM1DF.yr==YM2DF.yr and
YM1DF.mn==YM2DF.mn,"inner")
    print YMJN.printSchema()
    for l in YMJN.collect():
        print l

Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1',
mn=1, name=u'A', yr=1993)
Row(joiningKey=u'1994~1', mn=1, price=100, yr=1994, joiningKey=u'1994~1',
mn=1, name=u'AA', yr=1994)
Row(joiningKey=u'2000~1', mn=1, price=100, yr=2000, joiningKey=u'2000~1',
mn=1, name=u'AAA', yr=2000)
Row(joiningKey=u'1993~1', mn=1, price=1000, yr=1993, joiningKey=u'1993~1',
mn=1, name=u'A', yr=1993)
Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1',
mn=1, name=u'AA', yr=1994)
Row(joiningKey=u'2000~1', mn=1, price=1000, yr=2000, joiningKey=u'2000~1',
mn=1, name=u'AAA', yr=2000)
Row(joiningKey=u'1993~1', mn=1, price=10000, yr=1993, joiningKey=u'1993~1',
mn=1, name=u'A', yr=1993)
Row(joiningKey=u'1994~1', mn=1, price=10000, yr=1994, joiningKey=u'1994~1',
mn=1, name=u'AA', yr=1994)
Row(joiningKey=u'2000~1', mn=1, price=10000, yr=2000, joiningKey=u'2000~1',
mn=1, name=u'AAA', yr=2000)

-----------------------------------------

SQL Solution - works as expected

    YM1DF.registerTempTable("ymdf1")
    YM2DF.registerTempTable("ymdf2")
    YMJNS = ssc.sql("select * from ymdf1 inner join ymdf2 on
ymdf1.yr=ymdf2.yr and ymdf1.mn=ymdf2.mn")
    print YMJNS.printSchema()
    for l in YMJNS.collect():
        print l

Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1',
mn=1, name=u'AA', yr=1994)
Row(joiningKey=u'2000~1', mn=1, price=10000, yr=2000, joiningKey=u'2000~1',
mn=1, name=u'AAA', yr=2000)
Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1',
mn=1, name=u'A', yr=1993)

-----------------------------------------------------------------

Field concat method, works as well....

    YMJNA = YM1DF.join(YM2DF,YM1DF.joiningKey==YM2DF.joiningKey,"inner")
    print YMJNA.printSchema()
    for l in YMJNA.collect():
        print l

Row(joiningKey=u'1994~1', mn=1, price=1000, yr=1994, joiningKey=u'1994~1',
mn=1, name=u'AA', yr=1994)
Row(joiningKey=u'1993~1', mn=1, price=100, yr=1993, joiningKey=u'1993~1',
mn=1, name=u'A', yr=1993)
Row(joiningKey=u'2000~1', mn=1, price=10000, yr=2000, joiningKey=u'2000~1',
mn=1, name=u'AAA', yr=2000)

On Sat, Apr 25, 2015 at 10:18 AM, Ali Bajwa <ali.ba...@gmail.com> wrote:

> Any ideas on this? Any sample code to join 2 data frames on two columns?
>
> Thanks
> Ali
>
> On Apr 23, 2015, at 1:05 PM, Ali Bajwa <ali.ba...@gmail.com> wrote:
>
> > Hi experts,
> >
> > Sorry if this is a n00b question or has already been answered...
> >
> > Am trying to use the data frames API in python to join 2 dataframes
> > with more than 1 column. The example I've seen in the documentation
> > only shows a single column - so I tried this:
> >
> > ****Example code****
> >
> > import pandas as pd
> > from pyspark.sql import SQLContext
> > hc = SQLContext(sc)
> > A = pd.DataFrame({'year': ['1993', '2005', '1994'], 'month': ['5',
> > '12', '12'], 'value': [100, 200, 300]})
> > a = hc.createDataFrame(A)
> > B = pd.DataFrame({'year': ['1993', '1993'], 'month': ['12', '12'],
> > 'value': [101, 102]})
> > b = hc.createDataFrame(B)
> >
> > print "Pandas"  # try with Pandas
> > print A
> > print B
> > print pd.merge(A, B, on=['year', 'month'], how='inner')
> >
> > print "Spark"
> > print a.toPandas()
> > print b.toPandas()
> > print a.join(b, a.year==b.year and a.month==b.month, 'inner').toPandas()
> >
> >
> > *****Output****
> >
> > Pandas
> >  month  value  year
> > 0     5    100  1993
> > 1    12    200  2005
> > 2    12    300  1994
> >
> >  month  value  year
> > 0    12    101  1993
> > 1    12    102  1993
> >
> > Empty DataFrame
> >
> > Columns: [month, value_x, year, value_y]
> >
> > Index: []
> >
> > Spark
> >  month  value  year
> > 0     5    100  1993
> > 1    12    200  2005
> > 2    12    300  1994
> >
> >  month  value  year
> > 0    12    101  1993
> > 1    12    102  1993
> >
> > month  value  year month  value  year
> > 0    12    200  2005    12    102  1993
> > 1    12    200  2005    12    101  1993
> > 2    12    300  1994    12    102  1993
> > 3    12    300  1994    12    101  1993
> >
> > It looks like Spark returns some results where an inner join should
> > return nothing.
> >
> > Am I doing the join with two columns in the wrong way? If yes, what is
> > the right syntax for this?
> >
> > Thanks!
> > Ali
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha

Reply via email to