Hi,
I should have used transform instead of map

val x: DStream[(String, Record)] =
kafkaStream.transform(x=>{x.map(_._2)}).transform(x=>{sqlContext.read.json(x).as[Record].map(r=>(r.iid,r))}.rdd)

but I'm still unable to call mapWithState on x.

any idea why ?

Thank you,
Daniel



On Tue, Nov 8, 2016 at 7:46 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm trying to make a stateful stream of Tuple2[String, Dataset[Record]] :
>
> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
> val stateStream: DStream[RDD[(String, Record)]] = kafkaStream.map(x=> {  
> sqlContext.read.json(x._2).as[Record]}).map(x=>{x.map(r=>(r.iid,r)).rdd})
>
>
> Because stateStream is a DStream[RDD[(String, Record)]] I can't call 
> mapWithState on it.
> How can I map it to a DStream[(String,Record)] ?
>
> Thank you,
> Daniel
>
>

Reply via email to