There is a known issue with VectorAssembler that causes it to fail in
streaming if any of the input columns are of VectorType and don't have size
information; see https://issues.apache.org/jira/browse/SPARK-22346.

This can be fixed by adding size information to the vector columns. I've
opened a PR that adds a transformer to Spark to help with this:
https://github.com/apache/spark/pull/19746. It would be awesome if you
could take a look and see whether it fixes your issue.

On Sun, Nov 12, 2017 at 5:37 PM Davis Varghese <vergh...@gmail.com> wrote:

> Bago,
>
> I am finally able to create an example which fails consistently. I think
> the issue is caused by the VectorAssembler in the model. In the new code, I
> have 2 features (1 text and 1 number), which I have to run through a
> VectorAssembler before passing them to LogisticRegression. The code and
> test data are below.
>
> import java.util.Arrays;
> import java.util.List;
> import org.apache.spark.ml.Pipeline;
> import org.apache.spark.ml.PipelineModel;
> import org.apache.spark.ml.PipelineStage;
> import org.apache.spark.ml.classification.LogisticRegression;
> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
> import org.apache.spark.ml.feature.CountVectorizer;
> import org.apache.spark.ml.feature.CountVectorizerModel;
> import org.apache.spark.ml.feature.IndexToString;
> import org.apache.spark.ml.feature.StringIndexer;
> import org.apache.spark.ml.feature.StringIndexerModel;
> import org.apache.spark.ml.feature.Tokenizer;
> import org.apache.spark.ml.feature.VectorAssembler;
> import org.apache.spark.ml.param.ParamMap;
> import org.apache.spark.ml.tuning.ParamGridBuilder;
> import org.apache.spark.ml.tuning.TrainValidationSplit;
> import org.apache.spark.ml.tuning.TrainValidationSplitModel;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.streaming.StreamingQuery;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.Metadata;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> /**
>  * A simple text classification pipeline that predicts the sentiment of a
>  * tweet from its text and a numeric "time" feature, run against both batch
>  * and streaming input.
>  */
> public class StreamingIssueCountVectorizerSplitFailed {
>
>   public static void main(String[] args) throws Exception {
>     SparkSession sparkSession =
> SparkSession.builder().appName("StreamingIssueCountVectorizer")
>         .master("local[2]")
>         .getOrCreate();
>
>     List<Row> _trainData = Arrays.asList(
>         RowFactory.create("sunny fantastic day", 1, "Positive"),
>         RowFactory.create("fantastic morning match", 1, "Positive"),
>         RowFactory.create("good morning", 1, "Positive"),
>         RowFactory.create("boring evening", 5, "Negative"),
>         RowFactory.create("tragic evening event", 5, "Negative"),
>         RowFactory.create("today is bad ", 5, "Negative")
>     );
>     List<Row> _testData = Arrays.asList(
>         RowFactory.create("sunny morning", 1),
>         RowFactory.create("bad evening", 5)
>     );
>     StructType schema = new StructType(new StructField[]{
>         new StructField("tweet", DataTypes.StringType, false,
> Metadata.empty()),
>         new StructField("time", DataTypes.IntegerType, false,
> Metadata.empty()),
>         new StructField("sentiment", DataTypes.StringType, true,
> Metadata.empty())
>     });
>     StructType testSchema = new StructType(new StructField[]{
>         new StructField("tweet", DataTypes.StringType, false,
> Metadata.empty()),
>         new StructField("time", DataTypes.IntegerType, false,
> Metadata.empty())
>     });
>
>     Dataset<Row> trainData = sparkSession.createDataFrame(_trainData,
> schema);
>     Dataset<Row> testData = sparkSession.createDataFrame(_testData,
> testSchema);
>     StringIndexerModel labelIndexerModel = new StringIndexer()
>         .setInputCol("sentiment")
>         .setOutputCol("label")
>         .setHandleInvalid("skip")
>         .fit(trainData);
>     Tokenizer tokenizer = new Tokenizer()
>         .setInputCol("tweet")
>         .setOutputCol("words");
>     CountVectorizer countVectorizer = new CountVectorizer()
>         .setInputCol(tokenizer.getOutputCol())
>         .setOutputCol("wordfeatures")
>         .setVocabSize(3)
>         .setMinDF(2)
>         .setMinTF(2)
>         .setBinary(true);
>
>     VectorAssembler vectorAssembler = new VectorAssembler()
>         .setInputCols(new String[]{"wordfeatures", "time"}).
>             setOutputCol("features");
>
>     Dataset<Row> words = tokenizer.transform(trainData);
>     CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);
>
>     LogisticRegression lr = new LogisticRegression()
>         .setMaxIter(10)
>         .setRegParam(0.001);
>
>     IndexToString labelConverter = new IndexToString()
>         .setInputCol("prediction")
>         .setOutputCol("predicted")
>         .setLabels(labelIndexerModel.labels());
>
>     countVectorizerModel.setMinTF(1);
>
>     Pipeline pipeline = new Pipeline()
>         .setStages(
>             new PipelineStage[]{labelIndexerModel, tokenizer,
> countVectorizerModel, vectorAssembler,
>                 lr, labelConverter});
>     ParamMap[] paramGrid = new ParamGridBuilder()
>         .addGrid(lr.regParam(), new double[]{0.1, 0.01})
>         .addGrid(lr.fitIntercept())
>         .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
>         .build();
>
>     MulticlassClassificationEvaluator evaluator = new
> MulticlassClassificationEvaluator();
>     evaluator.setLabelCol("label");
>     evaluator.setPredictionCol("prediction");
>
>     TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
>         .setEstimator(pipeline)
>         .setEvaluator(evaluator)
>         .setEstimatorParamMaps(paramGrid)
>         .setTrainRatio(0.7);
>
>     // Fit the pipeline to training documents.
>     TrainValidationSplitModel trainValidationSplitModel =
> trainValidationSplit.fit(trainData);
>
>
>
> trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");
>
>     TrainValidationSplitModel _loadedModel = TrainValidationSplitModel
>         .load("/tmp/CountSplit.model");
>     PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();
>
>     //Test on non-streaming data
>     Dataset<Row> predicted = loadedModel.transform(testData);
>     predicted.show();
>     List<Row> _rows = predicted.select("tweet",
> "predicted").collectAsList();
>     for (Row r : _rows) {
>       System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
>     }
>
>     //Test on streaming data
>
>     Dataset<Row> lines = sparkSession.readStream().option("sep", ",")
>         .schema(testSchema).option("header", "true").option("inferSchema",
> "true")
>         .format("com.databricks.spark.csv")
>         .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");
>
>     StreamingQuery query = loadedModel.transform(lines).writeStream()
>         .outputMode("append")
>         .format("console")
>         .start();
>
>     query.awaitTermination();
>
>   }
> }
>
> *##Test data csv file*
> tweet,time
> Today is a bright sunny day,2
> How is everyone feeling in office?,2
> I want beef cake. Where is it?,2
> The weather sucks today,2
> I like Vat69.,5
> I don't care,5
> Wassup,5
> Skyfall sucks!,5
>
>
> *Output*
> *--------*
>
>
>
> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
> |        tweet|time|           words| wordfeatures|         features|
> rawPrediction|         probability|prediction|predicted|
>
> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
> |sunny morning|   1|[sunny,
>
> morning]|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|[3.33130861165765...|[0.96548740116159...|
> 0.0| Positive|
> |  bad evening|   5|  [bad,
>
> evening]|(3,[0],[1.0])|[1.0,0.0,0.0,5.0]|[-4.4513631975340...|[0.01152820807912...|
> 1.0| Negative|
>
> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
>
> [sunny morning], prediction=Positive
> [bad evening], prediction=Negative
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries
> with streaming sources must be executed with writeStream.start();;
> FileSource[file:///home/davis/Documents/Bugs/StreamingTwitter1]
>         at
>
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>         at
>
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
>         at
>
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>         at
>
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at
>
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>         at
>
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at
>
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>         at
>
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at
>
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>         at
>
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at
>
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>         at
>
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
>         at
>
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
>         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
>         at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
>         at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
>         at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
>         at
>
> org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
>         at
> org.apache.spark.ml.feature.VectorAssembler.org
> $apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
>         at
>
> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
>         at
>
> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
>         at
>
> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
>         at scala.Option.getOrElse(Option.scala:121)
>         at
>
> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
>         at
>
> org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
>         at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>         at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>         at
> scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
>         at
>
> org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
>         at
> org.apache.spark.ml
> .PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
>         at
> org.apache.spark.ml
> .PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
>         at
>
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>         at
>
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>         at
> scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
>         at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
>         at
>
> StreamingIssueCountVectorizerSplit.main(StreamingIssueCountVectorizerSplit.java:164)
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

Reply via email to