Basically, what I want to do would be something like:

val errorLines = lines.filter(_.contains("h"))
val mapErrorLines = errorLines.map(line => ("key", line))
val grouping = mapErrorLines.groupByKeyAndWindow(Seconds(8), Seconds(4))

// pseudocode for the part I'm missing
if (grouping.getValue().size() > X) {
  // iterate the values and do something with each element
}
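I was thinking something along these lines might do it, but I'm not sure it's the right approach. Just a rough, untested sketch; X stands for whatever threshold I end up using:

// Rough sketch (untested): look at each windowed group, check its size
// against a threshold, and iterate the values when the group is big enough.
val X = 10  // placeholder threshold

grouping.foreachRDD { rdd =>
  rdd.foreach { case (key, values) =>
    if (values.size > X) {
      values.foreach { line =>
        // do something with each element; just printing for now
        println(s"$key -> $line")
      }
    }
  }
}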
I think it must be pretty basic... argh.

2014-12-17 18:43 GMT+01:00 Guillermo Ortiz <konstt2...@gmail.com>:
> What I would like to do is count the number of elements and, if the
> count is greater than a number, iterate over all of them and store them
> in MySQL or another system. So, I need to count them and also preserve
> the values, because they get saved in the other system.
>
> I know about this map(line => ("key", line)); it was just a test. I
> want to replace "key" with a value that comes from a regular expression.
>
> 2014-12-17 17:28 GMT+01:00 Gerard Maas <gerard.m...@gmail.com>:
>>
>> You can create a DStream that contains the count by transforming the
>> grouped windowed RDD, like this:
>> val errorCount = grouping.map{case (k,v) => v.size }
>>
>> If you need to preserve the key:
>> val errorCount = grouping.map{case (k,v) => (k,v.size) }
>>
>> Or, if you don't care about the content of the values, you could count
>> directly instead of grouping first:
>>
>> val errorCount = mapErrorLines.countByWindow(Seconds(8), Seconds(4))
>>
>> Not sure why you're using map(line => ("key", line)), as there only
>> seems to be one key. If that's not required, we can simplify one more
>> step:
>>
>> val errorCount = errorLines.countByWindow(Seconds(8), Seconds(4))
>>
>> The question is: what do you want to do with that count afterwards?
>>
>> -kr, Gerard.
>>
>>
>> On Wed, Dec 17, 2014 at 5:11 PM, Guillermo Ortiz <konstt2...@gmail.com>
>> wrote:
>>>
>>> I'm a newbie with Spark... a simple question:
>>>
>>> val errorLines = lines.filter(_.contains("h"))
>>> val mapErrorLines = errorLines.map(line => ("key", line))
>>> val grouping = mapErrorLines.groupByKeyAndWindow(Seconds(8), Seconds(4))
>>>
>>> I get something like:
>>>
>>> -------------------------------------------
>>> Time: 1418832180000 ms
>>> -------------------------------------------
>>> (key,ArrayBuffer(h2, h3, h4))
>>>
>>> Now, I would like to get that ArrayBuffer and count the number of
>>> elements. How could I get that ArrayBuffer? Something like:
>>> val values = grouping.getValue()... How could I do this in Spark with
>>> Scala?
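PS: for the MySQL part, I guess the actual writes would go through foreachPartition, so that one connection is opened per partition rather than per record. Another rough sketch; createConnection and insertLine are placeholders for whatever MySQL client code I end up using:

grouping.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // placeholder: open one MySQL connection per partition
    val conn = createConnection()
    try {
      iter.foreach { case (key, values) =>
        if (values.size > X) {
          values.foreach(line => insertLine(conn, key, line))  // placeholder write
        }
      }
    } finally {
      conn.close()
    }
  }
}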