Hi Mich, Thanks a lot for your response. I am basically trying to get some
older code(streaming job to read from kafka) in 2.0.1 spark to work in
3.0,1. The specific area where I am having problem (KafkaCluster) has most
likely to do with get/ set commit offsets in kafka



// Create message Dstream for each (topic, schema class)


    val msgStreams = config.getTopicSchemaClassMap.map {


      case (kafkaTopic, schemaClass) => {


        val consumerOffsets = *getConsumerOffsets*(kafkaTopic)


        val msgDStream = (KafkaUtils.createDirectStream[Array[Byte],
Array[Byte], DefaultDecoder, DefaultDecoder,

          Tuple2[Array[Byte],Array[Byte]]]


          (ssc, kafkaParams, consumerOffsets,


          (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => (msg.key,
msg.message)

          ))


        (kafkaTopic, schemaClass, msgDStream)


      }


    }




*The getConsumerOffsets  *method  internally used KafkaCluter which is
probably deprecated.



Do You think I need to mimic the code shown here to get/set offsets rather
than use kafkaCluster?


https://spark.apache.org/docs/3.0.0-preview/streaming-kafka-0-10-integration.html


Thanks

Kiran

On Mon, Jun 7, 2021 at 1:04 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Hi,
>
> Are you trying to read topics from Kafka in spark 3.0.1?
>
> Have you checked Spark 3.0.1 documentation?
>
> Integrating Spark with Kafka is pretty straight forward. with 3.0.1 and
> higher
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 6 Jun 2021 at 21:18, Kiran Biswal <biswalki...@gmail.com> wrote:
>
>> *I am using spark 3.0.1 AND Kafka 0.10 AND Scala 2.12. Getting an error
>> related to KafkaCluster (not found: type KafkaCluster). Is this class
>> deprecated? How do I find a replacement?*
>>
>> *I am upgrading from spark 2.0.1 to spark 3.0.1*
>>
>> *In spark 2.0.1 KafkaCluster was supported*
>>
>> https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/streaming/kafka/KafkaCluster.html
>>
>> just looking for ideas how to achieve same functionality in spark 3.0.1.
>> Any thoughts and examples will be highly appreciated.
>>
>> Thanks
>> Kiran
>>
>

Reply via email to