Thanks Chiwan. It worked.
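
For the record, I started the shell with something along these lines (local mode; the jar path is the one from my earlier mail):

bin/start-scala-shell.sh local --addclasspath /home/hduser/jars/flink-connector-kafka-0.10.1.jar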

Now I have a simple streaming program in Spark Scala that reads data from
Kafka. Please see attached.

I am trying to make the same thing work with Flink + Kafka.

Any hints will be appreciated.

Thanks



Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 18 April 2016 at 02:43, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Mich,
>
> You can add external dependencies to the Scala shell using the `--addclasspath`
> option. There is a more detailed description in the documentation [1].
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/scala_shell.html#adding-external-dependencies
>
> Regards,
> Chiwan Park
>
> > On Apr 17, 2016, at 6:04 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > In the Spark shell I can load the Kafka jar file through the spark-shell option --jars
> >
> > spark-shell --master spark://50.140.197.217:7077 --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar
> >
> > This works fine.
> >
> > In Flink I have added the jar file
> /home/hduser/jars/flink-connector-kafka-0.10.1.jar to the CLASSPATH.
> >
> > However, I don't get any support for it within the Flink Scala shell:
> >
> > Scala-Flink> import org.apache.flink.streaming.connectors.kafka
> > <console>:54: error: object connectors is not a member of package
> org.apache.flink.streaming
> >             import org.apache.flink.streaming.connectors.kafka
> >                                               ^
> >
> > Any ideas will be appreciated
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
> >
>
>
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object TestStream_assembly {
  def main(args: Array[String]) {
    val conf = new SparkConf().
                 setAppName("TestStream_assembly").
                 setMaster("local[2]").
                 set("spark.driver.allowMultipleContexts", "true").
                 set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    // Streaming context with a 55-second batch interval, reusing the existing SparkContext
    val ssc = new StreamingContext(sc, Seconds(55))

    // Kafka connection settings
    val kafkaParams = Map[String, String](
      "bootstrap.servers"   -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect"   -> "rhes564:2181",
      "group.id"            -> "StreamTest")
    val topic = Set("newtopic")

    // Direct (receiver-less) Kafka stream of (key, value) pairs
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder](ssc, kafkaParams, topic)
    messages.cache()
    //
    // Get the lines (message values)
    //
    val lines = messages.map(_._2)
    // Keep only messages containing "Sending messages", split them and count the words
    val showResults = lines.filter(_.contains("Sending messages"))
      .flatMap(line => line.split("\n,"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // Print up to 1000 results per batch
    showResults.print(1000)
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}
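
For what it is worth, below is my first rough sketch of a Flink equivalent. It is untested: I am assuming Flink 1.0 with the Kafka 0.8 connector (FlinkKafkaConsumer08) on the classpath, so the consumer class name may need changing for a different connector version; the broker, topic and group settings are simply copied from the Spark job above.

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object TestStreamFlink {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka settings copied from the Spark version
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes564:9092")
    properties.setProperty("zookeeper.connect", "rhes564:2181")
    properties.setProperty("group.id", "StreamTest")

    // Read the "newtopic" topic as plain strings
    val lines = env.addSource(
      new FlinkKafkaConsumer08[String]("newtopic", new SimpleStringSchema(), properties))

    // Same filter / split / count logic as the Spark job,
    // counted over 55-second processing-time windows
    lines
      .filter(_.contains("Sending messages"))
      .flatMap(_.split("\n,"))
      .map(word => (word, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(55))
      .sum(1)
      .print()

    env.execute("TestStreamFlink")
  }
}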
