I'm trying to implement a Spark Streaming program to calculate the number of
instances of a given key encountered and the minimum and maximum times at
which it was encountered. updateStateByKey seems to be just the thing, but
when I define the "state" to be a tuple, I get a compile error I can't find
a way around. Perhaps it's something simple, but I'm stumped.
    var lines = ssc.textFileStream(dirArg)
    var linesArray = lines.map(line => line.split("\t"))
    var newState = linesArray.map(lineArray =>
      (lineArray(4), 1,
       Time((lineArray(0).toDouble * 1000).toLong),
       Time((lineArray(0).toDouble * 1000).toLong)))

    val updateDhcpState = (newValues: Seq[(Int, Time, Time)],
                           state: Option[(Int, Time, Time)]) => {
      val newCount   = newValues.map(x => x._1).sum
      val newMinTime = newValues.map(x => x._2).min
      val newMaxTime = newValues.map(x => x._3).max
      val (count, minTime, maxTime) =
        state.getOrElse((0, Time(Int.MaxValue), Time(Int.MinValue)))
      Some((count + newCount,
            Seq(minTime, newMinTime).min,
            Seq(maxTime, newMaxTime).max))
      //(count + newCount, Seq(minTime, newMinTime).min, Seq(maxTime, newMaxTime).max)
    }

    var DhcpSvrCum =
      newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)
The error I get is
[info] Compiling 3 Scala sources to
/Users/spr/Documents/.../target/scala-2.10/classes...
[error] /Users/spr/Documents/...StatefulDhcpServersHisto.scala:95: value
updateStateByKey is not a member of
org.apache.spark.streaming.dstream.DStream[(String, Int,
org.apache.spark.streaming.Time, org.apache.spark.streaming.Time)]
[error] var DhcpSvrCum = newState.updateStateByKey[(Int, Time,
Time)](updateDhcpState)
I don't understand why a String is being prepended to the (Int, Time, Time)
tuple I expect. In the main example (StatefulNetworkWordCount, here
<https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala>
), the data is a stream of (String, Int) tuples created by
val wordDstream = words.map(x => (x, 1))
and the updateFunc ignores the String key in its definition
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
Is there some special-casing of a key with a simple (non-tuple) value? How
could this work with a tuple value?
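For concreteness, here's a sketch of what I suspect is going on: updateStateByKey
is only defined on pair DStreams, i.e. DStream[(K, V)], so my map would need to
produce a two-element (key, value) pair with the whole state tuple packed into the
value, something like linesArray.map(a => (a(4), (1, t, t))), and the update
function would then operate on the value tuple alone. Below is that update
function in isolation (names are mine; I've substituted plain Long millisecond
timestamps for Spark's Time just so it runs standalone, and I seed the min/max
from the old state because I believe updateStateByKey can call the function with
an empty batch for keys that saw no new data, which would make a bare
newValues.map(...).min throw):

```scala
// Hypothetical sketch of the pair-based wiring (not compiled against Spark):
//   val pairs = linesArray.map(a =>
//     (a(4), (1, (a(0).toDouble * 1000).toLong, (a(0).toDouble * 1000).toLong)))
//   val dhcpSvrCum = pairs.updateStateByKey[(Int, Long, Long)](updateDhcpState)

// State per key: (count, minTime, maxTime), Long epoch millis standing in
// for org.apache.spark.streaming.Time.
type DhcpState = (Int, Long, Long)

val updateDhcpState = (newValues: Seq[DhcpState], state: Option[DhcpState]) => {
  val newCount = newValues.map(_._1).sum
  // Start from the previous extremes so an empty batch leaves them unchanged.
  val (count, minTime, maxTime) =
    state.getOrElse((0, Long.MaxValue, Long.MinValue))
  val newMin = (minTime +: newValues.map(_._2)).min
  val newMax = (maxTime +: newValues.map(_._3)).max
  Some((count + newCount, newMin, newMax))
}
```

Is that roughly the intended usage, or is there a way to keep a non-pair
DStream and still use updateStateByKey?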
Thanks in advance.
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/does-updateStateByKey-accept-a-state-that-is-a-tuple-tp17756.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.