I have a RDD of type (String,
Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])]
Here String is Key and a list of tuples for that key. I got above RDD after
doing a groupByKey. I later want to compute total number of values for a
given key and total number of unique values for the same given key and
hence i do this
val totalViCount = details.size.toLong
val uniqueViCount =
details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
How do i do this using reduceByKey.
*Total Code:*
val groupedDetail: RDD[(String, Iterable[(DetailInputRecord,
DataRecord)])] = detailInputsToGroup.map {
case (detailInput, dataRecord) =>
val key: StringBuilder = new StringBuilder
dimensions.foreach {
dimension =>
key ++= {
Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse("")).toString
}
}
(key.toString, (detailInput, dataRecord))
}.groupByKey
groupedDetail.map {
case (key, values) => {
val valueList = values.toList
//Compute dimensions // You can skup this
val (detailInput, dataRecord) = valueList.head
val schema = SchemaUtil.outputSchema(_detail)
val detailOutput = new DetailOutputRecord(detail, new
SessionRecord(schema))
DataUtil.populateDimensions(schema, dimensions.toArray,
detailInput, dataRecord, detailOutput)
val metricsData = metricProviders.flatMap {
case (className, instance) =>
val data = instance.getMetrics(valueList)
ReflectionUtil.getData(data,
_metricProviderMemberNames(className))
}
metricsData.map { case (k, v) => detailOutput.put(k, v) }
val wrap = new AvroKey[DetailOutputRecord](detailOutput)
(wrap, NullWritable.get)
}
}
//getMetrics:
def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
val totalViCount = details.size.toLong
val uniqueViCount =
details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
new ViewItemCountMetric(totalViCount, uniqueViCount)
}
I understand that totalViCount can be implemented using reduceByKey. How
can i implement total unique count as i need to have the full list to know
the unique values.
--
Deepak