Hi, I’m trying to implement in PySpark the "multiple sketches for countries" Theta sketch example from https://datasketches.apache.org/docs/Theta/ThetaSparkExample.html. I am stuck on the aggregateByKey step, after implementing a ThetaSketchSerializable class and the Add and Combine functions. Is there a PySpark version of this tutorial?
Thanks, Elle -------------------------- from pyspark.serializers import Serializer, FramedSerializer class theta_sketch_serializable(FramedSerializer): def __init__(self): self.sketch = update_theta_sketch(12) def get_sketch(self): return self.sketch def get_compact_sketch(self): if isinstance(self.sketch, theta_sketch): return self.sketch.compact() elif isinstance(self.sketch, compact_theta_sketch): return self.sketch def update(self, value): if isinstance(self.sketch, theta_sketch): self.sketch.update(value) else: raise Exception('trying to update compact sketch or null sketch') def get_estimate(self): return self.sketch.get_estimate() def dumps(self, obj): return obj.compact.serialize() # it will be made compact so no longer updateable def loads(self, obj): return compact_theta_sketch.deserialize(obj) def Add(sketch, value): sketch.update(value) return sketch def Combine(sketch1, sketch2): sketch_union = theta_union() compact_sketch_1 = sketch_1.compact() compact_sketch_2 = sketch_2.compact() sketch_union.update(compact_sketch_1) sketch_union.update(compact_sketch_2) return sketch_union pairs.aggregateByKey(zeroValue=theta_sketch_serializable(), seqFunc=Add, combFunc=Combine, numPartitions=1).take(1)