Hi

 I have been trying to this simple operation.  I want to land all values
with one key in same partition, and not have any different key in the same
partition.  Is this possible?   I am getting b and c always getting mixed
up in the same partition.


rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
('b', 3),('c', 8)])
from pyspark.rdd import portable_hash

n = 4

def partitioner(n):
    """Partition by the first item in the key tuple"""
    def partitioner_(x):
        val = x[0]
        key = portable_hash(x[0])
        print ("Val %s Assigned Key %s" % (val, key))
        return key
    return partitioner_

def validate(part):
    last_key = None
    for p in part:
        k = p[0]
        if not last_key:
            last_key = k
        if k != last_key:
            print("Mixed keys in partition %s %s" % (k,last_key) )

partioned = (rdd
  .keyBy(lambda kv: (kv[0], kv[1]))
  .repartitionAndSortWithinPartitions(
      numPartitions=n, partitionFunc=partitioner(n),
ascending=False)).map(lambda x: x[1])

print(partioned.getNumPartitions())
partioned.foreachPartition(validate)


Val a Assigned Key -7583489610679606711
Val a Assigned Key -7583489610679606711
Val d Assigned Key 2755936516345535118
Val b Assigned Key -1175849324817995036
Val c Assigned Key 1421958803217889556
Val d Assigned Key 2755936516345535118
Val b Assigned Key -1175849324817995036
Mixed keys in partition b c
Mixed keys in partition b c


Regards
Sumit Chawla

Reply via email to