Hi,

After seeing that IDF needed refactoring to use ML vectors instead of MLlib
ones, I created a Jira ticket,
https://issues.apache.org/jira/browse/SPARK-22531, and submitted a PR for
it.
If anyone could have a look and suggest any changes, it would be really
appreciated.

Thank you.


2017-11-15 1:11 GMT+00:00 Bago Amirbekian <b...@databricks.com>:

> There is a known issue with VectorAssembler which causes it to fail in
> streaming if any of the input columns are of VectorType and don't have size
> information: https://issues.apache.org/jira/browse/SPARK-22346.
>
> This can be fixed by adding size information to the vector columns. I've
> made a PR to add a transformer to Spark to help with this:
> https://github.com/apache/spark/pull/19746. It would be awesome if you
> could take a look and see if this would fix your issue.
>
> On Sun, Nov 12, 2017 at 5:37 PM Davis Varghese <vergh...@gmail.com> wrote:
>
>> Bago,
>>
>> Finally I am able to create one which fails consistently. I think the
>> issue is caused by the VectorAssembler in the model. In the new code, I
>> have 2 features (1 text and 1 number) and I have to run them through a
>> VectorAssembler before giving them to LogisticRegression. Code and test
>> data are below.
>>
>> import java.util.Arrays;
>> import java.util.List;
>> import org.apache.spark.ml.Pipeline;
>> import org.apache.spark.ml.PipelineModel;
>> import org.apache.spark.ml.PipelineStage;
>> import org.apache.spark.ml.classification.LogisticRegression;
>> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
>> import org.apache.spark.ml.feature.CountVectorizer;
>> import org.apache.spark.ml.feature.CountVectorizerModel;
>> import org.apache.spark.ml.feature.IndexToString;
>> import org.apache.spark.ml.feature.StringIndexer;
>> import org.apache.spark.ml.feature.StringIndexerModel;
>> import org.apache.spark.ml.feature.Tokenizer;
>> import org.apache.spark.ml.feature.VectorAssembler;
>> import org.apache.spark.ml.param.ParamMap;
>> import org.apache.spark.ml.tuning.ParamGridBuilder;
>> import org.apache.spark.ml.tuning.TrainValidationSplit;
>> import org.apache.spark.ml.tuning.TrainValidationSplitModel;
>> import org.apache.spark.sql.Dataset;
>> import org.apache.spark.sql.Row;
>> import org.apache.spark.sql.RowFactory;
>> import org.apache.spark.sql.SparkSession;
>> import org.apache.spark.sql.streaming.StreamingQuery;
>> import org.apache.spark.sql.types.DataTypes;
>> import org.apache.spark.sql.types.Metadata;
>> import org.apache.spark.sql.types.StructField;
>> import org.apache.spark.sql.types.StructType;
>>
>> /**
>>  * A simple text classification pipeline that recognizes "spark" from
>>  * input text.
>>  */
>> public class StreamingIssueCountVectorizerSplitFailed { // repro: fit, save and reload a pipeline, then run it on batch and streaming input
>>   // Entry point: builds the data and the pipeline, tunes it, then predicts on batch and streaming data.
>>   public static void main(String[] args) throws Exception {
>>     SparkSession sparkSession =
>> SparkSession.builder().appName("StreamingIssueCountVectorizer")
>>         .master("local[2]")
>>         .getOrCreate();
>>     // Labelled training rows and unlabelled test rows; their columns follow the two schemas defined below.
>>     List<Row> _trainData = Arrays.asList(
>>         RowFactory.create("sunny fantastic day", 1, "Positive"),
>>         RowFactory.create("fantastic morning match", 1, "Positive"),
>>         RowFactory.create("good morning", 1, "Positive"),
>>         RowFactory.create("boring evening", 5, "Negative"),
>>         RowFactory.create("tragic evening event", 5, "Negative"),
>>         RowFactory.create("today is bad ", 5, "Negative")
>>     );
>>     List<Row> _testData = Arrays.asList(
>>         RowFactory.create("sunny morning", 1),
>>         RowFactory.create("bad evening", 5)
>>     );
>>     StructType schema = new StructType(new StructField[]{
>>         new StructField("tweet", DataTypes.StringType, false,
>> Metadata.empty()),
>>         new StructField("time", DataTypes.IntegerType, false,
>> Metadata.empty()),
>>         new StructField("sentiment", DataTypes.StringType, true,
>> Metadata.empty())
>>     });
>>     StructType testSchema = new StructType(new StructField[]{
>>         new StructField("tweet", DataTypes.StringType, false,
>> Metadata.empty()),
>>         new StructField("time", DataTypes.IntegerType, false,
>> Metadata.empty())
>>     });
>>     // The label indexer is fitted up front so its labels can be handed to IndexToString further down.
>>     Dataset<Row> trainData = sparkSession.createDataFrame(_trainData,
>> schema);
>>     Dataset<Row> testData = sparkSession.createDataFrame(_testData,
>> testSchema);
>>     StringIndexerModel labelIndexerModel = new StringIndexer()
>>         .setInputCol("sentiment")
>>         .setOutputCol("label")
>>         .setHandleInvalid("skip")
>>         .fit(trainData);
>>     Tokenizer tokenizer = new Tokenizer()
>>         .setInputCol("tweet")
>>         .setOutputCol("words");
>>     CountVectorizer countVectorizer = new CountVectorizer()
>>         .setInputCol(tokenizer.getOutputCol())
>>         .setOutputCol("wordfeatures")
>>         .setVocabSize(3)
>>         .setMinDF(2)
>>         .setMinTF(2)
>>         .setBinary(true);
>>     // Combines the wordfeatures vector column and the numeric time column into the features column.
>>     VectorAssembler vectorAssembler = new VectorAssembler()
>>         .setInputCols(new String[]{"wordfeatures", "time"}).
>>             setOutputCol("features");
>>     // The vectorizer is fitted outside the pipeline, on the tokenized training data.
>>     Dataset<Row> words = tokenizer.transform(trainData);
>>     CountVectorizerModel countVectorizerModel =
>> countVectorizer.fit(words);
>>
>>     LogisticRegression lr = new LogisticRegression()
>>         .setMaxIter(10)
>>         .setRegParam(0.001);
>>
>>     IndexToString labelConverter = new IndexToString()
>>         .setInputCol("prediction")
>>         .setOutputCol("predicted")
>>         .setLabels(labelIndexerModel.labels());
>>     // Sets minTF to 1 on the fitted model; the estimator above was configured with 2.
>>     countVectorizerModel.setMinTF(1);
>>     // The pipeline mixes already-fitted models (indexer, vectorizer) with stages that are not yet fitted.
>>     Pipeline pipeline = new Pipeline()
>>         .setStages(
>>             new PipelineStage[]{labelIndexerModel, tokenizer,
>> countVectorizerModel, vectorAssembler,
>>                 lr, labelConverter});
>>     ParamMap[] paramGrid = new ParamGridBuilder()
>>         .addGrid(lr.regParam(), new double[]{0.1, 0.01})
>>         .addGrid(lr.fitIntercept())
>>         .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
>>         .build();
>>
>>     MulticlassClassificationEvaluator evaluator = new
>> MulticlassClassificationEvaluator();
>>     evaluator.setLabelCol("label");
>>     evaluator.setPredictionCol("prediction");
>>     // Tunes the pipeline over paramGrid with the evaluator above and a train ratio of 0.7.
>>     TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
>>         .setEstimator(pipeline)
>>         .setEvaluator(evaluator)
>>         .setEstimatorParamMaps(paramGrid)
>>         .setTrainRatio(0.7);
>>
>>     // Fit the pipeline to training documents.
>>     TrainValidationSplitModel trainValidationSplitModel =
>> trainValidationSplit.fit(trainData);
>>     // NOTE(review): the save path literal below is split across two lines, presumably by mail wrapping - confirm it matches the load path.
>>
>> trainValidationSplitModel.write().overwrite().save("/
>> tmp/CountSplit.model");
>>     // Reloads the saved model and takes its best model as a PipelineModel.
>>     TrainValidationSplitModel _loadedModel = TrainValidationSplitModel
>>         .load("/tmp/CountSplit.model");
>>     PipelineModel loadedModel = (PipelineModel)
>> (_loadedModel).bestModel();
>>
>>     //Test on non-streaming data
>>     Dataset<Row> predicted = loadedModel.transform(testData);
>>     predicted.show();
>>     List<Row> _rows = predicted.select("tweet",
>> "predicted").collectAsList();
>>     for (Row r : _rows) {
>>       System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
>>     }
>>
>>     //Test on streaming data
>>
>>     Dataset<Row> lines = sparkSession.readStream().option("sep", ",")
>>         .schema(testSchema).option("header",
>> "true").option("inferSchema",
>> "true")
>>         .format("com.databricks.spark.csv")
>>         .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");
>>     // NOTE(review): per the stack trace quoted after this class, the AnalysisException is raised under PipelineModel.transform -> VectorAssembler.transform -> Dataset.first; presumably this is the failing call.
>>     StreamingQuery query = loadedModel.transform(lines).writeStream()
>>         .outputMode("append")
>>         .format("console")
>>         .start();
>>     // Waits on the streaming query started above.
>>     query.awaitTermination();
>>
>>   }
>> }
>>
>> *##Test data csv file*
>> tweet,time
>> Today is a bright sunny day,2
>> How is everyone feeling in office?,2
>> I want beef cake. Where is it?,2
>> The weather sucks today,2
>> I like Vat69.,5
>> I don't care,5
>> Wassup,5
>> Skyfall sucks!,5
>>
>>
>> *Output*
>> *--------*
>>
>>
>> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
>> |        tweet|time|           words| wordfeatures|         features|       rawPrediction|         probability|prediction|predicted|
>> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
>> |sunny morning|   1|[sunny, morning]|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|[3.33130861165765...|[0.96548740116159...|       0.0| Positive|
>> |  bad evening|   5|  [bad, evening]|(3,[0],[1.0])|[1.0,0.0,0.0,5.0]|[-4.4513631975340...|[0.01152820807912...|       1.0| Negative|
>> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
>>
>> [sunny morning], prediction=Positive
>> [bad evening], prediction=Negative
>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>> Queries
>> with streaming sources must be executed with writeStream.start();;
>> FileSource[file:///home/davis/Documents/Bugs/StreamingTwitter1]
>>         at
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.
>> org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$
>> throwError(UnsupportedOperationChecker.scala:297)
>>         at
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$
>> anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
>>         at
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$
>> anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
>> TreeNode.scala:127)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> foreachUp$1.apply(TreeNode.scala:126)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> foreachUp$1.apply(TreeNode.scala:126)
>>         at scala.collection.immutable.List.foreach(List.scala:381)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
>> TreeNode.scala:126)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> foreachUp$1.apply(TreeNode.scala:126)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> foreachUp$1.apply(TreeNode.scala:126)
>>         at scala.collection.immutable.List.foreach(List.scala:381)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
>> TreeNode.scala:126)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> foreachUp$1.apply(TreeNode.scala:126)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> foreachUp$1.apply(TreeNode.scala:126)
>>         at scala.collection.immutable.List.foreach(List.scala:381)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
>> TreeNode.scala:126)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> foreachUp$1.apply(TreeNode.scala:126)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
>> foreachUp$1.apply(TreeNode.scala:126)
>>         at scala.collection.immutable.List.foreach(List.scala:381)
>>         at
>> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
>> TreeNode.scala:126)
>>         at
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.
>> checkForBatch(UnsupportedOperationChecker.scala:34)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.assertSupported(
>> QueryExecution.scala:63)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(
>> QueryExecution.scala:74)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.
>> withCachedData(QueryExecution.scala:72)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(
>> QueryExecution.scala:78)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.
>> optimizedPlan(QueryExecution.scala:78)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(
>> QueryExecution.scala:84)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.
>> scala:80)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(
>> QueryExecution.scala:89)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.
>> executedPlan(QueryExecution.scala:89)
>>         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
>>         at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
>>         at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
>>         at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
>>         at
>> org.apache.spark.ml.feature.VectorAssembler.first$
>> lzycompute$1(VectorAssembler.scala:57)
>>         at
>> org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$
>> VectorAssembler$$first$1(VectorAssembler.scala:57)
>>         at
>> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$
>> anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
>>         at
>> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(
>> VectorAssembler.scala:88)
>>         at
>> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(
>> VectorAssembler.scala:88)
>>         at scala.Option.getOrElse(Option.scala:121)
>>         at
>> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.
>> apply(VectorAssembler.scala:88)
>>         at
>> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.
>> apply(VectorAssembler.scala:58)
>>         at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
>> TraversableLike.scala:241)
>>         at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
>> TraversableLike.scala:241)
>>         at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.
>> scala:33)
>>         at scala.collection.mutable.ArrayOps$ofRef.foreach(
>> ArrayOps.scala:186)
>>         at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>>         at scala.collection.mutable.ArrayOps$ofRef.flatMap(
>> ArrayOps.scala:186)
>>         at
>> org.apache.spark.ml.feature.VectorAssembler.transform(
>> VectorAssembler.scala:58)
>>         at
>> org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.
>> scala:305)
>>         at
>> org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.
>> scala:305)
>>         at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.
>> scala:57)
>>         at
>> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.
>> scala:66)
>>         at scala.collection.mutable.ArrayOps$ofRef.foldLeft(
>> ArrayOps.scala:186)
>>         at org.apache.spark.ml.PipelineModel.transform(
>> Pipeline.scala:305)
>>         at
>> StreamingIssueCountVectorizerSplit.main(StreamingIssueCountVectorizerS
>> plit.java:164)
>>
>>
>>
>>
>> --
>> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>

Reply via email to