If the number of items is very large, have you considered using probabilistic counting? The HyperLogLogPlus <https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java> class from stream-lib <https://github.com/addthis/stream-lib> might be suitable.
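If exact counts are required, another option is to replace groupByKey with aggregateByKey, so the full per-key value list is never materialized; only a small (count, set-of-ids) accumulator is shuffled. The sketch below is a dependency-free illustration of the two merge functions (the object and method names are mine, not from the quoted code); on the real RDD you would first map each record to its itemId and then call `aggregateByKey(zero)(seqOp, combOp)`. A HyperLogLogPlus could stand in for the Set when approximate uniques are acceptable, since HLL sketches merge associatively.

```scala
// Hypothetical sketch: per-key (total, distinct) counts without
// materializing groups. seqOp folds one itemId into the per-key
// accumulator; combOp merges two partial accumulators. These are the
// same two functions RDD.aggregateByKey expects.
object ViCounts {
  type Acc = (Long, Set[Long]) // (total count, distinct itemIds seen)
  val zero: Acc = (0L, Set.empty[Long])

  def seqOp(acc: Acc, itemId: Long): Acc =
    (acc._1 + 1L, acc._2 + itemId)

  def combOp(a: Acc, b: Acc): Acc =
    (a._1 + b._1, a._2 ++ b._2)

  // Local stand-in for keyedItemIds.aggregateByKey(zero)(seqOp, combOp),
  // returning (totalViCount, uniqueViCount) per key.
  def countsByKey(records: Seq[(String, Long)]): Map[String, (Long, Long)] =
    records.groupBy(_._1).map { case (k, vs) =>
      val (total, ids) = vs.map(_._2).foldLeft(zero)(seqOp)
      k -> ((total, ids.size.toLong))
    }
}
```

In the real job, the mapping step would extract `detailInput.get("itemId").asInstanceOf[Long]` before the aggregation, mirroring the quoted code's distinct computation.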
On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I have an RDD of type (String,
> Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
> com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])
>
> Here String is the key, paired with the list of tuples for that key. I got
> the above RDD after doing a groupByKey. I later want to compute the total
> number of values for a given key and the total number of unique values for
> the same key, and hence I do this:
>
>     val totalViCount = details.size.toLong
>     val uniqueViCount =
>       details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>
> How do I do this using reduceByKey?
>
> *Total code:*
>
>     val groupedDetail: RDD[(String, Iterable[(DetailInputRecord, DataRecord)])] =
>       detailInputsToGroup.map {
>         case (detailInput, dataRecord) =>
>           val key: StringBuilder = new StringBuilder
>           dimensions.foreach { dimension =>
>             key ++= Option(dataRecord.get(dimension))
>               .getOrElse(Option(detailInput.get(dimension)).getOrElse(""))
>               .toString
>           }
>           (key.toString, (detailInput, dataRecord))
>       }.groupByKey
>
>     groupedDetail.map {
>       case (key, values) =>
>         val valueList = values.toList
>
>         // Compute dimensions (you can skip this)
>         val (detailInput, dataRecord) = valueList.head
>         val schema = SchemaUtil.outputSchema(_detail)
>         val detailOutput = new DetailOutputRecord(detail, new SessionRecord(schema))
>         DataUtil.populateDimensions(schema, dimensions.toArray,
>           detailInput, dataRecord, detailOutput)
>
>         val metricsData = metricProviders.flatMap {
>           case (className, instance) =>
>             val data = instance.getMetrics(valueList)
>             ReflectionUtil.getData(data, _metricProviderMemberNames(className))
>         }
>         metricsData.map { case (k, v) => detailOutput.put(k, v) }
>         val wrap = new AvroKey[DetailOutputRecord](detailOutput)
>         (wrap, NullWritable.get)
>     }
>
>     // getMetrics:
>     def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
>       val totalViCount = details.size.toLong
>       val uniqueViCount =
>         details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>       new ViewItemCountMetric(totalViCount, uniqueViCount)
>     }
>
> I understand that totalViCount can be implemented using reduceByKey. How
> can I implement the total unique count, given that I need the full list to
> know the unique values?
>
> --
> Deepak