Hi Michael,
I really appreciate your help. The following code works. Is there a way
this example can be added to the distribution to make it easier for future
Java programmers? It took me a long time to get to this simple solution.
I'll need to tweak this example a little to work with the new Pipeline save
functionality. We need the current sqlContext to register our UDF. I'll see
if I can pass it in the ParamMap. I'll throw an exception if someone calls
transform(df) without the ParamMap.
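import java.util.ArrayList;
import java.util.List;
import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

public class StemmerTransformer extends Transformer {
    private static final long serialVersionUID = 1L;
    private static final String udfName = "StemUDF"; // name used in selectExpr
    private static final DataType returnType =
            DataTypes.createArrayType(DataTypes.StringType);
    private final String uid = "stemmer_" + java.util.UUID.randomUUID();
    private final transient SQLContext sqlContext; // only used on the driver
    private transient UDF udf;

    public StemmerTransformer(SQLContext sqlContext) {
        this.sqlContext = sqlContext;
    }
    // Register the UDF with the SQLContext once per transformer instance
    void registerUDF() {
        if (udf == null) {
            udf = new UDF();
            sqlContext.udf().register(udfName, udf, returnType);
        }
    }

    @Override
    public DataFrame transform(DataFrame df) {
        df.printSchema(); // debug output, shown below
        df.show();
        registerUDF();
        // keep all existing columns and append the UDF result
        return df.selectExpr("*", udfName + "(rawInput) as filteredOutput");
    }
    @Override
    public StructType transformSchema(StructType schema) {
        return schema.add("filteredOutput", returnType, true);
    }
    @Override
    public Transformer copy(ParamMap extra) {
        return new StemmerTransformer(sqlContext); // no Params to copy yet
    }
    @Override
    public String uid() {
        return uid;
    }

    // static, so serializing the UDF does not drag the transformer along
    static class UDF implements UDF1<WrappedArray<String>, List<String>> {
        private static final long serialVersionUID = 1L;
        @Override
        public List<String> call(WrappedArray<String> wordsArg) throws Exception {
            List<String> words = JavaConversions.seqAsJavaList(wordsArg);
            List<String> ret = new ArrayList<>(words.size());
            for (String word : words) {
                ret.add(word + "_stemed"); // TODO replace test code
            }
            return ret;
        }
    }
}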
root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)

+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- filteredOutput: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------------------------+---------------------------------------------------------------+
|rawInput                          |filteredOutput                                                 |
+----------------------------------+---------------------------------------------------------------+
|[I, saw, the, red, baloon]        |[I_stemed, saw_stemed, the_stemed, red_stemed, baloon_stemed]  |
|[Mary, had, a, little, lamb]      |[Mary_stemed, had_stemed, a_stemed, little_stemed, lamb_stemed]|
|[greet, greeting, greets, greeted]|[greet_stemed, greeting_stemed, greets_stemed, greeted_stemed] |
+----------------------------------+---------------------------------------------------------------+
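The schema above reports `containsNull = true` for the array elements, so the loop in `call()` is allowed to receive a `null` word, and `word + "_stemed"` would silently turn it into the string `"null_stemed"`. A minimal sketch, assuming you want nulls passed through unchanged: the per-word logic is kept in plain `java.util.List` code so it can be unit-tested without Spark, and `call()` would only convert the `WrappedArray` and delegate. `StemLogic`, `stemAll` and `testStem` are hypothetical names, not part of Spark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical helper: keeps the per-word logic free of Spark/Scala types
final class StemLogic {
    private StemLogic() {}

    // Applies stemFn to every word; null elements (allowed by the schema)
    // stay null instead of becoming "null_stemed".
    static List<String> stemAll(List<String> words, UnaryOperator<String> stemFn) {
        List<String> out = new ArrayList<>(words.size());
        for (String word : words) {
            out.add(word == null ? null : stemFn.apply(word));
        }
        return out;
    }

    // Same placeholder behaviour as the test code in the UDF
    static String testStem(String word) {
        return word + "_stemed";
    }
}
```

For example, `StemLogic.stemAll(Arrays.asList("greet", null), StemLogic::testStem)` yields `[greet_stemed, null]`.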
From: Michael Armbrust <[email protected]>
Date: Tuesday, January 5, 2016 at 12:58 PM
To: Andrew Davidson <[email protected]>
Cc: "user @spark" <[email protected]>
Subject: Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing
>> I am trying to implement the org.apache.spark.ml.Transformer interface in
>> Java 8.
>> My understanding is that the pseudocode for a transformer is something like:
>> @Override
>> public DataFrame transform(DataFrame df) {
>>     1. Select the input column
>>     2. Create a new column
>>     3. Append the new column to the df argument and return
>> }
>
>
> The following line can be used inside the transform function to return a
> DataFrame that has been augmented with a new column using the stem lambda
> function (defined as a UDF below).
> import static org.apache.spark.sql.functions.expr;
> return df.withColumn("filteredInput", expr("stem(rawInput)"));
> This produces a new column called filteredInput (appended to whatever
> columns are already there) by passing the column rawInput to your
> arbitrary lambda function.
>
>> Based on my experience, the current DataFrame API is very limited. You
>> cannot apply a complicated lambda function. As a workaround I convert the
>> DataFrame to a JavaRDD, apply my complicated lambda, and then convert the
>> resulting RDD back to a DataFrame.
>
>
> This is exactly what this code is doing. You are defining an arbitrary lambda
> function as a UDF. The difference here, when compared to a JavaRDD map, is
> that you can use this UDF to append columns without having to manually append
> the new data to some existing object.
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>   @Override
>   public String call(String str) {
>     return str; // TODO: replace with stemming code
>   }
> }, DataTypes.StringType);
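The stemming code Michael left as a TODO would normally come from a third-party stemmer library. As a stand-in only, here is a deliberately naive suffix-stripping function that could serve as the body of `call(String str)`. It is an assumption for illustration, not the Porter algorithm or any library's API; `ToyStemmer` and its suffix list are hypothetical.

```java
// Hypothetical toy stemmer: strips a few common English suffixes.
// Not the Porter algorithm; a real stemmer library would replace this.
final class ToyStemmer {
    // Checked in order; the first matching suffix wins
    private static final String[] SUFFIXES = {"ing", "ed", "s"};

    private ToyStemmer() {}

    static String stem(String word) {
        if (word == null) {
            return null; // guard against null input
        }
        for (String suffix : SUFFIXES) {
            // Only strip when a stem of at least 3 characters remains
            if (word.endsWith(suffix) && word.length() - suffix.length() >= 3) {
                return word.substring(0, word.length() - suffix.length());
            }
        }
        return word;
    }
}
```

With the sample words from the output earlier in this thread, `greeting`, `greets` and `greeted` all reduce to `greet`, while short words such as `red` are left alone by the length check.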
>> Now I select the "new column" from the DataFrame and try to call
>> df.withColumn().
>>
>> I can try to implement this as a UDF. However, I need to use several
>> 3rd-party jars. Any idea how to ensure the workers will have the required
>> jar files? If I were submitting a normal Java app I would create an uber
>> jar; will this work with UDFs?
>
>
> Yeah, UDFs are run the same way as your RDD lambda functions.