# hadoop - How to correlate all combinations of arrays in an RDD?

I have an RDD from `model.productFeatures()` which returns an RDD in the form of `(id, array("d", (...)))`. For example:

``(1, array("d", (0, 1, 2))), (2, array("d", (4, 3, 2))), (3, array("d", (5, 3, 0))), ...``

I would like to calculate the pairwise correlation between every pair of arrays, then return for each `id` the other `id` whose array has the highest correlation with it.

The first thing you need is to get all pairs of elements, except the "diagonal" where they're the same.

``````
>>> # Index into the pair: tuple-unpacking lambdas (lambda (x, y): ...) are Python 2 only.
>>> rdd.cartesian(rdd).filter(lambda p: p[0] != p[1]).collect()
[((1, array('d', [0.0, 1.0, 2.0])), (2, array('d', [4.0, 3.0, 2.0]))),
((1, array('d', [0.0, 1.0, 2.0])), (3, array('d', [5.0, 3.0, 0.0]))),
((2, array('d', [4.0, 3.0, 2.0])), (1, array('d', [0.0, 1.0, 2.0]))),
((3, array('d', [5.0, 3.0, 0.0])), (1, array('d', [0.0, 1.0, 2.0]))),
((2, array('d', [4.0, 3.0, 2.0])), (3, array('d', [5.0, 3.0, 0.0]))),
((3, array('d', [5.0, 3.0, 0.0])), (2, array('d', [4.0, 3.0, 2.0])))]
``````
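If you want to check the pairing logic without a Spark context, here is a minimal local sketch using `itertools.product` as a stand-in for `cartesian`. The names `records` and `ordered_pairs` are mine, not part of any API, and the filter compares ids only, which assumes every id is unique.

```python
from array import array
from itertools import product

# Sample data copied from the question.
records = [
    (1, array("d", [0, 1, 2])),
    (2, array("d", [4, 3, 2])),
    (3, array("d", [5, 3, 0])),
]

def ordered_pairs(items):
    """Local stand-in for cartesian + filter: all (x, y) whose ids differ."""
    return [(x, y) for x, y in product(items, repeat=2) if x[0] != y[0]]

pairs = ordered_pairs(records)
id_pairs = [(x[0], y[0]) for x, y in pairs]
print(len(pairs))  # n * (n - 1) ordered pairs for n records
print(id_pairs)
```

Each unordered pair shows up twice, once in each direction. That is needed here, because the last step looks up the best partner separately for every first id.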

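Next, write a function that calculates the correlation and rearranges the result to prepare for the last step. Let's assume that by "correlation" you mean what `numpy.correlate` does. Note that for two equal-length arrays its default mode returns a single value, the sum of the elementwise products (a dot product), not a normalized correlation coefficient.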

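``````
import numpy as np

def corr_pair(pair):
    # Unpack in the body; tuple parameters in the signature are Python 2 only.
    (id1, a1), (id2, a2) = pair
    return id1, (id2, np.correlate(a1, a2)[0])

>>> rdd.cartesian(rdd).filter(lambda p: p[0] != p[1]).map(corr_pair).collect()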
[(1, (2, 7.0)), (1, (3, 3.0)), (2, (1, 7.0)), (3, (1, 3.0)), (2, (3, 29.0)), (3, (2, 29.0))]
``````
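If you actually meant the Pearson correlation coefficient rather than what `numpy.correlate` computes, something like the sketch below could replace the scoring function. This is an assumption about what the question wants, not what the code above does. `pearson` and `pearson_pair` are hypothetical helper names, and the arrays are assumed to have equal length.

```python
from array import array
from math import sqrt

def pearson(a, b):
    """Pearson correlation coefficient of two equal-length sequences."""
    if len(a) != len(b):
        raise ValueError("sequences must have the same length")
    n = len(a)
    mean_a = sum(a) / n
    mean_b = sum(b) / n
    cov = sum((x - mean_a) * (y - mean_b) for x, y in zip(a, b))
    var_a = sum((x - mean_a) ** 2 for x in a)
    var_b = sum((y - mean_b) ** 2 for y in b)
    if var_a == 0 or var_b == 0:
        return float("nan")  # undefined when one sequence is constant
    return cov / sqrt(var_a * var_b)

def pearson_pair(pair):
    """Same output shape as corr_pair: (id1, (id2, score))."""
    (id1, a1), (id2, a2) = pair
    return id1, (id2, pearson(a1, a2))

score = pearson(array("d", [0, 1, 2]), array("d", [4, 3, 2]))
print(score)
```

With the sample data, arrays 1 and 2 move in exactly opposite directions, so this score is negative even though their dot product is positive. Which measure you use changes which id comes out as the "best" match.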

To find, for each first ID, the second ID with the maximum correlation, you can use `reduceByKey` and always keep the pair with the bigger value:

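``````
def keep_higher(left, right):
    # Each argument is an (id, correlation) pair; keep the one with the larger correlation.
    (id1, c1), (id2, c2) = left, right
    if c1 > c2:
        return id1, c1
    else:
        return id2, c2

>>> rdd.cartesian(rdd).filter(lambda p: p[0] != p[1]).map(corr_pair).reduceByKey(keep_higher).collect()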
[(1, (2, 7.0)), (2, (3, 29.0)), (3, (2, 29.0))]
``````
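To see what the `reduceByKey` step does to the scored records, here is a rough local emulation with a plain dict. `reduce_by_key` is a hypothetical helper, not a Spark function, and the input list is copied from the `corr_pair` output shown earlier.

```python
def keep_higher(left, right):
    """Return whichever (id, score) pair has the larger score; ties keep `right`."""
    return left if left[1] > right[1] else right

def reduce_by_key(records, func):
    """Local stand-in for RDD.reduceByKey: fold the values of each key with func."""
    merged = {}
    for key, value in records:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Scored records in the (id1, (id2, score)) shape produced by corr_pair.
scored = [(1, (2, 7.0)), (1, (3, 3.0)), (2, (1, 7.0)),
          (3, (1, 3.0)), (2, (3, 29.0)), (3, (2, 29.0))]

best = reduce_by_key(scored, keep_higher)
print(sorted(best.items()))
```

Spark expects the function passed to `reduceByKey` to be associative and commutative. Keeping the maximum satisfies that, except that when two candidates tie exactly, the id you get back may depend on the order in which partitions are merged.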