>> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>
>> // Generic form from the Spark Streaming + Kafka integration guide:
>> // val directKafkaStream = KafkaUtils.createDirectStream[
>> //   [key class], [value class], [key decoder class], [value decoder class]](
>> //   streamingContext, [map of Kafka parameters], [set of topics to consume])
>> }
>> }
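
For anyone landing on this thread: below is a minimal, self-contained sketch of the same direct-stream setup against the 0.8 Kafka integration (spark-streaming-kafka). The broker address, topic name, batch interval, and object/app names are placeholder assumptions, not values from this thread:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamSketch")
    // Batch interval of 10 seconds is an assumption, not from the thread.
    val ssc = new StreamingContext(conf, Seconds(10))

    // The 0.8 direct API takes the broker list under "metadata.broker.list";
    // localhost:9092 and "my-topic" are placeholders.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val topics = Set("my-topic")

    val messages = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Each record arrives as a (key, value) pair; print the values.
    messages.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that the spark-streaming-kafka artifact (and its Kafka dependency, including kafka.serializer.StringDecoder) has to reach the executors, e.g. via an assembly jar or spark-submit --packages; a class missing at that point is one common way to end up with task-deserialization failures like the traces quoted below.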
Thanks
Sri
>>> ...(ObjectInputStream.java:1351)
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>> at java.io.ObjectInputStream.read...
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>> --
>>
>>
>> Best regards and thanks in advance for any help.
>>
>>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
--
Best regards and thanks in advance for any help.
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.