Hi,

I’m trying to implement, in PySpark, the “multiple sketches for countries”
Theta sketch example at
https://datasketches.apache.org/docs/Theta/ThetaSparkExample.html. I am stuck
on the aggregateByKey step, after implementing a ThetaSketchSerializable class
and the Add and Combine functions. Is there a PySpark version of this tutorial?

Thanks,
Elle

--------------------------

from pyspark.serializers import Serializer, FramedSerializer

class theta_sketch_serializable(FramedSerializer):
  """Wrap a datasketches theta sketch so Spark can ship it between executors.

  While local, the wrapped sketch is an updateable ``update_theta_sketch``;
  serialization compacts it, so after a round trip it becomes a read-only
  ``compact_theta_sketch`` and can no longer accept updates.
  """

  def __init__(self):
    # lg_k = 12 -> nominal 4096 entries, as in the datasketches Spark
    # example. NOTE(review): confirm this accuracy/size trade-off.
    self.sketch = update_theta_sketch(12)

  def get_sketch(self):
    """Return the wrapped sketch (updateable or compact)."""
    return self.sketch

  def get_compact_sketch(self):
    """Return a compact (read-only, serializable) form of the sketch."""
    # BUG FIX: compact_theta_sketch is itself a theta_sketch subclass in
    # datasketches, so the compact case must be tested FIRST or it would
    # be re-compacted via the generic branch.
    if isinstance(self.sketch, compact_theta_sketch):
      return self.sketch
    elif isinstance(self.sketch, theta_sketch):
      return self.sketch.compact()
    # NOTE(review): falls through to None for any other value of
    # self.sketch -- consider raising instead.

  def update(self, value):
    """Feed one value into the sketch; only valid before compaction."""
    # BUG FIX: test for the updateable subclass specifically --
    # a compact sketch also passes isinstance(..., theta_sketch).
    if isinstance(self.sketch, update_theta_sketch):
      self.sketch.update(value)
    else:
      raise Exception('trying to update compact sketch or null sketch')

  def get_estimate(self):
    """Return the sketch's distinct-count estimate."""
    return self.sketch.get_estimate()

  def dumps(self, obj):
    # BUG FIX: original read `obj.compact.serialize()` -- `compact` is a
    # method, so the call parentheses were missing (and the trailing
    # comment had wrapped onto its own line, a syntax error).
    # The sketch is compacted here, so it is no longer updateable after
    # serialization.
    return obj.compact().serialize()

  def loads(self, obj):
    # Deserialization always yields a compact (read-only) sketch.
    return compact_theta_sketch.deserialize(obj)


def Add(sketch, value):
  """seqFunc for aggregateByKey: fold one value into the accumulator sketch.

  The sketch is mutated in place via its ``update`` method; the same
  object is returned so Spark can thread the accumulator through
  subsequent calls.
  """
  accumulator = sketch
  accumulator.update(value)
  return accumulator

def Combine(sketch1, sketch2):
  """combFunc for aggregateByKey: merge two sketches into a theta_union.

  BUG FIX: the original body referenced ``sketch_1`` / ``sketch_2``, which
  do not exist -- the parameters are ``sketch1`` / ``sketch2``, so every
  call raised NameError.
  """
  sketch_union = theta_union()
  # NOTE(review): this assumes both inputs expose .compact(). Spark may
  # also pass the result of a previous Combine (a theta_union) back in,
  # which has no .compact() -- verify against the aggregateByKey contract;
  # returning a theta_sketch_serializable wrapper may be the real fix.
  compact_sketch_1 = sketch1.compact()
  compact_sketch_2 = sketch2.compact()
  sketch_union.update(compact_sketch_1)
  sketch_union.update(compact_sketch_2)
  return sketch_union

# Per-key distinct-count aggregation: Add folds each value into a copy of
# the zero-value sketch, Combine merges per-partition results, and take(1)
# pulls one (key, sketch) pair back to the driver.
# NOTE(review): Spark pickles zeroValue and copies it per key/partition --
# confirm theta_sketch_serializable survives that round trip, and that
# Combine's return type matches what Add/Combine accept on later calls.
pairs.aggregateByKey(zeroValue=theta_sketch_serializable(),
                     seqFunc=Add,
                     combFunc=Combine,
                     numPartitions=1).take(1)

Reply via email to