Hi Team,
I am trying to reproduce the functionality of the kafka-console-consumer
command below (which works well and outputs the intended JSON data) as a
Spark Streaming program.
kafka-console-consumer.sh --zookeeper host.xxxx.com:2181,host.xxxx.com:2181
--topic mytopic --formatter CustomAvroMessageFormatter --property
"formatter-schema-file= schema.txt" > /var/tmp/myfile.json&
I am able to read messages from the above topic programmatically with
KafkaUtils in Spark Streaming, using the Spark Scala code below, which works:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ConsumeTest {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[*]", "ConsumeKafkaMsg")
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(1))

    // Read from the brokers directly (receiver-less direct stream)
    val kafkaParams = Map("metadata.broker.list" -> "brokername:9092")
    val topics = List("mytopic").toSet
    val lines = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
However, the above program reads the messages in raw (Avro binary) format,
which looks something like this:
��Cߣ�ߕ'윺~�_,��M˶/��Ѯ!�Vcusomtername client
2X3XXXXXX-sasadsad-4673-212c-dsdsadsad
value
,"question"logName
successstԇ���V
The console-consumer command above uses a custom Kafka message formatter to
convert this raw format to JSON using the Avro schema. I have not been able
to work out how to do the equivalent of that message formatter in my
program, and that is the important missing piece.
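From reading the Avro docs, my guess is that the decoding step should be:
read the Kafka values as raw bytes (StringDecoder mangles the binary
payload), then deserialize each payload with Avro's GenericDatumReader, and
rely on GenericRecord.toString emitting the record as JSON. Below is my
sketch of that, replacing the lines stream in the program above; it assumes
the payload is a bare Avro datum with no extra header (e.g. no
Confluent-style magic byte), and I have not managed to verify it
end-to-end:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Read the values as raw bytes so the Avro binary survives intact
val rawBytes = KafkaUtils.createDirectStream[
  String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topics).map(_._2)

// Ship the schema as a JSON string (serializable) and parse it
// once per partition rather than once per record
val schemaJson = scala.io.Source.fromFile("schema.txt").mkString

val jsonLines = rawBytes.mapPartitions { msgs =>
  val schema = new Schema.Parser().parse(schemaJson)
  val reader = new GenericDatumReader[GenericRecord](schema)
  msgs.map { bytes =>
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder).toString // JSON form of the record
  }
}
jsonLines.print()

Does this look like the right direction, or is there a ready-made
equivalent of the console formatter for Spark Streaming?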
Below is a rough version of the Avro schema (schema.txt) for reference (the
actual schema to process is much more complex):
{
  "type" : "record",
  "namespace" : "mynamespace",
  "name" : "myname",
  "fields" : [{
    "name" : "field1",
    "type" : {
      "type" : "record",
      "name" : "Eventfield1",
      "fields" : [{.....}]
    }
  }]
}
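In case it helps, this is how I load and inspect the schema file with
Avro's Schema.Parser (names taken from the sample above, not the real
schema):

import java.io.File
import org.apache.avro.Schema

// Quick sanity check that schema.txt parses and the nested
// record type is where I expect it
val schema = new Schema.Parser().parse(new File("schema.txt"))
println(schema.getName)                             // myname
println(schema.getField("field1").schema().getName) // Eventfield1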
Please help me implement this.
Regards,
Kush