Hi Team, I am trying to implement the functionality of the kafka-console-consumer command below (which works well and outputs the intended JSON data) as a program using Spark Streaming.
kafka-console-consumer.sh --zookeeper host.xxxx.com:2181,host.xxxx.com:2181 --topic mytopic --formatter CustomAvroMessageFormatter --property "formatter-schema-file=schema.txt" > /var/tmp/myfile.json &

I am able to read messages from the above topic programmatically with Spark Streaming via KafkaUtils; the following Scala code works well:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object ConsumeTest {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "ConsumeKafkaMsg")
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(1))

    // Read directly from the Kafka broker
    val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
    val topics = Set("mytopic")
    val lines = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

However, this program reads the messages in their raw form, which looks something like this:

��Cߣ�ߕ'윺~�_,��M˶/��Ѯ!�Vcusomtername client 2X3XXXXXX-sasadsad-4673-212c-dsdsadsad value ,"question"logName successstԇ���V

The kafka-console-consumer command above uses a custom Kafka message formatter to convert this raw form into JSON using an Avro schema. I am unable to find out how to do the equivalent of that formatter in my program, which is the important missing piece.

Below is the probable Avro schema (schema.txt) for reference (the actual schema I have to process is far more complex):

{
  "type": "record",
  "namespace": "mynamespace",
  "name": "myname",
  "fields": [
    {
      "name": "field1",
      "type": {
        "type": "record",
        "name": "Eventfield1",
        "fields": [{.....}]
      }
    }
  ]
}

Please help to implement the same.

Regards,
Kush
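PS: In case it clarifies what I am after, here is a rough sketch of the direction I have been exploring, based on my reading of the Avro API. It is an untested guess, not working code: I am assuming the payload is plain binary-encoded Avro with no extra header bytes (which may not match what CustomAvroMessageFormatter actually expects), that kafka.serializer.DefaultDecoder can hand the value over as raw bytes, and that schema.txt is readable on every executor.

import java.io.File

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.{DefaultDecoder, StringDecoder}

object ConsumeAvroTest {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "ConsumeKafkaAvroMsg")
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(1))

    val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
    val topics = Set("mytopic")

    // Assumption: take the value as raw bytes instead of a String,
    // since the payload is Avro-encoded binary data
    val raw = KafkaUtils.createDirectStream[
      String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics).map(_._2)

    val json = raw.mapPartitions { records =>
      // Parse the schema once per partition rather than on the driver,
      // since Avro's Schema may not be serializable in older versions.
      // Assumption: schema.txt exists at this path on every executor.
      val schema = new Schema.Parser().parse(new File("schema.txt"))
      val reader = new GenericDatumReader[GenericRecord](schema)
      records.map { bytes =>
        val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
        // GenericRecord.toString renders the record as JSON
        reader.read(null, decoder).toString
      }
    }

    json.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

If GenericRecord.toString is an acceptable JSON rendering, something like this might be close, but I am not sure the wire format matches, so corrections are very welcome.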