Judging from runs on small test cases, the construction below does what I
intend. (Yes, all those Tuple1()s were superfluous.)
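import org.apache.spark.streaming.Time

// Each input line is tab-separated: field 0 is a timestamp (apparently in
// seconds, hence the *1000 to get milliseconds) and field 4 is the key.
val lines = ssc.textFileStream(dirArg)
val linesArray = lines.map(line => line.split("\t"))
// Per-record state: (count, earliest time, latest time).
val newState = linesArray.map { lineArray =>
  val time = Time((lineArray(0).toDouble * 1000).toLong)
  (lineArray(4), (1, time, time))
}

val updateDhcpState = (newValues: Seq[(Int, Time, Time)],
                       state: Option[(Int, Time, Time)]) => {
  // Time wraps a Long, so the sentinels must be Long bounds; Int.MaxValue
  // milliseconds is only about 25 days after the epoch.
  val initial = state.getOrElse((0, Time(Long.MaxValue), Time(Long.MinValue)))
  // Fold instead of calling min/max on newValues, which throws when a batch
  // brings no new values for an existing key.
  Option(newValues.foldLeft(initial) { case ((count, minTime, maxTime), (c, lo, hi)) =>
    (count + c, if (lo < minTime) lo else minTime, if (hi > maxTime) hi else maxTime)
  })
}

val DhcpSvrCum = newState.updateStateByKey[(Int, Time, Time)](updateDhcpState)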
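
For anyone who wants to check the update logic without a Spark cluster, here is a minimal sketch of the same idea in plain Scala. It rests on assumptions of mine: plain Long milliseconds stand in for Spark's Time, and the names DhcpStateSketch, State and update are hypothetical, not part of any Spark API. It exercises the two cases that small tests can miss: a batch with no new values for an existing key, and timestamps larger than Int.MaxValue.

```scala
// Sketch only: Long epoch milliseconds stand in for org.apache.spark.streaming.Time,
// so this compiles and runs without Spark on the classpath.
object DhcpStateSketch {
  // (count, earliest millis, latest millis); a hypothetical alias for illustration.
  type State = (Int, Long, Long)

  // Same shape as the function handed to updateStateByKey:
  // (values seen in this batch, previous state) => new state.
  def update(newValues: Seq[State], state: Option[State]): Option[State] = {
    // Long bounds as sentinels, so realistic epoch-millisecond values compare correctly.
    val initial = state.getOrElse((0, Long.MaxValue, Long.MinValue))
    // foldLeft returns `initial` unchanged when newValues is empty.
    Some(newValues.foldLeft(initial) { case ((count, lo, hi), (c, t1, t2)) =>
      (count + c, math.min(lo, t1), math.max(hi, t2))
    })
  }

  def main(args: Array[String]): Unit = {
    // First batch for a key: two records with realistic millisecond timestamps.
    val first = update(Seq((1, 1414000002000L, 1414000002000L),
                           (1, 1414000001000L, 1414000001000L)), None)
    println(first)
    // Later batch with no records for that key: the state carries over as is.
    println(update(Seq.empty, first))
  }
}
```

Both println calls should show the same state: a count of 2 with the earlier and later of the two timestamps.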