Hi,
I should have used transform instead of map:

val x: DStream[(String, Record)] = kafkaStream
  .transform(x=>{x.map(_._2)})
  .transform(x=>{sqlContext.read.json(x).as[Record].map(r=>(r.iid,r))}.rdd)

but I'm still unable to call mapWithState on x. Any idea why?

Thank you,
Daniel

On Tue, Nov 8, 2016 at 7:46 PM, Daniel Haviv <daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm trying to make a stateful stream of Tuple2[String, Dataset[Record]]:
>
> val kafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
> val stateStream: DStream[RDD[(String, Record)]] = kafkaStream.map(x=> {
> sqlContext.read.json(x._2).as[Record]}).map(x=>{x.map(r=>(r.iid,r)).rdd})
>
> Because stateStream is a DStream[RDD[(String, Record)]] I can't call
> mapWithState on it.
> How can I map it to a DStream[(String,Record)]?
>
> Thank you,
> Daniel