I may be misunderstanding, but you need to take each Kafka message
and turn it into multiple items in the transformed RDD?

so something like this (an untested sketch):

import scala.collection.mutable.ArrayBuffer

// assumes the stream has already been mapped down to the String message
// value; Parser and someParserBasedOn are sketched below
stream.flatMap { message =>
  val items = ArrayBuffer.empty[(String, String)]  // the (K, V) pairs to emit
  var parser: Parser = null                        // set by the most recent header
  message.split("\n").foreach { line =>
    if (line.startsWith("#fields"))                // it's a header
      parser = someParserBasedOn(line)
    else
      items += parser.parse(line)
  }
  items.iterator
}
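
Parser and someParserBasedOn above are placeholders. Here is a minimal
sketch of what they might look like, assuming whitespace-delimited
fields and (arbitrarily) keying each record on its first field:

case class Parser(fieldNames: Array[String]) {
  def parse(line: String): (String, String) = {
    val values = line.split("\\s+")
    // key on the first field, keep the remaining fields as the value
    (s"${fieldNames.head}=${values.head}", values.tail.mkString(" "))
  }
}

def someParserBasedOn(headerLine: String): Parser =
  Parser(headerLine.stripPrefix("#fields").trim.split("\\s+"))

Since the flatMap runs on the executors, the header never has to come
back to the driver, and the resulting DStream of (K, V) pairs can feed
straight into your reduceByKey().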

On Mon, Nov 7, 2016 at 4:22 PM, coolgar <karllbunn...@gmail.com> wrote:
> I'm using apache spark streaming with the kafka direct consumer. The data
> stream I'm receiving is log data that includes a header with each block of
> messages. Each DStream can therefore have many blocks of messages, each with
> its own header.
>
> The header is used to know how to interpret the following fields in the
> block of messages. My challenge is that I'm building up (K,V) pairs that are
> processed by reduceByKey() and I use this header to know how to parse the
> fields that follow the header into the (K,V) pairs.
>
> So each message received from Kafka may appear as follows (# denotes the
> header field, \n denotes new line):
> #fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
> field6 field7\ndata4 data5 data6 data7\n...
>
> Is there a way, without collecting all data back to the driver, to "grab"
> the header and use it to subsequently process the messages that follow the
> header until a new #fields comes along, rinse, repeat?