Thanks, I sorted this out.
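
[The "not found: value OffsetRange" error quoted below points at a missing
import. A minimal sketch of the likely fix, assuming Spark 1.x with the
spark-streaming-kafka (Kafka 0.8) artifact, and reusing the sc and
kafkaParams from the snippet below:

    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    // One offset range: topic "newtopic", partition 0,
    // from offset 110 (inclusive) up to offset 220 (exclusive)
    val offsetRanges = Array(OffsetRange("newtopic", 0, 110L, 220L))

    // createRDD takes the SparkContext (sc), not the StreamingContext
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
      StringDecoder](sc, kafkaParams, offsetRanges)
]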
Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 25 April 2016 at 15:20, Cody Koeninger <c...@koeninger.org> wrote:

> Show the full relevant code including imports.
>
> On Fri, Apr 22, 2016 at 4:46 PM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> > Hi Cody,
> >
> > This is my first attempt at using offset ranges (they may not mean much
> > in my context at the moment):
> >
> > val ssc = new StreamingContext(conf, Seconds(10))
> > ssc.checkpoint("checkpoint")
> > val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
> >   "schema.registry.url" -> "http://rhes564:8081",
> >   "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
> > val topics = Set("newtopic")
> > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> >   StringDecoder](ssc, kafkaParams, topics)
> > dstream.cache()
> > val lines = dstream.map(_._2)
> > val showResults = lines.filter(_.contains("statement cache"))
> >   .flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> > // Define the offset ranges to read in the batch job. Just one offset range
> > val offsetRanges = Array(
> >   OffsetRange("newtopic", 0, 110, 220)
> > )
> > // Create the RDD based on the offset ranges
> > val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
> >   StringDecoder](sc, kafkaParams, offsetRanges)
> >
> > This comes back with the error:
> >
> > [info] Compiling 1 Scala source to
> > /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> > [error]
> > /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
> > not found: value OffsetRange
> > [error]     OffsetRange("newtopic", 0, 110, 220),
> > [error]     ^
> > [error] one error found
> > [error] (compile:compileIncremental) Compilation failed
> >
> > Any ideas will be appreciated.
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > On 22 April 2016 at 22:04, Cody Koeninger <c...@koeninger.org> wrote:
> > >
> > > Spark Streaming as it exists today is always micro-batch.
> > >
> > > You can certainly filter messages using Spark Streaming.
> > >
> > > On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh
> > > <mich.talebza...@gmail.com> wrote:
> > > > Yep, actually using createDirectStream sounds a better way of doing
> > > > it. Am I correct that createDirectStream was introduced to overcome
> > > > micro-batching limitations?
> > > >
> > > > In a nutshell, I want to pick up all the messages and keep signals
> > > > matching pre-built criteria (say, indicating a buy signal) and ignore
> > > > the pedestals.
> > > >
> > > > Thanks
> > > >
> > > > Dr Mich Talebzadeh
> > > >
> > > > LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> > > >
> > > > http://talebzadehmich.wordpress.com
> > > >
> > > >
> > > > On 22 April 2016 at 21:56, Cody Koeninger <c...@koeninger.org> wrote:
> > > > >
> > > > > You can still do sliding windows with createDirectStream, just do
> > > > > your map / extraction of fields before the window.
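
[A minimal sketch of that suggestion, with illustrative window lengths;
"buy signal" is just a stand-in for whatever the pre-built criteria match:

    // Extract the message value first, then apply the window. Both
    // durations must be multiples of the StreamingContext batch interval.
    val lines = dstream.map(_._2)
    val windowedLines = lines.window(Seconds(60), Seconds(10)) // 60s window, sliding every 10s
    val buySignals = windowedLines.filter(_.contains("buy signal"))
    buySignals.print(1000)
]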
> > > > > On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
> > > > > <mich.talebza...@gmail.com> wrote:
> > > > > > Hi Cody,
> > > > > >
> > > > > > I want to use sliding windows for Complex Event Processing
> > > > > > micro-batching.
> > > > > >
> > > > > > Dr Mich Talebzadeh
> > > > > >
> > > > > > LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> > > > > >
> > > > > > http://talebzadehmich.wordpress.com
> > > > > >
> > > > > >
> > > > > > On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:
> > > > > > >
> > > > > > > Why are you wanting to convert?
> > > > > > >
> > > > > > > As far as doing the conversion, createStream doesn't take the
> > > > > > > same arguments; look at the docs.
> > > > > > >
> > > > > > > On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
> > > > > > > <mich.talebza...@gmail.com> wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > What is the best way of converting this program, which uses
> > > > > > > >
> > > > > > > > val dstream = KafkaUtils.createDirectStream[String, String,
> > > > > > > >   StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> > > > > > > >
> > > > > > > > to a sliding window version using
> > > > > > > >
> > > > > > > > val dstream = KafkaUtils.createStream[String, String,
> > > > > > > >   StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> > > > > > > >
> > > > > > > > The program below works:
> > > > > > > >
> > > > > > > > import org.apache.spark.SparkContext
> > > > > > > > import org.apache.spark.SparkConf
> > > > > > > > import org.apache.spark.sql.Row
> > > > > > > > import org.apache.spark.sql.hive.HiveContext
> > > > > > > > import org.apache.spark.sql.types._
> > > > > > > > import org.apache.spark.sql.SQLContext
> > > > > > > > import org.apache.spark.sql.functions._
> > > > > > > > import _root_.kafka.serializer.StringDecoder
> > > > > > > > import org.apache.spark.streaming._
> > > > > > > > import org.apache.spark.streaming.kafka.KafkaUtils
> > > > > > > > //
> > > > > > > > object CEP_assembly {
> > > > > > > >   def main(args: Array[String]) {
> > > > > > > >     val conf = new SparkConf().
> > > > > > > >       setAppName("CEP_assembly").
> > > > > > > >       setMaster("local[2]").
> > > > > > > >       set("spark.driver.allowMultipleContexts", "true").
> > > > > > > >       set("spark.hadoop.validateOutputSpecs", "false")
> > > > > > > >     val sc = new SparkContext(conf)
> > > > > > > >     // Create sqlContext based on HiveContext
> > > > > > > >     val sqlContext = new HiveContext(sc)
> > > > > > > >     import sqlContext.implicits._
> > > > > > > >     val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> > > > > > > >     println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> > > > > > > >     val ssc = new StreamingContext(conf, Seconds(1))
> > > > > > > >     ssc.checkpoint("checkpoint")
> > > > > > > >     val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
> > > > > > > >       "schema.registry.url" -> "http://rhes564:8081",
> > > > > > > >       "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
> > > > > > > >     val topic = Set("newtopic")
> > > > > > > >     //val dstream = KafkaUtils.createStream[String, String,
> > > > > > > >     //  StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> > > > > > > >     val dstream = KafkaUtils.createDirectStream[String, String,
> > > > > > > >       StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> > > > > > > >     dstream.cache()
> > > > > > > >     //val windowed_dstream = dstream.window(new Duration(sliding_window_length),
> > > > > > > >     //  new Duration(sliding_window_interval))
> > > > > > > >     dstream.print(1000)
> > > > > > > >     val lines = dstream.map(_._2)
> > > > > > > >     // Check for message
> > > > > > > >     val showResults = lines.filter(_.contains("Sending dstream"))
> > > > > > > >       .flatMap(line => line.split("\n,")).map(word => (word, 1))
> > > > > > > >       .reduceByKey(_ + _).print(1000)
> > > > > > > >     // Check for statement cache
> > > > > > > >     val showResults2 = lines.filter(_.contains("statement cache"))
> > > > > > > >       .flatMap(line => line.split("\n,")).map(word => (word, 1))
> > > > > > > >       .reduceByKey(_ + _).print(1000)
> > > > > > > >     ssc.start()
> > > > > > > >     ssc.awaitTermination()
> > > > > > > >     //ssc.stop()
> > > > > > > >     println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> > > > > > > >   }
> > > > > > > > }
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > >
> > > > > > > > Dr Mich Talebzadeh
> > > > > > > >
> > > > > > > > LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> > > > > > > >
> > > > > > > > http://talebzadehmich.wordpress.com
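
[For reference on the difference Cody mentions above: in the Spark 1.x
Kafka 0.8 API, the two creation methods take different arguments. A sketch,
where the consumer thread count and StorageLevel are illustrative choices:

    import org.apache.spark.storage.StorageLevel

    // Direct (receiverless) stream: topics is a Set[String]
    val directStream = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, Set("newtopic"))

    // Receiver-based stream: topics is a Map of topic -> consumer thread
    // count, plus an explicit StorageLevel; it needs "zookeeper.connect"
    // and "group.id" in kafkaParams (both already set above)
    val receiverStream = KafkaUtils.createStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, Map("newtopic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER_2)

Either DStream supports window(); the direct stream just needs the map /
field extraction done before windowing, as Cody notes above.]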