Show the full relevant code, including imports.

On Fri, Apr 22, 2016 at 4:46 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Hi Cody,
>
> This is my first attempt at using offset ranges (this may not mean much in
> my context at the moment):
>
> val ssc = new StreamingContext(conf, Seconds(10))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
>   "schema.registry.url" -> "http://rhes564:8081",
>   "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
> val topics = Set("newtopic", "newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>   StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
> val lines = dstream.map(_._2)
> val showResults = lines.filter(_.contains("statement cache")).flatMap(line =>
>   line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> // Define the offset ranges to read in the batch job. Just one offset range
> val offsetRanges = Array(
>   OffsetRange("newtopic", 0, 110, 220)
> )
> // Create the RDD based on the offset ranges
> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
>   StringDecoder](sc, kafkaParams, offsetRanges)
>
> This comes back with the error:
>
> [info] Compiling 1 Scala source to
> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> [error] /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
> not found: value OffsetRange
> [error] OffsetRange("newtopic", 0, 110, 220),
> [error]             ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> Any ideas will be appreciated.
>
> Dr Mich Talebzadeh
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
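For reference: in the Spark 1.x Kafka 0.8 integration, OffsetRange lives in the same package as KafkaUtils (org.apache.spark.streaming.kafka), so the "not found: value OffsetRange" error above is most likely just a missing import. Below is a minimal, illustrative sketch of the batch read, assuming the spark-streaming-kafka_2.10 artifact is on the classpath; the broker, topic, partition and offsets simply mirror the snippet above. Note that createRDD takes the SparkContext (sc), not the StreamingContext.

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}  // importing OffsetRange fixes the error

    object OffsetRangeExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("OffsetRangeExample").setMaster("local[2]")
        val sc = new SparkContext(conf)

        // Only the broker list is strictly needed for the batch (createRDD) API.
        val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092")

        // One offset range: topic "newtopic", partition 0, offsets 110 (inclusive) to 220 (exclusive).
        val offsetRanges = Array(OffsetRange("newtopic", 0, 110, 220))

        // Batch read of exactly the messages covered by the offset ranges.
        val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
          sc, kafkaParams, offsetRanges)

        rdd.map(_._2).take(10).foreach(println)
        sc.stop()
      }
    }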
> On 22 April 2016 at 22:04, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Spark Streaming as it exists today is always micro-batch.
>>
>> You can certainly filter messages using Spark Streaming.
>>
>> On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>> > Yep, actually using createDirectStream sounds like a better way of doing it.
>> > Am I correct that createDirectStream was introduced to overcome
>> > micro-batching limitations?
>> >
>> > In a nutshell, I want to pick up all the messages, keep the signals that
>> > match pre-built criteria (say, indicating a buy signal) and ignore the
>> > pedestals.
>> >
>> > Thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> > On 22 April 2016 at 21:56, Cody Koeninger <c...@koeninger.org> wrote:
>> >>
>> >> You can still do sliding windows with createDirectStream, just do your
>> >> map / extraction of fields before the window.
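A minimal sketch of what that suggestion could look like: map and extract fields from the un-windowed direct stream first, then window the derived DStream. It assumes the ssc, kafkaParams, topic and dstream from the program quoted further down; the window and slide lengths are illustrative (both must be multiples of the batch interval).

    // Assumes dstream = KafkaUtils.createDirectStream[...](ssc, kafkaParams, topic)
    // as in the program quoted further down.

    // 1. Map / extract the fields of interest before any windowing.
    val pairs = dstream.map(_._2)                      // message values
      .filter(_.contains("statement cache"))           // keep matching messages only
      .flatMap(_.split("\n,"))
      .map(word => (word, 1))

    // 2. Then apply the sliding window to the derived DStream:
    //    a 60-second window recomputed every 10 seconds.
    val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))
    windowedCounts.print(1000)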
>> >> On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
>> >> <mich.talebza...@gmail.com> wrote:
>> >> > Hi Cody,
>> >> >
>> >> > I want to use sliding windows for Complex Event Processing micro-batching.
>> >> >
>> >> > Dr Mich Talebzadeh
>> >> >
>> >> > On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >>
>> >> >> Why are you wanting to convert?
>> >> >>
>> >> >> As far as doing the conversion goes, createStream doesn't take the same
>> >> >> arguments; look at the docs.
>> >> >>
>> >> >> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
>> >> >> <mich.talebza...@gmail.com> wrote:
>> >> >> > Hi,
>> >> >> >
>> >> >> > What is the best way of converting this program, which uses
>> >> >> > KafkaUtils.createDirectStream, to a sliding window, i.e. changing
>> >> >> >
>> >> >> > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>> >> >> >   StringDecoder](ssc, kafkaParams, topic)
>> >> >> >
>> >> >> > to
>> >> >> >
>> >> >> > val dstream = KafkaUtils.createStream[String, String, StringDecoder,
>> >> >> >   StringDecoder](ssc, kafkaParams, topic)
>> >> >> >
>> >> >> > The program below works:
>> >> >> >
>> >> >> > import org.apache.spark.SparkContext
>> >> >> > import org.apache.spark.SparkConf
>> >> >> > import org.apache.spark.sql.Row
>> >> >> > import org.apache.spark.sql.hive.HiveContext
>> >> >> > import org.apache.spark.sql.types._
>> >> >> > import org.apache.spark.sql.SQLContext
>> >> >> > import org.apache.spark.sql.functions._
>> >> >> > import _root_.kafka.serializer.StringDecoder
>> >> >> > import org.apache.spark.streaming._
>> >> >> > import org.apache.spark.streaming.kafka.KafkaUtils
>> >> >> > //
>> >> >> > object CEP_assembly {
>> >> >> > def main(args: Array[String]) {
>> >> >> > val conf = new SparkConf().
>> >> >> >   setAppName("CEP_assembly").
>> >> >> >   setMaster("local[2]").
>> >> >> >   set("spark.driver.allowMultipleContexts", "true").
>> >> >> >   set("spark.hadoop.validateOutputSpecs", "false")
>> >> >> > val sc = new SparkContext(conf)
>> >> >> > // Create sqlContext based on HiveContext
>> >> >> > val sqlContext = new HiveContext(sc)
>> >> >> > import sqlContext.implicits._
>> >> >> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> >> >> > println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
>> >> >> > val ssc = new StreamingContext(conf, Seconds(1))
>> >> >> > ssc.checkpoint("checkpoint")
>> >> >> > val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
>> >> >> >   "schema.registry.url" -> "http://rhes564:8081",
>> >> >> >   "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>> >> >> > val topic = Set("newtopic")
>> >> >> > //val dstream = KafkaUtils.createStream[String, String, StringDecoder,
>> >> >> > //  StringDecoder](ssc, kafkaParams, topic)
>> >> >> > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>> >> >> >   StringDecoder](ssc, kafkaParams, topic)
>> >> >> > dstream.cache()
>> >> >> > //val windowed_dstream = dstream.window(new Duration(sliding_window_length),
>> >> >> > //  new Duration(sliding_window_interval))
>> >> >> > dstream.print(1000)
>> >> >> > val lines = dstream.map(_._2)
>> >> >> > // Check for message
>> >> >> > val showResults = lines.filter(_.contains("Sending dstream")).flatMap(line =>
>> >> >> >   line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
>> >> >> > // Check for statement cache
>> >> >> > val showResults2 = lines.filter(_.contains("statement cache")).flatMap(line =>
>> >> >> >   line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
>> >> >> > ssc.start()
>> >> >> > ssc.awaitTermination()
>> >> >> > //ssc.stop()
>> >> >> > println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
>> >> >> > }
>> >> >> > }
>> >> >> >
>> >> >> > Thanks
>> >> >> >
>> >> >> > Dr Mich Talebzadeh
>> >> >> >
>> >> >> > LinkedIn
>> >> >> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >> >> >
>> >> >> > http://talebzadehmich.wordpress.com
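For comparison, a sketch of the receiver-based createStream variant Cody refers to, assuming the same ssc and kafkaParams as in the program above (which already carry zookeeper.connect and group.id). Its arguments differ from createDirectStream: topics are a Map of topic name to receiver-thread count rather than a Set, and a storage level is required; the values here are illustrative only.

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Receiver-based stream: offsets are tracked in ZooKeeper via zookeeper.connect / group.id.
    val topicMap = Map("newtopic" -> 1)   // topic -> number of receiver threads
    val receiverStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
    val receivedLines = receiverStream.map(_._2)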