Ha! So easy, how could I miss it?!
Thanks!


On March 10, 2016 9:32:38 PM Todd Nist <tsind...@gmail.com> wrote:

updateStateByKey can be supplied with an initialRDD to populate the state. Per the code
(https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445).

Provided here for your convenience.


  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of the key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream.
   * @param initialRDD initial state value of each key.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]
    ): DStream[(K, S)] = ssc.withScope {
    val cleanedUpdateF = sparkContext.clean(updateFunc)
    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
      iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
    }
    updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
  }
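
To make the updateFunc contract concrete, here is a rough sketch in plain Scala (no Spark needed) of what happens to the state in one batch. The `step` helper is only my local stand-in for what the DStream does per batch, and treating an empty string as a "delete" marker is a made-up convention for illustration, not anything Spark defines.

```scala
object UpdateFuncSketch {
  // Same shape as updateFunc above, with V = String and S = String.
  // Returning None removes the key from the state.
  def update(newValues: Seq[String], previous: Option[String]): Option[String] =
    if (newValues.contains("")) None              // hypothetical "delete" marker
    else newValues.lastOption.orElse(previous)    // latest value wins, else keep old state

  // Local stand-in for one batch: every key in the old state or in the batch
  // is passed to update, and keys for which it returns None are dropped.
  def step(state: Map[String, String], batch: Seq[(String, String)]): Map[String, String] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ grouped.keySet).flatMap { k =>
      update(grouped.getOrElse(k, Seq.empty), state.get(k)).map(v => k -> v)
    }.toMap
  }
}
```

Keys that get no new values in a batch are still passed to the function (with an empty Seq), so state seeded from the initialRDD is carried forward until the function returns None for it.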

Here is a simple example by Aniket Bhatnagar from an earlier thread on the forum.


def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long] = {
  val prevCount = prevStateOpt.getOrElse(0L)
  val newCount = prevCount + events.size
  Some(newCount)
}
val interval = 60 * 1000
val initialRDD = sparkContext
  .makeRDD(Array(1L, 2L, 3L, 4L, 5L))
  .map(_ * interval)
  .map(n => (n % interval, n / interval))
val counts = eventsStream
  .map(event => (event.timestamp - event.timestamp % interval, event))
  .updateStateByKey[Long](
    PrintEventCountsByInterval.counter _,
    new HashPartitioner(3),
    initialRDD = initialRDD)
counts.print()
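
Applied to the string->string dictionary from your question, it might look roughly like the sketch below. This is untested against your setup: `DictionaryState` and `loadInitialDictionary` are names I made up, and the latter is only a placeholder (a single hard-coded pair) for however you read the historical key/values from Hive.

```scala
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object DictionaryState {
  // Keep the most recent value seen in this batch; otherwise carry the old one forward.
  def update(newValues: Seq[String], previous: Option[String]): Option[String] =
    newValues.lastOption.orElse(previous)

  // Hypothetical placeholder for the Hive load; replace with the real query.
  def loadInitialDictionary(sc: SparkContext): RDD[(String, String)] =
    sc.parallelize(Seq("A" -> "B"))

  // The historical data is passed once as the initial state, so no join is
  // needed on every batch.
  def process(sc: SparkContext, input: DStream[(String, String)]): DStream[(String, String)] = {
    val initialRDD = loadInitialDictionary(sc)
    input.updateStateByKey[String](
      update _,
      new HashPartitioner(sc.defaultParallelism),
      initialRDD)
  }
}
```

Note that updateStateByKey needs checkpointing enabled on the StreamingContext (ssc.checkpoint with some directory), otherwise the job fails at startup.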

HTH.

-Todd


On Thu, Mar 10, 2016 at 1:35 AM, Zalzberg, Idan (Agoda) <idan.zalzb...@agoda.com> wrote:
Hi,

I have a Spark Streaming application that basically keeps track of a string->string dictionary.

So I have messages coming in with updates, like:
"A"->"B"
And I need to update the dictionary.

This seems like a simple use case for the updateStateByKey method.

However, my issue is that when the app starts I need to "initialize" the dictionary with data from a Hive table that holds all the historical key/values for the dictionary.

The only way I could think of is doing something like:

val rdd = ... // get data from Hive
def process(input: DStream[(String, String)]) = {
  input.join(rdd).updateStateByKey(update)
}

So the join operation will be done on every incoming batch, when in fact I only need it at initialization.

Any idea how to achieve that?

Thanks


