Show the full relevant code including imports.
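
A compile-time "not found: value OffsetRange" usually means the class has not been imported. As a minimal sketch, assuming the Spark 1.x spark-streaming-kafka (Kafka 0.8) integration, where OffsetRange sits in the same package as KafkaUtils, the batch read could look like the code below. The object name and the local[2] master are illustrative assumptions; the topic, broker address and offsets are copied from the quoted message.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
// OffsetRange has to be imported alongside KafkaUtils
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object OffsetRangeSketch {
  // One range: topic "newtopic", partition 0,
  // from offset 110 (inclusive) until offset 220 (exclusive)
  val offsetRanges: Array[OffsetRange] = Array(
    OffsetRange("newtopic", 0, 110, 220)
  )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().
      setAppName("OffsetRangeSketch").
      setMaster("local[2]") // assumption: local test run
    val sc = new SparkContext(conf)

    val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092")

    // Batch (non-streaming) read of exactly the offsets listed above
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)

    // Each record is a (key, value) pair; print a few message values
    rdd.map(_._2).take(10).foreach(println)

    sc.stop()
  }
}
```

Note that createRDD takes a SparkContext rather than a StreamingContext, so it does not need the streaming setup at all.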

On Fri, Apr 22, 2016 at 4:46 PM, Mich Talebzadeh
<mich.talebza...@gmail.com> wrote:
> Hi Cody,
>
> This is my first attempt at using offset ranges (this may not mean much in
> my context at the moment)
>
> val ssc = new StreamingContext(conf, Seconds(10))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
> "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" ->
> "rhes564:2181", "group.id" -> "StreamTest" )
> val topics = Set("newtopic", "newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
> val lines = dstream.map(_._2)
> val showResults = lines.filter(_.contains("statement cache")).flatMap(line
> => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> // Define the offset ranges to read in the batch job. Just one offset range
> val offsetRanges = Array(
>   OffsetRange("newtopic", 0, 110, 220)
> )
> // Create the RDD based on the offset ranges
> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
> StringDecoder](sc, kafkaParams, offsetRanges)
>
>
> This comes back with error
>
> [info] Compiling 1 Scala source to
> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> [error]
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
> not found: value OffsetRange
> [error]   OffsetRange("newtopic", 0, 110, 220),
> [error]   ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> Any ideas will be appreciated
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 22 April 2016 at 22:04, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Spark streaming as it exists today is always microbatch.
>>
>> You can certainly filter messages using spark streaming.
>>
>>
>> On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>> > Yep, actually using createDirectStream sounds like a better way of doing it.
>> > Am I correct that createDirectStream was introduced to overcome
>> > micro-batching limitations?
>> >
>> > In a nutshell I want to pick up all the messages, keep the signals that
>> > match pre-built criteria (say indicating a buy signal) and ignore the
>> > pedestals
>> >
>> > Thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> > On 22 April 2016 at 21:56, Cody Koeninger <c...@koeninger.org> wrote:
>> >>
>> >> You can still do sliding windows with createDirectStream, just do your
>> >> map / extraction of fields before the window.
>> >>
>> >> On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
>> >> <mich.talebza...@gmail.com> wrote:
>> >> > Hi Cody,
>> >> >
>> >> > I want to use sliding windows for Complex Event Processing
>> >> > micro-batching
>> >> >
>> >> > Dr Mich Talebzadeh
>> >> >
>> >> > On 22 April 2016 at 21:51, Cody Koeninger <c...@koeninger.org> wrote:
>> >> >>
>> >> >> Why are you wanting to convert?
>> >> >>
>> >> >> As far as doing the conversion, createStream doesn't take the same
>> >> >> arguments, look at the docs.
>> >> >>
>> >> >> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
>> >> >> <mich.talebza...@gmail.com> wrote:
>> >> >> > Hi,
>> >> >> >
>> >> >> > What is the best way of converting this program that uses
>> >> >> > KafkaUtils.createDirectStream to Sliding window using
>> >> >> >
>> >> >> > val dstream = KafkaUtils.createDirectStream[String, String,
>> >> >> > StringDecoder,
>> >> >> > StringDecoder](ssc, kafkaParams, topic)
>> >> >> >
>> >> >> > to
>> >> >> >
>> >> >> > val dstream = KafkaUtils.createStream[String, String,
>> >> >> > StringDecoder,
>> >> >> > StringDecoder](ssc, kafkaParams, topic)
>> >> >> >
>> >> >> >
>> >> >> > The program below works
>> >> >> >
>> >> >> >
>> >> >> > import org.apache.spark.SparkContext
>> >> >> > import org.apache.spark.SparkConf
>> >> >> > import org.apache.spark.sql.Row
>> >> >> > import org.apache.spark.sql.hive.HiveContext
>> >> >> > import org.apache.spark.sql.types._
>> >> >> > import org.apache.spark.sql.SQLContext
>> >> >> > import org.apache.spark.sql.functions._
>> >> >> > import _root_.kafka.serializer.StringDecoder
>> >> >> > import org.apache.spark.streaming._
>> >> >> > import org.apache.spark.streaming.kafka.KafkaUtils
>> >> >> > //
>> >> >> > object CEP_assembly {
>> >> >> >   def main(args: Array[String]) {
>> >> >> >   val conf = new SparkConf().
>> >> >> >                setAppName("CEP_assembly").
>> >> >> >                setMaster("local[2]").
>> >> >> >                set("spark.driver.allowMultipleContexts", "true").
>> >> >> >                set("spark.hadoop.validateOutputSpecs", "false")
>> >> >> >   val sc = new SparkContext(conf)
>> >> >> >   // Create sqlContext based on HiveContext
>> >> >> >   val sqlContext = new HiveContext(sc)
>> >> >> >   import sqlContext.implicits._
>> >> >> >   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> >> >> >   println ("\nStarted at"); sqlContext.sql(
>> >> >> >     "SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").
>> >> >> >     collect.foreach(println)
>> >> >> > val ssc = new StreamingContext(conf, Seconds(1))
>> >> >> > ssc.checkpoint("checkpoint")
>> >> >> > val kafkaParams = Map[String, String](
>> >> >> >   "bootstrap.servers" -> "rhes564:9092",
>> >> >> >   "schema.registry.url" -> "http://rhes564:8081",
>> >> >> >   "zookeeper.connect" -> "rhes564:2181",
>> >> >> >   "group.id" -> "StreamTest" )
>> >> >> > val topic = Set("newtopic")
>> >> >> > //val dstream = KafkaUtils.createStream[String, String,
>> >> >> > //  StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> >> >> > val dstream = KafkaUtils.createDirectStream[String, String,
>> >> >> >   StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>> >> >> > dstream.cache()
>> >> >> > //val windowed_dstream = dstream.window(
>> >> >> > //  new Duration(sliding_window_length),
>> >> >> > //  new Duration(sliding_window_interval))
>> >> >> > dstream.print(1000)
>> >> >> > val lines = dstream.map(_._2)
>> >> >> > // Check for message
>> >> >> > val showResults = lines.filter(_.contains("Sending dstream")).
>> >> >> >   flatMap(line => line.split("\n,")).map(word => (word, 1)).
>> >> >> >   reduceByKey(_ + _).print(1000)
>> >> >> > // Check for statement cache
>> >> >> > val showResults2 = lines.filter(_.contains("statement cache")).
>> >> >> >   flatMap(line => line.split("\n,")).map(word => (word, 1)).
>> >> >> >   reduceByKey(_ + _).print(1000)
>> >> >> > ssc.start()
>> >> >> > ssc.awaitTermination()
>> >> >> > //ssc.stop()
>> >> >> >   println ("\nFinished at"); sqlContext.sql(
>> >> >> >     "SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").
>> >> >> >     collect.foreach(println)
>> >> >> >   }
>> >> >> > }
>> >> >> >
>> >> >> > Thanks
>> >> >> >
>> >> >> >
>> >> >> > Dr Mich Talebzadeh
>> >> >> >
>> >> >
>> >> >
>> >
>> >
>
>
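
The suggestion in the thread, to do the map / extraction of fields before the window, can be sketched roughly as follows, again assuming the Spark 1.x direct stream API. The 60 second window and 20 second slide are made-up values (both have to be multiples of the batch interval, 10 seconds here), and isSignal is a hypothetical helper standing in for the real pre-built criteria.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WindowedDirectStreamSketch {
  // Hypothetical stand-in for the criteria that mark a message as a signal
  def isSignal(line: String): Boolean = line.contains("statement cache")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().
      setAppName("WindowedDirectStreamSketch").
      setMaster("local[2]") // assumption: local test run
    val ssc = new StreamingContext(conf, Seconds(10)) // 10 second batches

    val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092")
    val topics = Set("newtopic")

    val dstream = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Extract the message value first ...
    val lines = dstream.map(_._2)

    // ... then window the mapped stream: last 60 seconds, evaluated every 20 seconds
    val windowed = lines.window(Seconds(60), Seconds(20))

    // Keep only the signals and print how many fell into each window
    windowed.filter(isSignal _).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The stream is still processed in micro-batches; the window only groups several consecutive batches together before the filter runs.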

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
