Thanks Chiwan, it worked. I now have a simple streaming program in Spark Scala that reads streaming data from Kafka. Please see the attached code.
I am trying to make it work with Flink + Kafka. Any hints will be appreciated.

Thanks,

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 18 April 2016 at 02:43, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Mich,
>
> You can add external dependencies to the Scala shell using the `--addclasspath`
> option. There is a more detailed description in the documentation [1].
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/scala_shell.html#adding-external-dependencies
>
> Regards,
> Chiwan Park
>
> > On Apr 17, 2016, at 6:04 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> >
> > Hi,
> >
> > In the Spark shell I can load the Kafka jar file through the spark-shell option --jars:
> >
> > spark-shell --master spark://50.140.197.217:7077 --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar
> >
> > This works fine.
> >
> > In Flink I have added the jar file /home/hduser/jars/flink-connector-kafka-0.10.1.jar to the CLASSPATH.
> >
> > However, I don't get any support for it within the Flink shell:
> >
> > Scala-Flink> import org.apache.flink.streaming.connectors.kafka
> > <console>:54: error: object connectors is not a member of package org.apache.flink.streaming
> >        import org.apache.flink.streaming.connectors.kafka
> >                                           ^
> >
> > Any ideas will be appreciated.
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn
> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
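For reference, the suggestion quoted above amounts to passing the connector jar to the Flink Scala shell at launch rather than relying on CLASSPATH. A minimal sketch, assuming the standard start-scala-shell.sh launcher and the jar path from the thread; the exact invocation may differ between Flink releases, so check the linked Scala shell docs:

bin/start-scala-shell.sh --addclasspath /home/hduser/jars/flink-connector-kafka-0.10.1.jar

Scala-Flink> import org.apache.flink.streaming.connectors.kafka._

With the jar on the shell's classpath, the import that previously failed should resolve, provided the connector jar matches the Flink release the shell belongs to.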
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object TestStream_assembly {
  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("TestStream_assembly").
      setMaster("local[2]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val ssc = new StreamingContext(conf, Seconds(55))
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "StreamTest")
    val topic = Set("newtopic")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    messages.cache()
    //
    // Get the lines
    //
    val lines = messages.map(_._2)
    // Check for message
    val showResults = lines.
      filter(_.contains("Sending messages")).
      flatMap(line => line.split("\n,")).
      map(word => (word, 1)).
      reduceByKey(_ + _).
      print(1000)
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}
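For the Flink + Kafka side, a rough equivalent might look like the sketch below. This is only a sketch under assumptions: it targets the Flink 1.0 Scala DataStream API with the Kafka 0.9 connector (FlinkKafkaConsumer09); with the 0.8 connector jar the class would be FlinkKafkaConsumer08, and package/class names vary with the connector version. The object name TestStreamFlink is just illustrative. The broker, Zookeeper, group id and topic are copied from the Spark job above, the schema-registry setting is omitted since SimpleStringSchema does not use it, and keyBy/sum plays the role of reduceByKey (Flink emits a running count per key rather than a batched total every 55 seconds).

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object TestStreamFlink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer properties, mirroring kafkaParams in the Spark job
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes564:9092")
    properties.setProperty("zookeeper.connect", "rhes564:2181")
    properties.setProperty("group.id", "StreamTest")

    // FlinkKafkaConsumer09 assumes the Kafka 0.9 connector;
    // with the 0.8 connector jar use FlinkKafkaConsumer08 instead
    val lines: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer09[String]("newtopic", new SimpleStringSchema(), properties))

    // Same filter/split/count logic as the Spark job
    lines
      .filter(_.contains("Sending messages"))
      .flatMap(_.split("\n,"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("TestStreamFlink")
  }
}

If the job is meant to run from the Scala shell rather than as a packaged program, the same body can be typed against the shell's pre-created streaming environment instead of creating one via getExecutionEnvironment.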