bq. is not a member of (String, String)

As shown above, contains shouldn't be applied directly on a tuple.

Choose the element of the tuple and then apply contains on it.

On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Thank you gents.
>
> That should "\n" as carriage return
>
> OK I am using spark streaming to analyse the message
>
> It does the streaming
>
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> //
> scala> val sparkConf = new SparkConf().
>      |              setAppName("StreamTest").
>      |              setMaster("local[12]").
>      |              set("spark.driver.allowMultipleContexts", "true").
>      |              set("spark.hadoop.validateOutputSpecs", "false")
> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
> scala>
> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081";,
> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
> StreamTest)
> scala> val topic = Set("newtopic")
> topic: scala.collection.immutable.Set[String] = Set(newtopic)
> scala> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>
> This part is tricky
>
> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> _).collect.foreach(println)
> <console>:47: error: value contains is not a member of (String, String)
>          val showlines = messages.filter(_ contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> _).collect.foreach(println)
>
>
> How does one refer to the content of the stream here?
>
> Thanks
>
>
>
>
>
>
>
>
>
>
> //
> // Now want to do some analysis on the same text file
> //
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> bq. split"\t," splits the filter by carriage return
>>
>> Minor correction: "\t" denotes tab character.
>>
>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>
>>> Hi Mich,
>>>
>>> 1. The first underscore in your filter call is refering to a line in the
>>> file (as textFile() results in a collection of strings)
>>> 2. You're correct. No need for it.
>>> 3. Filter is expecting a Boolean result. So you can merge your contains
>>> filters to one with AND (&&) statement.
>>> 4. Correct. Each character in split() is used as a divider.
>>>
>>> Eliran Bivas
>>>
>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>> *Sent:* Apr 3, 2016 15:06
>>> *To:* Eliran Bivas
>>> *Cc:* user @spark
>>> *Subject:* Re: multiple splits fails
>>>
>>> Hi Eliran,
>>>
>>> Many thanks for your input on this.
>>>
>>> I thought about what I was trying to achieve so I rewrote the logic as
>>> follows:
>>>
>>>
>>>    1. Read the text file in
>>>    2. Filter out empty lines (well not really needed here)
>>>    3. Search for lines that contain "ASE 15" and further have sentence
>>>    "UPDATE INDEX STATISTICS" in the said line
>>>    4. Split the text by "\t" and ","
>>>    5. Print the outcome
>>>
>>>
>>> This was what I did with your suggestions included
>>>
>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>> f.cache()
>>>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>> _).collect.foreach(println)
>>>
>>>
>>> Couple of questions if I may
>>>
>>>
>>>    1. I take that "_" refers to content of the file read in by default?
>>>    2. _.length > 0 basically filters out blank lines (not really needed
>>>    here)
>>>    3. Multiple filters are needed for each *contains* logic
>>>    4. split"\t," splits the filter by carriage return AND ,?
>>>
>>>
>>> Regards
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> Few comments:
>>>>
>>>> When doing .filter(_ > “”) you’re actually doing a lexicographic
>>>> comparison and not filtering for empty lines (which could be achieved with
>>>> _.notEmpty or _.length > 0).
>>>> I think that filtering with _.contains should be sufficient and the
>>>> first filter can be omitted.
>>>>
>>>> As for line => line.split(“\t”).split(“,”):
>>>> You have to do a second map or (since split() requires a regex as
>>>> input) .split(“\t,”).
>>>> The problem is that your first split() call will generate an Array and
>>>> then your second call will result in an error.
>>>> e.g.
>>>>
>>>> val lines: Array[String] = line.split(“\t”)
>>>> lines.split(“,”) // Compilation error - no method split() exists for
>>>> Array
>>>>
>>>> So either go with map(_.split(“\t”)).map(_.split(“,”)) or
>>>> map(_.split(“\t,”))
>>>>
>>>> Hope that helps.
>>>>
>>>> *Eliran Bivas*
>>>> Data Team | iguaz.io
>>>>
>>>>
>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am not sure this is the correct approach
>>>>
>>>> Read a text file in
>>>>
>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>
>>>>
>>>> Now I want to get rid of empty lines and filter only the lines that
>>>> contain "ASE15"
>>>>
>>>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>>>
>>>> The above works but I am not sure whether I need two filter
>>>> transformation above? Can it be done in one?
>>>>
>>>> Now I want to map the above filter to lines with carriage return ans
>>>> split them by ","
>>>>
>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>> (line.split("\t")))
>>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131]
>>>> at map at <console>:30
>>>>
>>>> Now I want to split the output by ","
>>>>
>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>> (line.split("\t").split(",")))
>>>> <console>:30: error: value split is not a member of Array[String]
>>>>               f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>> (line.split("\t").split(",")))
>>>>
>>>> ^
>>>> Any advice will be appreciated
>>>>
>>>> Thanks
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to