Hi Team,

I am trying to implement the functionality of the below kafka-console-consumer
command (which works well and outputs the intended JSON data) as a program
using Spark Streaming.

kafka-console-consumer.sh --zookeeper host.xxxx.com:2181,host.xxxx.com:2181
--topic mytopic --formatter CustomAvroMessageFormatter --property
"formatter-schema-file= schema.txt" > /var/tmp/myfile.json&

I am able to read messages from the above topic programmatically with
KafkaUtils in Spark Streaming, using the Scala code below, which works well:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ConsumeTest {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "ConsumeKafkaMsg")
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(1))

    // Direct (receiver-less) stream, reading straight from the brokers
    val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
    val topics = Set("mytopic")

    // Each element is a (key, value) pair; keep only the message value
    val lines = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      .map(_._2)

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

However, the above program reads the messages in raw form, similar to the following:

��Cߣ�ߕ'윺~�_,��M˶/��Ѯ!򇜾�Vcusomtername client
2X3XXXXXX-sasadsad-4673-212c-dsdsadsad
value
,"question"logName
successstԇ���V

In contrast, the console command above uses a custom Kafka message formatter
to convert this raw format to JSON using the Avro schema. I am unable to find
the equivalent of that message formatter for my program, which is the
important missing piece.
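
From what I have read, decoding the payload with Avro's GenericDatumReader
might be the equivalent step. Below is a sketch of what I am considering; it
assumes the messages are plain binary-encoded Avro with no extra framing (if
CustomAvroMessageFormatter skips a header, such as a schema-registry prefix,
the same bytes would have to be skipped here too), and that schema.txt holds
the writer schema. Is this the right direction?

import java.io.File

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ConsumeAvroTest {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "ConsumeKafkaAvroMsg")
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(1))

    val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
    val topics = Set("mytopic")

    // Parse the schema once on the driver and ship it as a string;
    // org.apache.avro.Schema is not serializable in older Avro releases
    val schemaString = new Schema.Parser().parse(new File("schema.txt")).toString

    // Read the message value as raw bytes rather than a (mangled) String
    val raw = KafkaUtils.createDirectStream[
      String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
      .map(_._2)

    // Decode each payload with a GenericDatumReader; GenericRecord.toString
    // renders the record as JSON text
    val json = raw.mapPartitions { iter =>
      val schema = new Schema.Parser().parse(schemaString)
      val reader = new GenericDatumReader[GenericRecord](schema)
      iter.map { bytes =>
        val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
        reader.read(null, decoder).toString
      }
    }

    json.print()

    ssc.start()
    ssc.awaitTermination()
  }
}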

Below is an approximate Avro schema (schema.txt) for reference (the actual
schema to process is much more complex):

{
  "type" : "record",
  "namespace" : "mynamespace",
  "name" : "myname",
  "fields" : [{
    "name" : "field1",
    "type" : {
      "type" : "record",
      "name" : "Eventfield1",
      "fields" : [{.....}]
    }
  }]
}
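
As a quick sanity check, the real schema file does parse with Avro's
Schema.Parser (the path here is just a placeholder):

import java.io.File
import org.apache.avro.Schema

// Verifies schema.txt is valid Avro before wiring it into the stream
val schema = new Schema.Parser().parse(new File("schema.txt"))
println(schema.getFields)  // should list the top-level fields, e.g. field1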

Please help me implement the same.

Regards,
Kush
