thanks I sorted this out.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 25 April 2016 at 15:20, Cody Koeninger <c...@koeninger.org> wrote:

> Show the full relevant code including imports.
>
> On Fri, Apr 22, 2016 at 4:46 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > Hi Cody,
> >
> > This is my first attempt on using offset ranges (this may not mean much
> in
> > my context at the moment)
> >
> > val ssc = new StreamingContext(conf, Seconds(10))
> > ssc.checkpoint("checkpoint")
> > val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes564:9092",
> > "schema.registry.url" -> "http://rhes564:8081";, "zookeeper.connect" ->
> > "rhes564:2181", "group.id" -> "StreamTest" )
> > val topics = Set("newtopic", "newtopic")
> > val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder,
> > StringDecoder](ssc, kafkaParams, topics)
> > dstream.cache()
> > val lines = dstream.map(_._2)
> > val showResults = lines.filter(_.contains("statement
> cache")).flatMap(line
> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> > // Define the offset ranges to read in the batch job. Just one offset
> range
> > val offsetRanges = Array(
> >   OffsetRange("newtopic", 0, 110, 220)
> > )
> > // Create the RDD based on the offset ranges
> > val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
> > StringDecoder](sc, kafkaParams, offsetRanges)
> >
> >
> > This comes back with error
> >
> > [info] Compiling 1 Scala source to
> > /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> > [error]
> >
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
> > not found: value OffsetRange
> > [error]   OffsetRange("newtopic", 0, 110, 220),
> > [error]   ^
> > [error] one error found
> > [error] (compile:compileIncremental) Compilation failed
> >
> > Any ideas will be appreciated
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> >
> >
> > On 22 April 2016 at 22:04, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> Spark streaming as it exists today is always microbatch.
> >>
> >> You can certainly filter messages using spark streaming.
> >>
> >>
> >> On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh
> >> <mich.talebza...@gmail.com> wrote:
> >> > yep actually using createDirectStream sounds a better way of doing it.
> >> > Am I
> >> > correct that createDirectStream was introduced to overcome
> >> > micro-batching
> >> > limitations?
> >> >
> >> > In a nutshell I want to pickup all the messages and keep signal
> >> > according to
> >> > pre-built criteria (say indicating a buy signal) and ignore the
> >> > pedestals
> >> >
> >> > Thanks
> >> >
> >> > Dr Mich Talebzadeh
> >> >
> >> >
> >> >
> >> > LinkedIn
> >> >
> >> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >
> >> >
> >> >
> >> > http://talebzadehmich.wordpress.com
> >> >
> >> >
> >> >
> >> >
> >> > On 22 April 2016 at 21:56, Cody Koeninger <c...@koeninger.org> wrote:
> >> >>
> >> >> You can still do sliding windows with createDirectStream, just do
> your
> >> >> map / extraction of fields before the window.
> >> >>
> >> >> On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
> >> >> <mich.talebza...@gmail.com> wrote:
> >> >> > Hi Cody,
> >> >> >
> >> >> > I want to use sliding windows for Complex Event Processing
> >> >> > micro-batching
> >> >> >
> >> >> > Dr Mich Talebzadeh
> >> >> >
> >> >> >
> >> >> >
> >> >> > LinkedIn
> >> >> >
> >> >> >
> >> >> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >> >
> >> >> >
> >> >> >
> >> >> > http://talebzadehmich.wordpress.com
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org>
> wrote:
> >> >> >>
> >> >> >> Why are you wanting to convert?
> >> >> >>
> >> >> >> As far as doing the conversion, createStream doesn't take the same
> >> >> >> arguments, look at the docs.
> >> >> >>
> >> >> >> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
> >> >> >> <mich.talebza...@gmail.com> wrote:
> >> >> >> > Hi,
> >> >> >> >
> >> >> >> > What is the best way of converting this program of that uses
> >> >> >> > KafkaUtils.createDirectStream to Sliding window using
> >> >> >> >
> >> >> >> > val dstream = KafkaUtils.createDirectStream[String, String,
> >> >> >> > StringDecoder,
> >> >> >> > StringDecoder](ssc, kafkaParams, topic)
> >> >> >> >
> >> >> >> > to
> >> >> >> >
> >> >> >> > val dstream = KafkaUtils.createStream[String, String,
> >> >> >> > StringDecoder,
> >> >> >> > StringDecoder](ssc, kafkaParams, topic)
> >> >> >> >
> >> >> >> >
> >> >> >> > The program below works
> >> >> >> >
> >> >> >> >
> >> >> >> > import org.apache.spark.SparkContext
> >> >> >> > import org.apache.spark.SparkConf
> >> >> >> > import org.apache.spark.sql.Row
> >> >> >> > import org.apache.spark.sql.hive.HiveContext
> >> >> >> > import org.apache.spark.sql.types._
> >> >> >> > import org.apache.spark.sql.SQLContext
> >> >> >> > import org.apache.spark.sql.functions._
> >> >> >> > import _root_.kafka.serializer.StringDecoder
> >> >> >> > import org.apache.spark.streaming._
> >> >> >> > import org.apache.spark.streaming.kafka.KafkaUtils
> >> >> >> > //
> >> >> >> > object CEP_assembly {
> >> >> >> >   def main(args: Array[String]) {
> >> >> >> >   val conf = new SparkConf().
> >> >> >> >                setAppName("CEP_assembly").
> >> >> >> >                setMaster("local[2]").
> >> >> >> >                set("spark.driver.allowMultipleContexts",
> "true").
> >> >> >> >                set("spark.hadoop.validateOutputSpecs", "false")
> >> >> >> >   val sc = new SparkContext(conf)
> >> >> >> >   // Create sqlContext based on HiveContext
> >> >> >> >   val sqlContext = new HiveContext(sc)
> >> >> >> >   import sqlContext.implicits._
> >> >> >> >   val HiveContext = new
> org.apache.spark.sql.hive.HiveContext(sc)
> >> >> >> >   println ("\nStarted at"); sqlContext.sql("SELECT
> >> >> >> > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> >> >> >> > ").collect.foreach(println)
> >> >> >> > val ssc = new StreamingContext(conf, Seconds(1))
> >> >> >> > ssc.checkpoint("checkpoint")
> >> >> >> > val kafkaParams = Map[String, String]("bootstrap.servers" ->
> >> >> >> > "rhes564:9092",
> >> >> >> > "schema.registry.url" -> "http://rhes564:8081";,
> >> >> >> > "zookeeper.connect"
> >> >> >> > ->
> >> >> >> > "rhes564:2181", "group.id" -> "StreamTest" )
> >> >> >> > val topic = Set("newtopic")
> >> >> >> > //val dstream = KafkaUtils.createStream[String, String,
> >> >> >> > StringDecoder,
> >> >> >> > StringDecoder](ssc, kafkaParams, topic)
> >> >> >> > val dstream = KafkaUtils.createDirectStream[String, String,
> >> >> >> > StringDecoder,
> >> >> >> > StringDecoder](ssc, kafkaParams, topic)
> >> >> >> > dstream.cache()
> >> >> >> > //val windowed_dstream = dstream.window(new
> >> >> >> > Duration(sliding_window_length),
> >> >> >> > new Duration(sliding_window_interval))
> >> >> >> > dstream.print(1000)
> >> >> >> > val lines = dstream.map(_._2)
> >> >> >> > // Check for message
> >> >> >> > val showResults = lines.filter(_.contains("Sending
> >> >> >> > dstream")).flatMap(line
> >> >> >> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> >> >> >> > _).print(1000)
> >> >> >> > // Check for statement cache
> >> >> >> > val showResults2 = lines.filter(_.contains("statement
> >> >> >> > cache")).flatMap(line
> >> >> >> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> >> >> >> > _).print(1000)
> >> >> >> > ssc.start()
> >> >> >> > ssc.awaitTermination()
> >> >> >> > //ssc.stop()
> >> >> >> >   println ("\nFinished at"); sqlContext.sql("SELECT
> >> >> >> > FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> >> >> >> > ").collect.foreach(println)
> >> >> >> >   }
> >> >> >> > }
> >> >> >> >
> >> >> >> > Thanks
> >> >> >> >
> >> >> >> >
> >> >> >> > Dr Mich Talebzadeh
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > LinkedIn
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > http://talebzadehmich.wordpress.com
> >> >> >> >
> >> >> >> >
> >> >> >
> >> >> >
> >> >
> >> >
> >
> >
>

Reply via email to