I modified the code to use reduceByKey instead of groupByKey:
detailInputsToGroup.map {
  case (detailInput, dataRecord) =>
    val key: StringBuilder = new StringBuilder
    dimensions.foreach {
      dimension =>
        key ++= {
          Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse("")).toString
        }
    }
    (key.toString, (detailInput, dataRecord))
}.reduceByKey {
  case (v1, v2) => {
    val v1Detail = v1._1
    val v2Detail = v2._1
    val v1Data = v1._2
    val v2Data = v2._2
    val totalViCount = Option(v1Data.get("totalViCount").asInstanceOf[Int]).getOrElse(0)
    v1Data.getRecord.put("totalViCount", totalViCount + 1)
    (v1)
  }
}.map {
  case (k, v) => {
    val schema = SchemaUtil.outputSchema(_detail)
    val detailOutputRecord = new DetailOutputRecord(detail, new SessionRecord(schema))
    //Compute dimensions
    DataUtil.populateDimensions(schema, dimensions.toArray, v._1, v._2, detailOutputRecord)
    //Construct Output
    val wrap = new AvroKey[DetailOutputRecord](detailOutputRecord)
    (wrap, NullWritable.get)
  }
}
How do I compute the unique count?
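
One possible direction, as a rough sketch only: keep a per-key accumulator that holds the running total plus the set of itemIds seen so far, and take the size of the set at the end. In Spark the two functions below would be passed as seqOp and combOp to aggregateByKey(zero)(seqOp, combOp) on an RDD of (key, itemId) pairs. To stay runnable without a cluster, the sketch emulates two partitions with plain Scala collections. The names UniqueCountSketch, Acc, samplePairs and aggregateLocally are made up for illustration and are not part of the code above.

```scala
object UniqueCountSketch {
  // Accumulator per key: (total number of values, distinct itemIds seen so far).
  type Acc = (Long, Set[Long])

  val zero: Acc = (0L, Set.empty[Long])

  // seqOp: fold a single itemId into a partition-local accumulator.
  def seqOp(acc: Acc, itemId: Long): Acc = (acc._1 + 1L, acc._2 + itemId)

  // combOp: merge two partial accumulators for the same key.
  def combOp(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 ++ b._2)

  // Hypothetical sample data standing in for (key, itemId) pairs.
  val samplePairs: Seq[(String, Long)] =
    Seq(("k1", 1L), ("k1", 1L), ("k1", 2L), ("k2", 7L))

  // Local stand-in for rdd.aggregateByKey(zero)(seqOp, combOp):
  // split the input into two pretend partitions, aggregate each with seqOp,
  // then merge the partial results per key with combOp.
  def aggregateLocally(pairs: Seq[(String, Long)]): Map[String, (Long, Long)] = {
    val (left, right) = pairs.splitAt(pairs.size / 2)

    def partial(part: Seq[(String, Long)]): Map[String, Acc] =
      part.groupBy(_._1).map { case (k, vs) =>
        k -> vs.map(_._2).foldLeft(zero)(seqOp)
      }

    val l = partial(left)
    val r = partial(right)

    (l.keySet ++ r.keySet).map { k =>
      val merged = combOp(l.getOrElse(k, zero), r.getOrElse(k, zero))
      // Result per key: (totalViCount, uniqueViCount).
      k -> ((merged._1, merged._2.size.toLong))
    }.toMap
  }
}
```

The set grows with the number of distinct itemIds per key, so this only suits keys of moderate cardinality. For very large sets, the probabilistic counter Daniel suggests below trades exactness for bounded memory.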
On Tue, Jun 30, 2015 at 12:04 PM, Daniel Siegmann <
[email protected]> wrote:
> If the number of items is very large, have you considered using
> probabilistic counting? The HyperLogLogPlus
> <https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java>
> class from stream-lib <https://github.com/addthis/stream-lib> might be
> suitable.
>
> On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]>
> wrote:
>
>> I have an RDD of type
>>
>> (String, Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
>> com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])
>>
>> Here the String is the key and the Iterable is the list of tuples for that
>> key. I got this RDD after doing a groupByKey. I then want to compute the
>> total number of values for a given key and the number of unique values for
>> the same key, so I do this:
>>
>> val totalViCount = details.size.toLong
>> val uniqueViCount = details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>>
>> How do I do this using reduceByKey?
>>
>> *Total Code:*
>>
>> val groupedDetail: RDD[(String, Iterable[(DetailInputRecord, DataRecord)])] =
>>   detailInputsToGroup.map {
>>     case (detailInput, dataRecord) =>
>>       val key: StringBuilder = new StringBuilder
>>       dimensions.foreach {
>>         dimension =>
>>           key ++= {
>>             Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse("")).toString
>>           }
>>       }
>>       (key.toString, (detailInput, dataRecord))
>>   }.groupByKey
>>
>> groupedDetail.map {
>>   case (key, values) => {
>>     val valueList = values.toList
>>
>>     //Compute dimensions // You can skip this
>>     val (detailInput, dataRecord) = valueList.head
>>     val schema = SchemaUtil.outputSchema(_detail)
>>     val detailOutput = new DetailOutputRecord(detail, new SessionRecord(schema))
>>     DataUtil.populateDimensions(schema, dimensions.toArray, detailInput, dataRecord, detailOutput)
>>
>>     val metricsData = metricProviders.flatMap {
>>       case (className, instance) =>
>>         val data = instance.getMetrics(valueList)
>>         ReflectionUtil.getData(data, _metricProviderMemberNames(className))
>>     }
>>     metricsData.map { case (k, v) => detailOutput.put(k, v) }
>>     val wrap = new AvroKey[DetailOutputRecord](detailOutput)
>>     (wrap, NullWritable.get)
>>   }
>> }
>>
>>
>> //getMetrics:
>> def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
>>   val totalViCount = details.size.toLong
>>   val uniqueViCount = details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>>   new ViewItemCountMetric(totalViCount, uniqueViCount)
>> }
>>
>>
>> I understand that totalViCount can be implemented using reduceByKey. How
>> can I implement the unique count, given that I need the full list to know
>> the unique values?
>>
>> --
>> Deepak
>>
>>
>
--
Deepak