Dear Michael, dear all,
A minimal example is listed below.
After some further analysis I found that the problem is related to the
*leftOuterJoinWithRemovalOfEqualColumn* method, as I use columns of the left
and right DataFrames when doing the select on the joined table.
/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame,
    rightDF: DataFrame, commonColumnName: String): DataFrame = {
  val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
  val rightColumns = rightDF.columns
    .filterNot(cn => cn.equals(commonColumnName))
    .map(cn => rightDF(cn))
  leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
    .select(leftColumns ++ rightColumns: _*)
}
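For reference, the semantics this join is meant to have can be modelled with plain Scala collections. This is a sketch, not Spark code: the object `LeftOuterJoinSketch` and its helper `leftOuterJoin` are hypothetical names, and `Option[Int]` stands in for a nullable `y`. Unmatched records come back with `None`, and partitioning on `y` separates the rows to update from the rows to insert, as described in the quoted mail further down.

```scala
object LeftOuterJoinSketch {
  case class Record(x: Int, a: String)
  case class Mapping(x: Int, y: Int)

  val records  = Seq(Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq(Mapping(2, 5))

  // Left outer join on x: every record is kept; one output row per matching
  // mapping, or a single row with y = None when nothing matches.
  def leftOuterJoin(rs: Seq[Record], ms: Seq[Mapping]): Seq[(Int, String, Option[Int])] =
    rs.flatMap { r =>
      val ys: Seq[Option[Int]] = ms.filter(_.x == r.x).map(m => Some(m.y))
      if (ys.isEmpty) Seq((r.x, r.a, Option.empty[Int]))
      else ys.map(y => (r.x, r.a, y))
    }

  val joined = leftOuterJoin(records, mappings)

  // Rows with a match (later updates) vs. rows without one (later inserts).
  val (matched, unmatched) = joined.partition(_._3.isDefined)
}
```

Here `matched` holds `(2, "bob", Some(5))` and `unmatched` holds `(1, "hello", None)`; a working `filter(... isNotNull)` on the Spark side should return only the counterpart of `matched`.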
As column "y" of the right table has nullable=false, this flag is carried over
to the y column of the joined table, because I select it via rightDF("y").
Thus, I need to use columns of the joined table for the select.
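The flag propagation described above can be modelled without Spark. In this sketch `Field`, `LeftOuterSchemaModel`, `joinedSchema` and `selectViaInputs` are hypothetical names; the only assumptions, taken from the printed schemas in this mail, are that the join output marks right-side columns nullable while `rightDF(cn)` carries over the right input's own flag.

```scala
// Hypothetical, simplified stand-in for a schema field (not Spark's StructField).
final case class Field(name: String, nullable: Boolean)

object LeftOuterSchemaModel {
  // Schemas of recordDF and mappingDF: the Int fields of the case classes
  // are non-nullable, the String field is nullable.
  val recordFields  = Seq(Field("x", nullable = false), Field("a", nullable = true))
  val mappingFields = Seq(Field("x", nullable = false), Field("y", nullable = false))

  // Output of the join itself: unmatched left rows get nulls in every
  // right-side column, so all right-side fields must become nullable.
  def joinedSchema(left: Seq[Field], right: Seq[Field]): Seq[Field] =
    left ++ right.map(_.copy(nullable = true))

  // Selecting via references into the input DataFrames (like rightDF(cn))
  // keeps each field's original flag, so "y" stays non-nullable.
  def selectViaInputs(left: Seq[Field], right: Seq[Field], common: String): Seq[Field] =
    left ++ right.filterNot(_.name == common)
}
```

The first function reproduces the schema printed for joinedDF (only the first x is non-nullable); the second reproduces extrajoinedDF, where y keeps nullable = false.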
*Question now: The joined table has column names "x", "a", "x", "y".
How do I discard the second x column?*
All my approaches failed (assuming here that joinedDF is the joined DataFrame):
- Using joinedDF.drop("x") discards both "x" columns.
- Using joinedDF("x") does not work, as it is ambiguous.
- Using rightDF.as("aliasname") to distinguish column "x" (from the left
DataFrame) from "x" (from the right DataFrame) did not work out either, as I
found no way to use select($"aliasname.x") programmatically. Could someone
sketch the code?
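On the aliasing question: in Spark, the `$"..."` interpolator imported via `sqlContext.implicits._` just turns a string into a column reference, so the same qualified reference can be built from runtime values with `org.apache.spark.sql.functions.col`, e.g. `col(s"$alias.$name")`, once both inputs have been aliased with `DataFrame.as`. The plain-Scala model below (no Spark involved; `JoinedColumn` and `ColumnResolution` are hypothetical names) only sketches why that helps: a bare "x" matches two columns of the joined table and dropping by name removes both, while qualified references such as "l.x" and "r.y" each pick exactly one column.

```scala
// Hypothetical model of the joined table's columns: each column remembers
// the alias (qualifier) of the DataFrame it came from. Not Spark code.
final case class JoinedColumn(qualifier: String, name: String)

object ColumnResolution {
  // Columns of the joined table, with the inputs aliased as "l" and "r".
  val joined = Seq(
    JoinedColumn("l", "x"), JoinedColumn("l", "a"),
    JoinedColumn("r", "x"), JoinedColumn("r", "y"))

  // Resolve a bare "name" or a qualified "alias.name" reference.
  def resolve(cols: Seq[JoinedColumn], ref: String): Either[String, JoinedColumn] = {
    val matches = ref.split('.') match {
      case Array(q, n) => cols.filter(c => c.qualifier == q && c.name == n)
      case Array(n)    => cols.filter(_.name == n)
      case _           => Seq.empty[JoinedColumn]
    }
    matches match {
      case Seq(c) => Right(c)
      case Seq()  => Left(s"cannot resolve '$ref'")
      case _      => Left(s"reference '$ref' is ambiguous")
    }
  }

  // Dropping by bare name removes every column with that name (both x's).
  def dropByName(cols: Seq[JoinedColumn], name: String): Seq[JoinedColumn] =
    cols.filterNot(_.name == name)

  // Selecting references keeps exactly the columns asked for, failing on the
  // leftmost reference that does not resolve to a single column.
  def select(cols: Seq[JoinedColumn], refs: Seq[String]): Either[String, Seq[JoinedColumn]] =
    refs.foldRight[Either[String, List[JoinedColumn]]](Right(Nil)) { (ref, acc) =>
      for { c <- resolve(cols, ref); rest <- acc } yield c :: rest
    }
}
```

In this model, selecting `Seq("l.x", "l.a", "r.y")` yields the desired x, a, y layout without the second x.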
Any help welcome, thanks
Martin
========================================
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SQLContext}
object OtherEntities {
  case class Record(x: Int, a: String)
  case class Mapping(x: Int, y: Int)

  val records = Seq(Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq(Mapping(2, 5))
}
object MinimalShowcase {

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame,
      rightDF: DataFrame, commonColumnName: String): DataFrame = {
    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns
      .filterNot(cn => cn.equals(commonColumnName))
      .map(cn => rightDF(cn))
    leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

  /**
   * Sets whether a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is
   *                 either nullable or not
   */
  def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField(c, t, _, m) if c.equals(cn) =>
        StructField(c, t, nullable = nullable, metadata = m)
      case y: StructField => y
    })
    // Reuse the existing rows; only the schema changes.
    df.sqlContext.createDataFrame(df.rdd, newSchema)
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Minimal")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
    val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
    val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", nullable = true)

    val joinedDF = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "leftouter")
    println("joinedDF:")
    joinedDF.show()
    joinedDF.printSchema()
    joinedDF.filter(joinedDF("y").isNotNull).show()
    // joinedDF:
    // +-+-----+----+----+
    // |x|    a|   x|   y|
    // +-+-----+----+----+
    // |1|hello|null|null|
    // |2|  bob|   2|   5|
    // +-+-----+----+----+
    //
    // root
    //  |-- x: integer (nullable = false)
    //  |-- a: string (nullable = true)
    //  |-- x: integer (nullable = true)
    //  |-- y: integer (nullable = true)
    //
    // +-+---+-+-+
    // |x|  a|x|y|
    // +-+---+-+-+
    // |2|bob|2|5|
    // +-+---+-+-+

    val extrajoinedDF =
      leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
    println("extrajoinedDF:")
    extrajoinedDF.show()
    extrajoinedDF.printSchema()
    extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show()
    // extrajoinedDF:
    // +-+-----+----+
    // |x|    a|   y|
    // +-+-----+----+
    // |1|hello|null|
    // |2|  bob|   5|
    // +-+-----+----+
    //
    // root
    //  |-- x: integer (nullable = false)
    //  |-- a: string (nullable = true)
    //  |-- y: integer (nullable = false)
    //
    // The filter has no effect here: the row with y = null is still returned.
    // +-+-----+----+
    // |x|    a|   y|
    // +-+-----+----+
    // |1|hello|null|
    // |2|  bob|   5|
    // +-+-----+----+

    val joined2DF = recordDF.join(mappingWithNullDF,
      recordDF("x") === mappingWithNullDF("x"), "leftouter")
    println("joined2DF:")
    joined2DF.show()
    joined2DF.printSchema()
    joined2DF.filter(joined2DF("y").isNotNull).show()
    // joined2DF:
    // +-+-----+----+----+
    // |x|    a|   x|   y|
    // +-+-----+----+----+
    // |1|hello|null|null|
    // |2|  bob|   2|   5|
    // +-+-----+----+----+
    //
    // root
    //  |-- x: integer (nullable = false)
    //  |-- a: string (nullable = true)
    //  |-- x: integer (nullable = true)
    //  |-- y: integer (nullable = true)
    //
    // +-+---+-+-+
    // |x|  a|x|y|
    // +-+---+-+-+
    // |2|bob|2|5|
    // +-+---+-+-+
  }
}
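The extrajoinedDF output above, where `filter(... isNotNull)` keeps the row with y = null, is consistent with Michael's remark in the quoted reply below that nullable is a hint used to avoid null checks. The following is a hypothetical, heavily simplified model of that mechanism (`ModelExpr`, `simplify` and `filterRows` are illustrative names, not Spark's Catalyst API): if an optimizer trusts nullable = false, it may replace the null check by the constant true, and the filter then passes every row.

```scala
// Hypothetical mini expression language; these are NOT Spark's Catalyst classes.
sealed trait ModelExpr
final case class AttrRef(name: String, nullable: Boolean) extends ModelExpr
final case class IsNotNullExpr(child: AttrRef) extends ModelExpr
final case class BoolLit(value: Boolean) extends ModelExpr

object NullabilityHintModel {
  // One row: column name -> value, with None standing for SQL NULL.
  type Row = Map[String, Option[Int]]

  // Optimizer step that trusts the schema: a column declared non-nullable is
  // assumed never to be null, so its null check becomes the constant true.
  def simplify(e: ModelExpr): ModelExpr = e match {
    case IsNotNullExpr(AttrRef(_, false)) => BoolLit(true)
    case other                            => other
  }

  // Evaluate a predicate against one row.
  def eval(e: ModelExpr, row: Row): Boolean = e match {
    case BoolLit(b)                   => b
    case IsNotNullExpr(AttrRef(n, _)) => row(n).isDefined
    case AttrRef(n, _)                => sys.error(s"attribute $n is not a predicate")
  }

  // Simplify first, then evaluate: the filter only sees the optimized predicate.
  def filterRows(rows: Seq[Row], predicate: ModelExpr): Seq[Row] = {
    val optimized = simplify(predicate)
    rows.filter(row => eval(optimized, row))
  }

  // x and y of the two extrajoinedDF rows (column a omitted); y is null for x = 1.
  val rows: Seq[Row] = Seq(
    Map("x" -> Some(1), "y" -> None),
    Map("x" -> Some(2), "y" -> Some(5))
  )
}
```

With the flag set to true (as for y in joinedDF, or after setNullableStateOfColumn) only the row with x = 2 survives; with the stale false flag both rows pass, just like in the extrajoinedDF output.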
2015-07-31 1:56 GMT+02:00 Martin Senne <[email protected]>:
> Dear Michael, dear all,
>
> distinguishing those records that have a match in mapping from those that
> don't is the crucial point.
>
> Record(x : Int, a: String)
> Mapping(x: Int, y: Int)
>
> Thus
>
> Record(1, "hello")
> Record(2, "bob")
> Mapping(2, 5)
>
> yield (2, "bob", 5) on an inner join.
> BUT I'm also interested in (1, "hello", null) as there is no counterpart
> in mapping (this is the left outer join part)
>
> I need to distinguish 1 and 2 because of later inserts (case 1, hello) or
> updates (case 2, bob).
>
> Cheers and thanks,
>
> Martin
>
> On 30.07.2015 22:58, "Michael Armbrust" <[email protected]> wrote:
> >
> > Perhaps I'm missing what you are trying to accomplish, but if you'd like
> > to avoid the null values, do an inner join instead of an outer join.
> >
> > Additionally, I'm confused about how the result
> > of joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
> > in the column y. This doesn't really have anything to do with nullable,
> > which is only a hint to the system so that we can avoid null checking when
> > we know that there are no null values. If you provide the full code, I can
> > try and see if this is a bug.
> >
> > On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <[email protected]> wrote:
> >>
> >> Dear Michael, dear all,
> >>
> >> motivation:
> >>
> >> object OtherEntities {
> >>
> >>   case class Record(x: Int, a: String)
> >>   case class Mapping(x: Int, y: Int)
> >>
> >>   val records = Seq(Record(1, "hello"), Record(2, "bob"))
> >>   val mappings = Seq(Mapping(2, 5))
> >> }
> >>
> >> Now I want to perform a left outer join on records and mappings (with the
> >> join condition recordDF("x") === mappingDF("x")); the shorthand for this is
> >> leftOuterJoinWithRemovalOfEqualColumn.
> >>
> >> val sqlContext = new SQLContext(sc)
> >> // used to implicitly convert an RDD to a DataFrame.
> >> import sqlContext.implicits._
> >>
> >> val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
> >> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
> >>
> >> val joinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
> >>
> >> joinedDF.filter(joinedDF("y").isNotNull).show
> >>
> >>
> >> Currently, the output is
> >>
> >> +-+-----+----+
> >> |x|    a|   y|
> >> +-+-----+----+
> >> |1|hello|null|
> >> |2|  bob|   5|
> >> +-+-----+----+
> >>
> >> instead of
> >>
> >> +-+---+-+
> >> |x|  a|y|
> >> +-+---+-+
> >> |2|bob|5|
> >> +-+---+-+
> >>
> >> The last output can be achieved with the method described in my first
> >> post, i.e. by changing nullable=false to nullable=true.
> >>
> >> Thus, I need this schema modification to make outer joins work.
> >>
> >> Cheers and thanks,
> >>
> >> Martin
> >>
> >>
> >>
> >> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <[email protected]>:
> >>>
> >>> We don't yet update nullability information based on predicates, as we
> >>> don't actually leverage this information in many places yet. Why do you
> >>> want to update the schema?
> >>>
> >>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <[email protected]> wrote:
> >>>>
> >>>> Hi all,
> >>>>
> >>>> 1. *Columns in DataFrames can be nullable or not nullable. Having a
> >>>> nullable column of Doubles, I can use the following Scala code to filter
> >>>> all "non-null" rows:*
> >>>>
> >>>> val df = ..... // some code that creates a DataFrame
> >>>> df.filter(df("columnname").isNotNull)
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> root
> >>>>  |-- x: integer (nullable = false)
> >>>>  |-- a: string (nullable = true)
> >>>>  |-- y: integer (nullable = true)
> >>>>
> >>>> And with the filter expression:
> >>>> +-+---+-+
> >>>> |x|  a|y|
> >>>> +-+---+-+
> >>>> |2|bob|5|
> >>>> +-+---+-+
> >>>>
> >>>>
> >>>> Unfortunately, while this holds for a nullable column (according to
> >>>> df.printSchema), it does not hold for a column that is not nullable:
> >>>>
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> root
> >>>>  |-- x: integer (nullable = false)
> >>>>  |-- a: string (nullable = true)
> >>>>  |-- y: integer (nullable = false)
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> such that the output is not affected by the filter. Is this intended?
> >>>>
> >>>>
> >>>> 2. *What is the cheapest way (in terms of performance) to turn a
> >>>> non-nullable column into a nullable column?
> >>>> I came up with this:*
> >>>>
> >>>> /**
> >>>>  * Sets whether a column is nullable.
> >>>>  * @param df source DataFrame
> >>>>  * @param cn is the column name to change
> >>>>  * @param nullable is the flag to set, such that the column is
> >>>>  *                 either nullable or not
> >>>>  */
> >>>> def setNullableStateOfColumn(df: DataFrame, cn: String,
> >>>>     nullable: Boolean): DataFrame = {
> >>>>
> >>>>   val schema = df.schema
> >>>>   val newSchema = StructType(schema.map {
> >>>>     case StructField(c, t, _, m) if c.equals(cn) =>
> >>>>       StructField(c, t, nullable = nullable, metadata = m)
> >>>>     case y: StructField => y
> >>>>   })
> >>>>   df.sqlContext.createDataFrame(df.rdd, newSchema)
> >>>> }
> >>>>
> >>>> Is there a cheaper solution?
> >>>>
> >>>> 3. *Any comments?*
> >>>>
> >>>> Cheers and thx in advance,
> >>>>
> >>>> Martin
> >>>> --
> >>>> View this message in context:
> >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
> >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.