Hi Mich, Thanks a lot for your response. I am basically trying to get some older code(streaming job to read from kafka) in 2.0.1 spark to work in 3.0,1. The specific area where I am having problem (KafkaCluster) has most likely to do with get/ set commit offsets in kafka
// Create message Dstream for each (topic, schema class) val msgStreams = config.getTopicSchemaClassMap.map { case (kafkaTopic, schemaClass) => { val consumerOffsets = *getConsumerOffsets*(kafkaTopic) val msgDStream = (KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Tuple2[Array[Byte],Array[Byte]]] (ssc, kafkaParams, consumerOffsets, (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => (msg.key, msg.message) )) (kafkaTopic, schemaClass, msgDStream) } } *The getConsumerOffsets *method internally used KafkaCluter which is probably deprecated. Do You think I need to mimic the code shown here to get/set offsets rather than use kafkaCluster? https://spark.apache.org/docs/3.0.0-preview/streaming-kafka-0-10-integration.html Thanks Kiran On Mon, Jun 7, 2021 at 1:04 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote: > > Hi, > > Are you trying to read topics from Kafka in spark 3.0.1? > > Have you checked Spark 3.0.1 documentation? > > Integrating Spark with Kafka is pretty straight forward. with 3.0.1 and > higher > > > HTH > > > view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Sun, 6 Jun 2021 at 21:18, Kiran Biswal <biswalki...@gmail.com> wrote: > >> *I am using spark 3.0.1 AND Kafka 0.10 AND Scala 2.12. Getting an error >> related to KafkaCluster (not found: type KafkaCluster). Is this class >> deprecated? How do I find a replacement?* >> >> *I am upgrading from spark 2.0.1 to spark 3.0.1* >> >> *In spark 2.0.1 KafkaCluster was supported* >> >> https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/streaming/kafka/KafkaCluster.html >> >> just looking for ideas how to achieve same functionality in spark 3.0.1. >> Any thoughts and examples will be highly appreciated. >> >> Thanks >> Kiran >> >