Dear all, after some fiddling I have arrived at this solution:
/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

  val joinedDF = leftDF.as('left).join(rightDF.as('right),
    leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}
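For illustration, calling it on the Record / Mapping data from the minimal example quoted further down gives the same output as extrajoinedDF there:

// recordDF has columns (x, a), mappingDF has columns (x, y); both are
// created in the minimal example below.
val joined = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
joined.show()
// +-+-----+----+
// |x|    a|   y|
// +-+-----+----+
// |1|hello|null|
// |2|  bob|   5|
// +-+-----+----+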
Comments welcome!

Alternatives I tried:

- Not working: if at least the alias for rightDF is present, one could try
  joinedDF.drop("right." + commonColumnName), but this does not work (no
  column is dropped). Unfortunately, drop does not support arguments of
  type Column / ColumnName. *@Michael: Should I create a feature request
  in JIRA for drop supporting Columns?*
- Working: without using aliases via as(...), but using column renaming
  instead: rename the right column with
  rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)
  and then use leftDF(commonColumnName) === rightDF("right_" + commonColumnName)
  as the join criterion. In my opinion not so neat; a sketch of this
  variant follows right below. Opinions?
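A minimal sketch of that renaming variant (untested as written here; the helper name leftOuterJoinViaRenaming and the "right_" prefix are purely illustrative):

/**
 * Sketch only: left outer join with removal of the common column,
 * implemented via renaming instead of aliases. Assumes that no column
 * named "right_" + commonColumnName exists yet.
 */
def leftOuterJoinViaRenaming(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

  val renamed = "right_" + commonColumnName
  val renamedRightDF = rightDF.withColumnRenamed(commonColumnName, renamed)

  // After the rename the two join columns no longer share a name, so the
  // right-hand column can be dropped by its (now unique) string name.
  leftDF.join(renamedRightDF, leftDF(commonColumnName) === renamedRightDF(renamed), "leftouter")
    .drop(renamed)
}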
Things I observed:

- Column handling does not seem consistent: select(...) supports aliases,
  while drop(...) only supports strings.
- DataFrame.apply(...) and DataFrame.col do not support aliases either.
- Thus, the only way to handle ambiguous column names at the moment is via
  select. Can someone please confirm this? (A small demo follows in the PS
  below.)
- Alias information is not displayed via DataFrame.printSchema (or at
  least I did not find a way to show it).

Cheers,

Martin
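PS: To make the observations above concrete, here is a small demo; a sketch of the behavior as I observed it, with joinedDF built from the aliased DataFrames as in my function above. Corrections welcome:

// joinedDF as in the function above, i.e.
//   leftDF.as('left).join(rightDF.as('right), leftDF("x") === rightDF("x"), "leftouter")
// ($ is available via import joinedDF.sqlContext.implicits._)

joinedDF.select($"right.x")   // works: select resolves the "right" alias
joinedDF.drop("right.x")      // no-op: drop matches plain column names only
// joinedDF("right.x")        // apply/col do not resolve aliases
// joinedDF("x")              // fails: ambiguous, two columns are named "x"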
2015-07-31 22:51 GMT+02:00 Martin Senne <martin.se...@googlemail.com>:

> Dear Michael, dear all,
>
> a minimal example is listed below.
>
> After some further analysis I could figure out that the problem is related
> to the *leftOuterJoinWithRemovalOfEqualColumn* method, as I use columns of
> the left and right DataFrames when doing the select on the joined table.
>
> /**
>  * Customized left outer join on common column.
>  */
> def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>   val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>   val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>   leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>     .select(leftColumns ++ rightColumns: _*)
> }
>
> As the column "y" of the right table has nullable=false, this is then also
> transferred to the y column of the joined table, as I use rightDF("y").
>
> Thus, I need to use columns of the joined table for the select.
>
> *Question now: The joined table has the column names "x", "a", "x", "y".
> How do I discard the second x column?*
>
> All my approaches failed (assuming here that joinedDF is the joined
> DataFrame):
>
> - Using joinedDF.drop("x") discards both "x" columns.
> - Using joinedDF("x") does not work, as it is ambiguous.
> - Also using rightDF.as("aliasname") in order to differentiate the column
>   "x" (from the left DataFrame) from "x" (from the right DataFrame) did
>   not work out, as I found no way to use select($"aliasname.x")
>   programmatically. Could someone sketch the code?
>
> Any help welcome, thanks
>
> Martin
>
> ========================================
> import org.apache.spark.sql.types.{StructField, StructType}
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.{DataFrame, SQLContext}
>
> object OtherEntities {
>
>   case class Record( x: Int, a: String)
>   case class Mapping( x: Int, y: Int )
>
>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>   val mappings = Seq( Mapping(2, 5) )
> }
>
> object MinimalShowcase {
>
>   /**
>    * Customized left outer join on common column.
>    */
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>       .select(leftColumns ++ rightColumns: _*)
>   }
>
>   /**
>    * Set, if a column is nullable.
>    * @param df source DataFrame
>    * @param cn is the column name to change
>    * @param nullable is the flag to set, such that the column is either
>    *                 nullable or not
>    */
>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
>
>     val schema = df.schema
>     val newSchema = StructType(schema.map {
>       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
>       case y: StructField => y
>     })
>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>   }
>
>   def main (args: Array[String]) {
>     val conf = new SparkConf()
>       .setAppName("Minimal")
>       .setMaster("local[*]")
>
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     // used to implicitly convert an RDD to a DataFrame.
>     import sqlContext.implicits._
>
>     val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
>     val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>     val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)
>
>     val joinedDF = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "leftouter")
>     println("joinedDF:")
>     joinedDF.show
>     joinedDF.printSchema
>     joinedDF.filter(joinedDF("y").isNotNull).show
>
>     // joinedDF:
>     // +-+-----+----+----+
>     // |x|    a|   x|   y|
>     // +-+-----+----+----+
>     // |1|hello|null|null|
>     // |2|  bob|   2|   5|
>     // +-+-----+----+----+
>     //
>     // root
>     //  |-- x: integer (nullable = false)
>     //  |-- a: string (nullable = true)
>     //  |-- x: integer (nullable = true)
>     //  |-- y: integer (nullable = true)
>     //
>     // +-+---+-+-+
>     // |x|  a|x|y|
>     // +-+---+-+-+
>     // |2|bob|2|5|
>     // +-+---+-+-+
>
>     val extrajoinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
>     println("extrajoinedDF:")
>     extrajoinedDF.show
>     extrajoinedDF.printSchema
>     extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show
>
>     // extrajoinedDF:
>     // +-+-----+----+
>     // |x|    a|   y|
>     // +-+-----+----+
>     // |1|hello|null|
>     // |2|  bob|   5|
>     // +-+-----+----+
>     //
>     // root
>     //  |-- x: integer (nullable = false)
>     //  |-- a: string (nullable = true)
>     //  |-- y: integer (nullable = false)
>     //
>     // +-+-----+----+
>     // |x|    a|   y|
>     // +-+-----+----+
>     // |1|hello|null|
>     // |2|  bob|   5|
>     // +-+-----+----+
>
>     val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") === mappingWithNullDF("x"), "leftouter")
>     println("joined2DF:")
>     joined2DF.show
>     joined2DF.printSchema
>     joined2DF.filter(joined2DF("y").isNotNull).show
>
>     // joined2DF:
>     // +-+-----+----+----+
>     // |x|    a|   x|   y|
>     // +-+-----+----+----+
>     // |1|hello|null|null|
>     // |2|  bob|   2|   5|
>     // +-+-----+----+----+
>     //
>     // root
>     //  |-- x: integer (nullable = false)
>     //  |-- a: string (nullable = true)
>     //  |-- x: integer (nullable = true)
>     //  |-- y: integer (nullable = true)
>     //
>     // +-+---+-+-+
>     // |x|  a|x|y|
>     // +-+---+-+-+
>     // |2|bob|2|5|
>     // +-+---+-+-+
>   }
> }
>
> 2015-07-31 1:56 GMT+02:00 Martin Senne <martin.se...@googlemail.com>:
>
>> Dear Michael, dear all,
>>
>> distinguishing those records that have a match in mapping from those
>> that don't is the crucial point.
>>
>> Record(x: Int, a: String)
>> Mapping(x: Int, y: Int)
>>
>> Thus
>>
>> Record(1, "hello")
>> Record(2, "bob")
>> Mapping(2, 5)
>>
>> yields (2, "bob", 5) on an inner join.
>> BUT I'm also interested in (1, "hello", null), as there is no
>> counterpart in mapping (this is the left outer join part).
>>
>> I need to distinguish 1 and 2 because of later inserts (case 1, "hello")
>> or updates (case 2, "bob").
>>
>> Cheers and thanks,
>>
>> Martin
>>
>> On 30.07.2015 22:58, "Michael Armbrust" <mich...@databricks.com> wrote:
>> >
>> > Perhaps I'm missing what you are trying to accomplish, but if you'd
>> > like to avoid the null values, do an inner join instead of an outer
>> > join.
>> >
>> > Additionally, I'm confused about how the result of
>> > joinedDF.filter(joinedDF("y").isNotNull).show still contains null
>> > values in the column y. This doesn't really have anything to do with
>> > nullable, which is only a hint to the system so that we can avoid null
>> > checking when we know that there are no null values. If you provide
>> > the full code I can try and see if this is a bug.
>> >
>> > On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <martin.se...@googlemail.com> wrote:
>> >>
>> >> Dear Michael, dear all,
>> >>
>> >> motivation:
>> >>
>> >> object OtherEntities {
>> >>
>> >>   case class Record( x: Int, a: String)
>> >>   case class Mapping( x: Int, y: Int )
>> >>
>> >>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>> >>   val mappings = Seq( Mapping(2, 5) )
>> >> }
>> >>
>> >> Now I want to perform a left outer join on records and mappings (with
>> >> the join criterion recordDF("x") === mappingDF("x")); the shorthand
>> >> is in leftOuterJoinWithRemovalOfEqualColumn:
>> >>
>> >> val sqlContext = new SQLContext(sc)
>> >> // used to implicitly convert an RDD to a DataFrame.
>> >> import sqlContext.implicits._
>> >>
>> >> val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
>> >> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>> >>
>> >> val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn(mappingDF, "x")
>> >>
>> >> joinedDF.filter(joinedDF("y").isNotNull).show
>> >>
>> >> Currently, the output is
>> >>
>> >> +-+-----+----+
>> >> |x|    a|   y|
>> >> +-+-----+----+
>> >> |1|hello|null|
>> >> |2|  bob|   5|
>> >> +-+-----+----+
>> >>
>> >> instead of
>> >>
>> >> +-+---+-+
>> >> |x|  a|y|
>> >> +-+---+-+
>> >> |2|bob|5|
>> >> +-+---+-+
>> >>
>> >> The last output can be achieved by the method of changing
>> >> nullable=false to nullable=true described in my first post.
>> >>
>> >> Thus, I need this schema modification to make outer joins work.
>> >>
>> >> Cheers and thanks,
>> >>
>> >> Martin
>> >>
>> >> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
>> >>>
>> >>> We don't yet update nullability information based on predicates, as
>> >>> we don't actually leverage this information in many places yet. Why
>> >>> do you want to update the schema?
>> >>>
>> >>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <martin.se...@googlemail.com> wrote:
>> >>>>
>> >>>> Hi all,
>> >>>>
>> >>>> 1. *Columns in dataframes can be nullable and not nullable. Having
>> >>>> a nullable column of Doubles, I can use the following Scala code to
>> >>>> filter all "non-null" rows:*
>> >>>>
>> >>>> val df = ..... // some code that creates a DataFrame
>> >>>> df.filter( df("columnname").isNotNull )
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> root
>> >>>>  |-- x: integer (nullable = false)
>> >>>>  |-- a: string (nullable = true)
>> >>>>  |-- y: integer (nullable = true)
>> >>>>
>> >>>> And with the filter expression:
>> >>>>
>> >>>> +-+---+-+
>> >>>> |x|  a|y|
>> >>>> +-+---+-+
>> >>>> |2|bob|5|
>> >>>> +-+---+-+
>> >>>>
>> >>>> Unfortunately, while this is true for a nullable column (according
>> >>>> to df.printSchema), it is not true for a column that is not
>> >>>> nullable:
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> root
>> >>>>  |-- x: integer (nullable = false)
>> >>>>  |-- a: string (nullable = true)
>> >>>>  |-- y: integer (nullable = false)
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> such that the output is not affected by the filter. Is this
>> >>>> intended?
>> >>>>
>> >>>> 2. *What is the cheapest way (in the sense of performance) to turn
>> >>>> a non-nullable column into a nullable one? I came up with this:*
>> >>>>
>> >>>> /**
>> >>>>  * Set, if a column is nullable.
>> >>>>  * @param df source DataFrame
>> >>>>  * @param cn is the column name to change
>> >>>>  * @param nullable is the flag to set, such that the column is
>> >>>>  *                 either nullable or not
>> >>>>  */
>> >>>> def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
>> >>>>
>> >>>>   val schema = df.schema
>> >>>>   val newSchema = StructType(schema.map {
>> >>>>     case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
>> >>>>     case y: StructField => y
>> >>>>   })
>> >>>>   df.sqlContext.createDataFrame( df.rdd, newSchema)
>> >>>> }
>> >>>>
>> >>>> Is there a cheaper solution?
>> >>>>
>> >>>> 3. *Any comments?*
>> >>>>
>> >>>> Cheers and thx in advance,
>> >>>>
>> >>>> Martin
>> >>>>
>> >>>> --
>> >>>> View this message in context:
>> >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
>> >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.