The following code is modified from StatefulNetworkWordCount in the examples package:
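import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

if (args.length < 2) {
  System.err.println("Usage: StatefulNetworkWordBiggest3Values <hostname> <port>")
  System.exit(1)
}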
/**
 * state holds the three biggest values seen so far for each word, i.e. min(max(3))
 */
val updateFunc = (key: String, values: Seq[Seq[Int]], state: Seq[Int]) => {
  values(0)
}
val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], Seq[Int])]) => {
  iterator.flatMap(t => updateFunc(t._1, t._2, t._3).map(s => (t._1, s)))
}
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List(("hello", Seq(1)), ("world", Seq(1))))
// Create a ReceiverInputDStream on target ip:port and read the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, scala.util.Random.nextInt(1000)))
// Update the per-word state using updateStateByKey
// This will give a DStream made of the state (the biggest three values of each word)
val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism))
stateDstream.print()
ssc.start()
ssc.awaitTermination()
The compile error:
Error:(77, 52) overloaded method value updateStateByKey with alternatives:
  (updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => Iterator[(String, Seq[Int])],partitioner: org.apache.spark.Partitioner,rememberPartitioner: Boolean,initialRDD: org.apache.spark.rdd.RDD[(String, Seq[Int])])(implicit evidence$7: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],partitioner: org.apache.spark.Partitioner,initialRDD: org.apache.spark.rdd.RDD[(String, Seq[Int])])(implicit evidence$6: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => Iterator[(String, Seq[Int])],partitioner: org.apache.spark.Partitioner,rememberPartitioner: Boolean)(implicit evidence$5: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],partitioner: org.apache.spark.Partitioner)(implicit evidence$4: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],numPartitions: Int)(implicit evidence$3: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])] <and>
  (updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]])(implicit evidence$2: scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String, Seq[Int])]
 cannot be applied to (Iterator[(String, Seq[Seq[Int]], Seq[Int])] => Iterator[(String, Int)], org.apache.spark.HashPartitioner, Boolean, org.apache.spark.rdd.RDD[(String, Seq[Int])])
    val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
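Reading the alternatives against the last line of the error: with wordDstream of type DStream[(String, Int)] and S = Seq[Int], the iterator overload expects Iterator[(String, Seq[Int], Option[Seq[Int]])] => Iterator[(String, Seq[Int])]. The function passed instead has Seq[Seq[Int]] for the values (they are Seq[V] = Seq[Int]), a bare Seq[Int] for the previous state (it arrives as Option[Seq[Int]]), and returns (String, Int) pairs instead of (String, Seq[Int]). ClassTag[Seq[Int]] itself is not the problem. The following is a minimal pure-Scala sketch of functions with the expected shapes, assuming the intent (from the app name) is to keep the three biggest values per word; it is not taken from the Spark examples and needs no Spark on the classpath.

```scala
// Per-key form, (Seq[V], Option[S]) => Option[S] with V = Int and S = Seq[Int]
val updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]] =
  (values, state) => {
    // Merge the previous state (empty on first sight of the key) with this batch's values
    val merged = state.getOrElse(Seq.empty[Int]) ++ values
    // Keep only the three biggest values, largest first
    Some(merged.sorted(Ordering[Int].reverse).take(3))
  }

// Iterator form, Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)]
val newUpdateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => Iterator[(String, Seq[Int])] =
  iterator => iterator.flatMap { case (key, values, state) =>
    // Drop the key if the per-key update returns None, otherwise pair it with the new state
    updateFunc(values, state).map(s => (key, s))
  }
```

Under these assumptions, the four-argument call from the error would become wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD), matching the first alternative listed; that Spark call has not been run here.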
Ricky Ou(欧 锐)
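Department: Suning Commerce Group (苏宁云商), IT Headquarters, Technical Support R&D Center, Big Data Center, Data Platform Development Department
Tel: 18551600418
Email: [email protected]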
From: Dean Wampler
Date: 2015-12-23 00:46
To: [email protected]
CC: user; [email protected]
Subject: Re: spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?
There are ClassTags for Array, List, and Map, as well as for Int, etc. that you
might have inside those collections. What do you mean by sql? Could you post
more of your code?
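This can be checked in plain Scala: updateStateByKey[S: ClassTag] only needs the compiler to find an implicit ClassTag[S] at the call site, and one is synthesized for any concrete type, collections included. The sketch below uses a hypothetical helper, runtimeClassOf, with the same context bound; note that a ClassTag records only the erased class, so the element types (Int, String, Seq[Int]) do not show up in runtimeClass.

```scala
import scala.reflect.ClassTag

// Same context bound as updateStateByKey[S: ClassTag]: compiles only if a ClassTag[S] is found
def runtimeClassOf[S: ClassTag]: Class[_] = implicitly[ClassTag[S]].runtimeClass

// ClassTags are available for List, Map and Array state types alike
val listClass  = runtimeClassOf[List[Int]]
val mapClass   = runtimeClassOf[Map[String, Seq[Int]]]
val arrayClass = runtimeClassOf[Array[Int]]
```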
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com
On Mon, Dec 21, 2015 at 8:51 PM, [email protected] <[email protected]> wrote:
Does the Spark Streaming updateStateByKey state not support an Array type without a ClassTag?
How can I solve this problem?
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}
ClassTag does not seem to support other types, e.g. HashMap, List, sql.
My use case is as follows:
save the latest three click logs of collected goods from different
topics for the same member ID; the system then recommends related products
according to those latest three click logs of collected goods.
I want to use the updateStateByKey state to save them; however, the updateStateByKey
state seems not to support types other than those with a ClassTag, such as List.
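The use case maps onto a List state with the (Seq[V], Option[S]) => Option[S] shape from the signature above. A minimal sketch, assuming (hypothetically, since the real log format is not given) that each click-log entry is a String keyed by member ID, so V = String and S = List[String] with the newest entry first. Within a batch, the order of a key's values after the shuffle may not match arrival order, so a timestamp field would make "latest" more robust in practice.

```scala
// Hypothetical types: V = String (one click-log entry), S = List[String] (latest three, newest first)
val keepLatestThree: (Seq[String], Option[List[String]]) => Option[List[String]] =
  (newClicks, state) => {
    val previous = state.getOrElse(Nil)
    // Put this batch first (reversed, assuming arrival order), then the older state
    val updated = newClicks.reverse.toList ++ previous
    // Keep only the three most recent entries for this member ID
    Some(updated.take(3))
  }
```

With a hypothetical DStream[(String, String)] named clicksByMember, it could be passed as clicksByMember.updateStateByKey[List[String]](keepLatestThree); the ClassTag[List[String]] is supplied by the compiler.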
Thanks for your help,
Ricky Ou(欧 锐)