Hi,

I am running into a strange issue when doing a join of two RDDs followed by a zip
in PySpark.
It's part of a more complex application, but I was able to narrow it down to a
simplified example that's easy to reproduce and shows the same problem:


raw = sc.parallelize([('k' + str(x), 'v' + str(x)) for x in range(100)])
data = raw.join(raw).mapValues(lambda x: [x[0]] + [x[1]]).map(lambda pair: ','.join(pair[1]))
d1 = data.map(lambda s: s.split(',')[0])
d2 = data.map(lambda s: s.split(',')[1])
x = d1.zip(d2)

print(x.take(10))


The output is:


[('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81', 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'), ('v83', 'v83')]

As you can see, the ordering of items is no longer preserved in all cases
(e.g., 'v81' ends up paired with itself, but 'v45' does not).
Isn't the ordering supposed to be preserved?
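For what it's worth, here is a plain-Python sketch (no Spark needed) of my current understanding of what might be going on. The assumption, which I'm not certain of, is that zip pairs elements purely by position, and that d1 and d2 each trigger their own evaluation of the joined RDD, whose post-shuffle element order can differ between evaluations. The `evaluate` helper and the seeds are hypothetical stand-ins for that nondeterminism:

```python
import random

def evaluate(records, seed):
    # Hypothetical stand-in for one independent evaluation of `data`
    # after the join: same elements, but potentially a different order.
    out = list(records)
    random.Random(seed).shuffle(out)
    return out

records = ['v%d,v%d' % (i, i) for i in range(100)]

# d1 and d2 each recompute the join independently, so they may
# enumerate the same elements in different orders.
d1 = [s.split(',')[0] for s in evaluate(records, seed=1)]
d2 = [s.split(',')[1] for s in evaluate(records, seed=2)]

# zip pairs purely by position, so differing orders give mismatched pairs.
pairs = list(zip(d1, d2))

# All the original elements are still present on both sides;
# only the positional alignment is lost.
mismatched = [p for p in pairs if p[0] != p[1]]
```

If this model is right, then pairing by an explicit key (e.g., via zipWithIndex or a join on a key) rather than by position would sidestep the problem.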

If I do the same thing without the join:

data = sc.parallelize(['v' + str(x) + ',v' + str(x) for x in range(100)])
d1 = data.map(lambda s: s.split(',')[0])
d2 = data.map(lambda s: s.split(',')[1])
x = d1.zip(d2)

print(x.take(10))


The output is:


[('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'), ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]

As expected.

Has anyone run into this or a similar issue?

Ofer
