I would suggest reading the documentation first.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.OffsetRange$

The OffsetRange class is not private; only its instance constructor
is.  You obtain instances through the apply method on the
companion object, i.e. write

OffsetRange(...)
not
new OffsetRange(...)
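
To make the distinction concrete, here is a minimal self-contained sketch of the same pattern. DemoOffsetRange is a hypothetical stand-in I made up for illustration, not the Spark class; it only mirrors the "final class OffsetRange private(" declaration quoted below, and its field names are assumptions.

```scala
// Hypothetical stand-in mirroring the shape of
// `final class OffsetRange private(...)`. Not the real Spark class.
final class DemoOffsetRange private (
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) {
  // Number of messages covered by the half-open range [fromOffset, untilOffset).
  def count: Long = untilOffset - fromOffset
}

object DemoOffsetRange {
  // The companion object is allowed to call the private constructor,
  // so this apply method is the public way to build an instance.
  def apply(topic: String, partition: Int,
            fromOffset: Long, untilOffset: Long): DemoOffsetRange =
    new DemoOffsetRange(topic, partition, fromOffset, untilOffset)
}

object PrivateConstructorDemo {
  def main(args: Array[String]): Unit = {
    // Compiles: this is sugar for DemoOffsetRange.apply(...).
    val range = DemoOffsetRange("newtopic", 0, 110L, 220L)
    println(range.count)

    // Does not compile outside the companion object, which is the same
    // kind of "constructor ... cannot be accessed" error as in the thread:
    // val bad = new DemoOffsetRange("newtopic", 0, 110L, 220L)
  }
}
```

Applied to the program below, and assuming the spark-streaming-kafka 1.6 API, the line would become something like val offsetRanges = Array(OffsetRange("newtopic", 0, 110, 220)), since as far as I recall createRDD expects an Array[OffsetRange] rather than a single range.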



On Fri, Apr 22, 2016 at 6:27 PM, Mich Talebzadeh
<[email protected]> wrote:
> So is there any way of creating an RDD without using offsetRanges? Sorry for
> the lack of clarity here
>
>
> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
> StringDecoder](sc, kafkaParams, offsetRanges)
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 23 April 2016 at 00:13, Mich Talebzadeh <[email protected]>
> wrote:
>>
>> So there is really no point in using it :(
>>
>>
>> On 23 April 2016 at 00:11, Ted Yu <[email protected]> wrote:
>>>
>>> The class is private :
>>>
>>> final class OffsetRange private(
>>>
>>> On Fri, Apr 22, 2016 at 4:08 PM, Mich Talebzadeh
>>> <[email protected]> wrote:
>>>>
>>>> Ok I decided to forgo that approach and use an existing program of mine
>>>> with slight modification. The code is this
>>>>
>>>> import org.apache.spark.SparkContext
>>>> import org.apache.spark.SparkConf
>>>> import org.apache.spark.sql.Row
>>>> import org.apache.spark.sql.hive.HiveContext
>>>> import org.apache.spark.sql.types._
>>>> import org.apache.spark.sql.SQLContext
>>>> import org.apache.spark.sql.functions._
>>>> import _root_.kafka.serializer.StringDecoder
>>>> import org.apache.spark.streaming._
>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>> import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
>>>> //
>>>> object CEP_assembly {
>>>>   def main(args: Array[String]) {
>>>>   val conf = new SparkConf().
>>>>                setAppName("CEP_assembly").
>>>>                setMaster("local[2]").
>>>>                set("spark.driver.allowMultipleContexts", "true").
>>>>                set("spark.hadoop.validateOutputSpecs", "false")
>>>>   val sc = new SparkContext(conf)
>>>>   // Create sqlContext based on HiveContext
>>>>   val sqlContext = new HiveContext(sc)
>>>>   import sqlContext.implicits._
>>>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>   println ("\nStarted at"); sqlContext.sql("SELECT
>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>> ").collect.foreach(println)
>>>> val ssc = new StreamingContext(conf, Seconds(1))
>>>> ssc.checkpoint("checkpoint")
>>>> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>>> val topics = Set("newtopic", "newtopic")
>>>> val dstream = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>>> dstream.cache()
>>>> val lines = dstream.map(_._2)
>>>> val showResults = lines.filter(_.contains("statement
>>>> cache")).flatMap(line => line.split("\n,")).map(word => (word,
>>>> 1)).reduceByKey(_ + _)
>>>> // Define the offset ranges to read in the batch job
>>>> val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>>>> // Create the RDD based on the offset ranges
>>>> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
>>>> StringDecoder](sc, kafkaParams, offsetRanges)
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>> //ssc.stop()
>>>>   println ("\nFinished at"); sqlContext.sql("SELECT
>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>> ").collect.foreach(println)
>>>>   }
>>>> }
>>>>
>>>>
>>>> With sbt
>>>>
>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
>>>> "provided"
>>>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
>>>> "provided"
>>>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
>>>> "provided"
>>>> libraryDependencies += "junit" % "junit" % "4.12"
>>>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
>>>> % "provided"
>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>>>> "1.6.1"
>>>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>>>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>>>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
>>>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
>>>> % "test"
>>>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" %
>>>> "1.6.1"
>>>>
>>>>
>>>> However, I am getting the following error
>>>>
>>>> [info] Loading project definition from
>>>> /data6/hduser/scala/CEP_assembly/project
>>>> [info] Set current project to CEP_assembly (in build
>>>> file:/data6/hduser/scala/CEP_assembly/)
>>>> [info] Compiling 1 Scala source to
>>>> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>>>> [error]
>>>> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
>>>> constructor OffsetRange in class OffsetRange cannot be accessed in object
>>>> CEP_assembly
>>>> [error] val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>>>>
>>>>
>>>> On 22 April 2016 at 18:41, Marcelo Vanzin <[email protected]> wrote:
>>>>>
>>>>> On Fri, Apr 22, 2016 at 10:38 AM, Mich Talebzadeh
>>>>> <[email protected]> wrote:
>>>>> > I am trying to test Spark with CEP and I have been shown a sample
>>>>> > here
>>>>> >
>>>>> > https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>>>>>
>>>>> I'm not familiar with CEP, but that's a Spark unit test, so if you're
>>>>> trying to run it outside of the context of Spark unit tests (as it
>>>>> seems you're trying to do), you're going to run into a world of
>>>>> trouble. I'd suggest a different approach where whatever you're trying
>>>>> to do is done through the Spark build, not outside of it.
>>>>>
>>>>> --
>>>>> Marcelo
>>>>
>>>>
>>>
>>
>
