Hi Michael,

I am not sure you understand my code correctly. I am trying to implement an
org.apache.spark.ml.Transformer in Java 8.
My understanding is that the pseudocode for a transformer is something like:

@Override
public DataFrame transform(DataFrame df) {
    1. Select the input column
    2. Create a new column
    3. Append the new column to the df argument and return the result
}
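Michael's point further down, that a Column is an expression computed from the same row's input columns, can be illustrated with a plain-Java analogy of the three steps above. This is not Spark API: WithColumnModel and withColumn are hypothetical names, and a "table" here is just a list of maps from column name to value. Each new value is computed from its own row, and the input list is left unchanged, mirroring how withColumn() returns a new DataFrame.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java analogy (not Spark API): a "table" is a list of rows, and each
// row is a map from column name to value.
final class WithColumnModel {

    // Returns a new table in which every row also has outputCol. The new value
    // is computed only from that same row, which is how a Column expression
    // behaves; the input rows are copied, not modified.
    static List<Map<String, Object>> withColumn(List<Map<String, Object>> rows,
                                                String outputCol,
                                                Function<Map<String, Object>, Object> expr) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Map<String, Object> copy = new LinkedHashMap<>(row); // keep existing columns
            copy.put(outputCol, expr.apply(row));                 // add or replace outputCol
            out.add(copy);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("rawInput", "Greeting");
        List<Map<String, Object>> table = new ArrayList<>();
        table.add(row);

        // Steps 1-3: read rawInput, compute a value, append it as filteredOutput.
        List<Map<String, Object>> result = withColumn(table, "filteredOutput",
                r -> ((String) r.get("rawInput")).toLowerCase());
        System.out.println(result); // [{rawInput=Greeting, filteredOutput=greeting}]
    }
}
```

The analogy has no way to attach values produced by a separate table, because nothing links those values back to particular rows; that is roughly the situation the exception further down complains about.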
In my experience the current DataFrame API is very limited: you cannot apply
a complicated lambda function. As a workaround I convert the DataFrame to a
JavaRDD, apply my complicated lambda, and then convert the resulting RDD back
to a DataFrame.
I then select the "new column" from that DataFrame and try to pass it to
df.withColumn().
I can try to implement this as a UDF. However, I need to use several
third-party jars. Any idea how to ensure the workers will have the required
jar files? If I were submitting a normal Java app I would create an uber jar;
will this work with UDFs?
Kind regards
Andy
From: Michael Armbrust <[email protected]>
Date: Monday, January 4, 2016 at 11:14 PM
To: Andrew Davidson <[email protected]>
Cc: "user @spark" <[email protected]>
Subject: Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing
> It's not really possible to convert an RDD to a Column. You can think of a
> Column as an expression that produces a single output given some set of input
> columns. If I understand your code correctly, I think this might be easier to
> express as a UDF:
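> import static org.apache.spark.sql.functions.expr;
>
> import org.apache.spark.sql.DataFrame;
> import org.apache.spark.sql.api.java.UDF1;
> import org.apache.spark.sql.types.DataTypes;
>
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>     @Override
>     public String call(String str) {
>         return str; // TODO: stemming code here
>     }
> }, DataTypes.StringType);
> DataFrame transformed = df.withColumn("filteredInput",
>     expr("stem(rawInput)"));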
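The TODO in the UDF above only needs a plain Java function from one cell value to one result. Note that the schema printed at the end of this thread shows rawInput is array<string>, not string, so a UDF1<String, String> would likely not match that column. My understanding (please verify against the Spark 1.6 docs) is that Spark hands an array column to a Java UDF as a scala.collection.Seq, which can be turned into a java.util.List with scala.collection.JavaConversions.seqAsJavaList, and that the return type would then be registered as DataTypes.createArrayType(DataTypes.StringType). The sketch below is a hypothetical, deliberately naive suffix stripper: NaiveStemmer, stem and stemAll are made-up names, and it is not the Porter algorithm. It only shows the shape of the per-token logic that call() could delegate to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, deliberately naive suffix-stripping "stemmer". It is NOT the
// Porter algorithm; a real transformer would call a proper stemming library.
final class NaiveStemmer {

    private static final String[] SUFFIXES = {"ing", "ed", "s"};

    // Lower-cases one token and strips the first matching suffix, but only if
    // a stem of at least three characters would remain.
    static String stem(String token) {
        if (token == null) {
            return null; // array<string> cells may contain nulls; propagate them
        }
        String lower = token.toLowerCase();
        for (String suffix : SUFFIXES) {
            if (lower.endsWith(suffix) && lower.length() - suffix.length() >= 3) {
                return lower.substring(0, lower.length() - suffix.length());
            }
        }
        return lower;
    }

    // Stems every token of one array<string> cell, such as a rawInput value.
    static List<String> stemAll(List<String> tokens) {
        if (tokens == null) {
            return null;
        }
        List<String> out = new ArrayList<>(tokens.size());
        for (String t : tokens) {
            out.add(stem(t));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(stemAll(Arrays.asList("greet", "greeting", "greeted", "saw")));
        // [greet, greet, greet, saw]
    }
}
```

Keeping the logic in a static method makes it unit-testable without a SparkContext. Because a UDF runs on the executors, any jars this logic depends on still have to be shipped to them.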
>
> On Mon, Jan 4, 2016 at 8:08 PM, Andy Davidson <[email protected]> wrote:
>> I am having a heck of a time writing a simple transformer in Java. I assume
>> that my Transformer is supposed to append a new column to the DataFrame
>> argument. Any idea why I get the following exception in Java 8 when I try to
>> call DataFrame withColumn()? The JavaDoc says withColumn() "Returns a new
>> DataFrame by adding a column or replacing the existing column that has the
>> same name."
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.html>
>>
>>
>> Also, do transformers always run in the driver? If not, I assume workers do
>> not have the sqlContext. Any idea how I can convert a JavaRDD<> to a Column
>> without a sqlContext?
>>
>> Kind regards
>>
>> Andy
>>
>> P.S. I am using Spark 1.6.0.
>>
>> org.apache.spark.sql.AnalysisException: resolved attribute(s) filteredOutput#1 missing from rawInput#0 in operator !Project [rawInput#0,filteredOutput#1 AS filteredOutput#2];
>>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>>     at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
>>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>>     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
>>     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>>     at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>>     at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>>     at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
>>     at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
>>     at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
>>     at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
>>     at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
>>     at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)
>>
>>
>>
>> public class StemmerTransformer extends Transformer implements Serializable {
>>
>>     String inputCol;        // unit test sets to rawInput
>>     String outputCol;       // unit test sets to filteredOutput
>>     SQLContext sqlContext;  // used to rebuild a DataFrame from the RDD
>>
>>     public StemmerTransformer(SQLContext sqlContext) {
>>         // will only work if transformers execute in the driver
>>         this.sqlContext = sqlContext;
>>     }
>>
>>     @Override
>>     public DataFrame transform(DataFrame df) {
>>         df.printSchema();
>>         df.show();
>>
>>         JavaRDD<Row> inRowRDD = df.select(inputCol).javaRDD();
>>         JavaRDD<Row> outRowRDD = inRowRDD.map((Row row) -> {
>>             // TODO add stemming code
>>             // Create a new Row
>>             Row ret = RowFactory.create("TODO");
>>             return ret;
>>         });
>>
>>         // can we create a Col from a JavaRDD<Row>?
>>         List<StructField> fields = new ArrayList<StructField>();
>>         boolean nullable = true;
>>         fields.add(DataTypes.createStructField(outputCol,
>>                 DataTypes.StringType, nullable));
>>         StructType schema = DataTypes.createStructType(fields);
>>
>>         DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);
>>         outputDF.printSchema();
>>         outputDF.show();
>>
>>         // newCol belongs to outputDF's plan, not df's, which is why the
>>         // analyzer reports filteredOutput#1 as missing from rawInput#0
>>         Column newCol = outputDF.col(outputCol);
>>         return df.withColumn(outputCol, newCol);
>>     }
>> }
>>
>> root
>>  |-- rawInput: array (nullable = false)
>>  |    |-- element: string (containsNull = true)
>>
>> +--------------------+
>> |            rawInput|
>> +--------------------+
>> |[I, saw, the, red...|
>> |[Mary, had, a, li...|
>> |[greet, greeting,...|
>> +--------------------+
>>
>> root
>>  |-- filteredOutput: string (nullable = true)
>>
>> +--------------+
>> |filteredOutput|
>> +--------------+
>> |          TODO|
>> |          TODO|
>> |          TODO|
>> +--------------+
>>
>
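If the RDD lambda is to be kept rather than rewritten as a UDF, one workaround that avoids a second DataFrame is to map over all of df's rows (not just df.select(inputCol)), emit each row's original values followed by the new value, and rebuild the DataFrame with the original schema plus one field. In Spark 1.6 that would, I believe, use df.javaRDD(), Row.size() and Row.get(i), RowFactory.create(Object...), df.schema().add(outputCol, DataTypes.StringType, true) and sqlContext.createDataFrame(rowRDD, schema); please check those against the 1.6 Javadoc. The row-widening step inside that lambda is sketched here in plain Java, with Object[] standing in for Row; RowWidening and appendValue are hypothetical names.

```java
import java.util.Arrays;

// Plain-Java sketch (not Spark API): Object[] stands in for a Spark Row.
// RowWidening and appendValue are hypothetical names used for illustration.
final class RowWidening {

    // Returns a new positional row holding every original value, in order,
    // followed by newValue. The input array is not modified.
    static Object[] appendValue(Object[] row, Object newValue) {
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = newValue;
        return out;
    }

    public static void main(String[] args) {
        // One input row whose only column (like rawInput) holds a list of tokens.
        Object[] in = {Arrays.asList("greet", "greeting")};
        Object[] out = appendValue(in, "TODO");
        System.out.println(Arrays.deepToString(out)); // [[greet, greeting], TODO]
    }
}
```

Inside df.javaRDD().map(...), the Row's fields would be copied into an Object[], widened this way, and passed to RowFactory.create. Because every column then comes from a single RDD, the rebuilt DataFrame never references attributes from a second plan.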