bq. value contains is not a member of (String, String)

As shown above, contains shouldn't be applied directly to a tuple.
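To make that concrete, here is a minimal sketch with a plain Scala collection standing in for the DStream (the sample keys and payloads are invented for illustration). Since createDirectStream yields (key, value) tuples, pull the value out with _._2 before calling String methods such as contains:

```scala
// Each Kafka record arrives as a (key, value) tuple, so String methods
// like contains must be applied to the value element, not the tuple itself.
val messages: Seq[(String, String)] = Seq(
  ("k1", "ASE 15 UPDATE INDEX STATISTICS guidance"),
  ("k2", "an unrelated line")
)

val showlines = messages
  .map(_._2) // keep only the message payload
  .filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))

showlines.foreach(println)
```

On a real DStream the same map/filter chain applies, with the (word, 1) mapping and reduceByKey added afterwards.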
Choose the element of the tuple and then apply contains on it.

On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thank you gents.
>
> That should be "\n" as carriage return.
>
> OK, I am using Spark Streaming to analyse the messages.
>
> It does the streaming:
>
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> scala> val sparkConf = new SparkConf().
>      |   setAppName("StreamTest").
>      |   setMaster("local[12]").
>      |   set("spark.driver.allowMultipleContexts", "true").
>      |   set("spark.hadoop.validateOutputSpecs", "false")
>
> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
>
> scala> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
>   "schema.registry.url" -> "http://rhes564:8081",
>   "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest")
> kafkaParams: scala.collection.immutable.Map[String,String] =
>   Map(bootstrap.servers -> rhes564:9092, schema.registry.url -> http://rhes564:8081,
>   zookeeper.connect -> rhes564:2181, group.id -> StreamTest)
>
> scala> val topic = Set("newtopic")
> topic: scala.collection.immutable.Set[String] = Set(newtopic)
>
> scala> val messages = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
>   org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>
> This part is tricky:
>
> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
>   contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>   line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>   _).collect.foreach(println)
> <console>:47: error: value contains is not a member of (String, String)
>        val showlines = messages.filter(_ contains("ASE 15")).filter(_
>          contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>          line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>          _).collect.foreach(println)
>
> How does one refer to the content of the stream here?
>
> Thanks
>
> Dr Mich Talebzadeh
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> On 3 April 2016 at 15:32, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> bq. split"\t," splits the filter by carriage return
>>
>> Minor correction: "\t" denotes the tab character.
>>
>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas <elir...@iguaz.io> wrote:
>>
>>> Hi Mich,
>>>
>>> 1. The first underscore in your filter call refers to a line in the file
>>>    (as textFile() results in a collection of strings).
>>> 2. You're correct. No need for it.
>>> 3. filter expects a Boolean result, so you can merge your contains
>>>    filters into one with an AND (&&) expression.
>>> 4. Correct. Each character in split() is used as a divider.
>>>
>>> Eliran Bivas
>>>
>>> *From:* Mich Talebzadeh <mich.talebza...@gmail.com>
>>> *Sent:* Apr 3, 2016 15:06
>>> *To:* Eliran Bivas
>>> *Cc:* user @spark
>>> *Subject:* Re: multiple splits fails
>>>
>>> Hi Eliran,
>>>
>>> Many thanks for your input on this.
>>>
>>> I thought about what I was trying to achieve, so I rewrote the logic as follows:
>>>
>>> 1. Read the text file in
>>> 2. Filter out empty lines (well, not really needed here)
>>> 3. Search for lines that contain "ASE 15" and further have the sentence
>>>    "UPDATE INDEX STATISTICS" in the said line
>>> 4. Split the text by "\t" and ","
>>> 5. Print the outcome
>>>
>>> This was what I did with your suggestions included:
>>>
>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>> f.cache()
>>> f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>>>   contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>   line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>>>   _).collect.foreach(println)
>>>
>>> A couple of questions if I may:
>>>
>>> 1. I take it that "_" refers to the content of the file read in by default?
>>> 2. _.length > 0 basically filters out blank lines (not really needed here)?
>>> 3. Multiple filters are needed, one for each *contains* test?
>>> 4. split"\t," splits the filter by carriage return AND ","?
>>>
>>> Regards
>>>
>>> Dr Mich Talebzadeh
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> http://talebzadehmich.wordpress.com
>>>
>>> On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> A few comments:
>>>>
>>>> When doing .filter(_ > "") you're actually doing a lexicographic
>>>> comparison and not filtering for empty lines (which could be achieved
>>>> with _.nonEmpty or _.length > 0).
>>>> I think that filtering with _.contains should be sufficient and the
>>>> first filter can be omitted.
>>>>
>>>> As for line => line.split("\t").split(","):
>>>> You have to do a second map or (since split() requires a regex as
>>>> input) .split("\t,").
>>>> The problem is that your first split() call will generate an Array and
>>>> then your second call will result in an error, e.g.
>>>>
>>>> val lines: Array[String] = line.split("\t")
>>>> lines.split(",") // Compilation error - no method split() exists for Array
>>>>
>>>> So either go with map(_.split("\t")).map(_.split(",")) or map(_.split("\t,")).
>>>>
>>>> Hope that helps.
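One subtlety worth noting beyond what the thread states: String.split takes a regular expression, so "\t," matches a tab followed immediately by a comma (the two-character sequence), not either character. To split on tab OR comma, a character class is needed. A small sketch (the sample string is invented):

```scala
// split() interprets its argument as a regex.
// "\t,"  matches a literal tab followed by a comma (a two-character sequence).
// "[\t,]" is a character class matching either a tab or a comma.
val sample = "a\tb,c"

val bySequence = sample.split("\t,")   // no tab+comma sequence in sample: one element
val byEither   = sample.split("[\t,]") // splits on tab or comma: three elements
```

So for the stated goal of step 4 (split by "\t" and ","), "[\t,]" is the pattern that splits on either delimiter.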
>>>> Eliran Bivas
>>>> Data Team | iguaz.io
>>>>
>>>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am not sure this is the correct approach.
>>>>
>>>> Read a text file in:
>>>>
>>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>>>
>>>> Now I want to get rid of empty lines and filter only the lines that
>>>> contain "ASE15":
>>>>
>>>> f.filter(_ > "").filter(_ contains("ASE15"))
>>>>
>>>> The above works, but I am not sure whether I need the two filter
>>>> transformations above. Can it be done in one?
>>>>
>>>> Now I want to map the above filter to lines with carriage return and
>>>> split them by ",":
>>>>
>>>> f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
>>>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map at <console>:30
>>>>
>>>> Now I want to split the output by ",":
>>>>
>>>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>>>>   (line.split("\t").split(",")))
>>>> <console>:30: error: value split is not a member of Array[String]
>>>>
>>>> Any advice will be appreciated.
>>>>
>>>> Thanks
>>>>
>>>> Dr Mich Talebzadeh
>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> http://talebzadehmich.wordpress.com
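Answering the question at the bottom of the thread (can the two filters be done in one?): yes, the predicates combine with &&, and the tab/comma split and word count can be chained after them. A runnable sketch with a plain Scala collection standing in for the RDD (the sample lines are invented, and groupBy is used as a local stand-in for reduceByKey):

```scala
// Merge the empty-line check and the contains test into one filter,
// then split on tab OR comma (character class) and count words.
// Plain Scala collection stands in for the RDD for illustration.
val f = Seq("", "ASE15\tfoo,bar", "some other line", "ASE15\tfoo")

val counts = f
  .filter(line => line.nonEmpty && line.contains("ASE15"))
  .flatMap(_.split("[\t,]"))        // character class: tab or comma
  .map(word => (word, 1))
  .groupBy(_._1)                    // local stand-in for reduceByKey(_ + _)
  .map { case (w, pairs) => (w, pairs.map(_._2).sum) }

counts.foreach(println)
```

On an actual RDD the same chain works with .reduceByKey(_ + _) in place of the groupBy/map pair.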