I may be misunderstanding, but you need to take each Kafka message and turn it into multiple items in the transformed RDD?
so something like (pseudocode):

    stream.flatMap { message =>
      val items = new ArrayBuffer[(K, V)]()
      var parser: Parser = null
      message.split("\n").foreach { line =>
        if (line.startsWith("#"))   // it's a header
          parser = someParserBasedOn(line)
        else
          items += parser.parse(line)
      }
      items.iterator
    }

(A runnable sketch of the same idea follows the quoted message below.)

On Mon, Nov 7, 2016 at 4:22 PM, coolgar <karllbunn...@gmail.com> wrote:
> I'm using Apache Spark Streaming with the Kafka direct consumer. The data
> stream I'm receiving is log data that includes a header with each block of
> messages. Each DStream can therefore have many blocks of messages, each
> with its own header.
>
> The header is used to know how to interpret the fields that follow it in
> the block of messages. My challenge is that I'm building up (K, V) pairs
> that are processed by reduceByKey(), and I use this header to know how to
> parse the fields that follow it into the (K, V) pairs.
>
> So each message received from Kafka may appear as follows (# denotes the
> header field, \n denotes a new line):
>
> #fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
> field6 field7\ndata4 data5 data6 data7\n...
>
> Is there a way, without collecting all the data back to the driver, to
> "grab" the header and use it to process the messages that follow it until
> a new #fields line comes along, rinse, repeat?
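For concreteness, here's a minimal self-contained sketch of that flatMap body in plain Scala (no Spark needed to try it). The FieldParser class, the startsWith("#fields") check, and the choice of (fieldName, value) as the emitted (K, V) pairs are all illustrative assumptions on my part, not something from your post — adapt them to your actual header format and key scheme:

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical parser built from a "#fields f1 f2 ..." header line.
    case class FieldParser(fieldNames: Array[String]) {
      // Pair each data value with its field name: ("field1", "data1"), ...
      def parse(line: String): Seq[(String, String)] =
        fieldNames.zip(line.split("\\s+")).toSeq
    }

    object HeaderStreamDemo {
      // The same logic you'd put inside stream.flatMap { message => ... }
      def explode(message: String): Iterator[(String, String)] = {
        val items = new ArrayBuffer[(String, String)]()
        var parser: FieldParser = null
        message.split("\n").foreach { line =>
          if (line.startsWith("#fields"))
            // Header line: rebuild the parser from the declared field names.
            parser = FieldParser(line.stripPrefix("#fields").trim.split("\\s+"))
          else if (parser != null)
            // Data line: interpret it using the most recent header.
            items ++= parser.parse(line)
        }
        items.iterator
      }

      def main(args: Array[String]): Unit = {
        val message =
          "#fields field1 field2 field3\ndata1 data2 data3\n" +
          "#fields field4 field5 field6 field7\ndata4 data5 data6 data7"
        explode(message).foreach(println)
        // Prints (field1,data1), (field2,data2), ..., (field7,data7)
      }
    }

Because the parser state lives entirely inside the function applied to each message, this runs independently on the executors per message — nothing has to be collected back to the driver, and the resulting pairs feed straight into reduceByKey().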