Distributed Computing 5 | Paired RDDs

1. Paired RDDs
(1) Definition
Pair RDDs are RDDs of key-value pairs, commonly used for aggregations, ETL, and many other operations in Spark.
- Key: can be any type, but must be hashable for shuffle-based operations such as groupByKey(), reduceByKey(), and joins
- Value: can be any type, but must also be hashable for operations that hash the whole pair, such as distinct()
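The hashability requirement can be seen without Spark at all. The sketch below is plain Python (not Spark code): Spark hash-partitions records by key during a shuffle, so the key must support hash(), which a list does not.

```python
# Pure-Python sketch (not Spark): why shuffle-based pair-RDD operations
# need hashable keys. Spark hash-partitions records by key, so the key
# must support hash(); lists and dicts do not.
pair_ok = (5, 2)            # int key: hashable, works everywhere
pair_list_key = ([5], 2)    # list key: fine for map()/take(), but
                            # fails once a shuffle tries to hash it

hash(pair_ok[0])            # succeeds: ints are hashable

try:
    hash(pair_list_key[0])  # raises TypeError: lists are unhashable
except TypeError as e:
    print("unhashable key:", e)
```

This is exactly why the self-test questions below show list and dict keys working with map()/take() but failing with distinct().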
(2) Paired RDD Transformations
- Return an RDD of just the keys.
rdd.keys()
- Return an RDD of just the values.
rdd.values()
- Return an RDD sorted by the key.
rdd.sortByKey()
- Group values with the same key.
rdd.groupByKey()
- Apply a function to each value of a pair RDD without changing the key.
rdd.mapValues(func)
- Pass each value in the key-value pair RDD through a flatMap function without changing the keys.
rdd.flatMapValues(func)
- Combine values with the same key.
rdd.reduceByKey(func)
- Remove pairs whose key is present in the other RDD.
rdd.subtractByKey(otherDataset)
- Perform an inner join between two RDDs.
rdd.join(otherDataset)
- Perform a join between two RDDs where the key must be present in the first RDD.
rdd.leftOuterJoin(otherDataset)
- Perform a join between two RDDs where the key must be present in the other RDD.
rdd.rightOuterJoin(otherDataset)
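To make the semantics of these transformations concrete without a Spark cluster, here is a pure-Python sketch (not Spark code) that mimics several of them on a plain list of (key, value) tuples:

```python
# Pure-Python sketch (not Spark) of the pair-RDD transformation
# semantics above, applied to a plain list of (key, value) tuples.
from collections import defaultdict

pairs = [(7, 2), (7, 4), (3, 6), (1, 4)]

keys = [k for k, _ in pairs]                 # like rdd.keys()
values = [v for _, v in pairs]               # like rdd.values()
by_key = sorted(pairs)                       # like rdd.sortByKey()

grouped = defaultdict(list)                  # like rdd.groupByKey()
for k, v in pairs:
    grouped[k].append(v)

doubled = [(k, v * 2) for k, v in pairs]     # like rdd.mapValues(lambda v: v * 2)

reduced = {}                                 # like rdd.reduceByKey(lambda x, y: x + y)
for k, v in pairs:
    reduced[k] = reduced[k] + v if k in reduced else v

print(dict(grouped))   # {7: [2, 4], 3: [6], 1: [4]}
print(reduced)         # {7: 6, 3: 6, 1: 4}
```

Note that in real Spark these transformations are lazy and distributed across partitions; the sketch only illustrates what each one computes.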
(3) Paired RDD Actions
- Only available on pair RDDs. Return a dictionary mapping each key to the number of pairs with that key.
rdd.countByKey()
- Return all values associated with the provided key.
rdd.lookup(key)
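The two actions can likewise be sketched in plain Python (not Spark code): countByKey() maps each key to the number of pairs carrying it, and lookup(k) collects the values for key k into a list.

```python
# Pure-Python sketch (not Spark) of the two pair-RDD actions above.
from collections import Counter

pairs = [(7, 2), (7, 4), (3, 6), (1, 4), (3, 7)]

count_by_key = Counter(k for k, _ in pairs)       # like rdd.countByKey()
lookup_3 = [v for k, v in pairs if k == 3]        # like rdd.lookup(3)

print(count_by_key[7])  # 2
print(lookup_3)         # [6, 7]
```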
2. Self Testing Questions
(1) Suppose we have the following RDD.
rdd = sc.parallelize([[5,2],[7,4],[3,6],[1,4],[9,7]],3)
- What is the result of the following code?
rdd_pair = rdd.map(lambda x: ([x[0]], x[1]))
rdd_pair.take(1)
Ans: [([5], 2)]
because the key can be a list here: map() and take() never hash the key.
- What is the result of the following code?
rdd_pair = rdd.map(lambda x: ({x[0]:1}, x[1]))
rdd_pair.take(1)
Ans: [({5: 1}, 2)]
because the key can be a dict here: map() and take() never hash the key.
- What is the result of the following code?
rdd_pair = rdd.map(lambda x: (x[0], {x[1]:1}))
rdd_pair.distinct().count()
Ans: ERROR
because distinct() requires both keys and values of a paired RDD to be hashable, and a dict is not.
- What is the result of the following code?
rdd_pair = rdd.map(lambda x: (x[0], str(x[1])))
rdd_pair.distinct().count()
Ans: 5
(2) Suppose we have the following paired RDD.
rdd = sc.parallelize([[7,2],[7,4],[3,6],[1,4],[3,7]],3)
rdd = rdd.map(lambda x: (x[0], x[1]))
- What is the result of the following code?
rdd.flatMap(lambda x: x).collect()
Ans: [7, 2, 7, 4, 3, 6, 1, 4, 3, 7]
because flatMap() flattens each pair into its key and value, so the result contains no pairs.
- What is the result of the following code?
rdd.flatMapValues(lambda x: x).collect()
Ans: ERROR
because flatMapValues() requires each value to be iterable, and an int is not.
- What is the result of the following code?
rdd.sortByKey().groupByKey().take(1)[0][1] == [6, 7]
Ans: False
because groupByKey() returns a ResultIterable object as each value, which never compares equal to a list. The following code should return True.
rdd.sortByKey().groupByKey().mapValues(list).take(1)[0][1] == [6, 7]
- What is the result of the following code?
rdd.groupByKey().mapValues(list).flatMapValues(lambda x: x).collect() == rdd.collect()
Ans: False
because although the two results contain the same elements, their order can differ after the shuffle. The following code should return True.
set(rdd.groupByKey().mapValues(list).flatMapValues(lambda x: x).collect()) == set(rdd.collect())
- What’s the result of the following code?
rdd.groupByKey().mapValues(list).reduceByKey(lambda x, y: x + y).take(1)
Ans: [(3, [6, 7])]
Here reduceByKey() does nothing useful, because every key is already unique after groupByKey(). To sum the values, apply reduceByKey() to the original RDD directly.
rdd.reduceByKey(lambda x, y: x + y).take(1)
- What’s the result of the following code?
rdd.lookup(3)
Ans: [6, 7]
because lookup() always returns the values for the given key as a list.
- What’s the result of the following code?
rdd.countByKey()[3]
Ans: 2
because countByKey() returns a dictionary mapping each key to the number of pairs with that key.
(3) Which of the following transformations don’t require data shuffle?
keys()
values()
sortByKey()
groupByKey()
mapValues(func)
flatMapValues(func)
reduceByKey(func)
Ans: keys(), values(), mapValues(func), flatMapValues(func)
(4) Which of the following code performs better?
Code 1: rdd.groupByKey().mapValues(sum).collect()
Code 2: rdd.reduceByKey(lambda x, y: x + y).collect()
Ans: Code 2
because reduceByKey() combines values locally on each partition before the shuffle, whereas groupByKey() shuffles every value across the network and only aggregates afterwards.
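The difference can be illustrated with a pure-Python sketch (not Spark code); the partition contents below are made up for illustration. reduceByKey's map-side combine shrinks each partition to one record per key before anything crosses the network:

```python
# Pure-Python sketch (not Spark) of why reduceByKey() beats
# groupByKey() + mapValues(sum): reduceByKey combines locally on each
# partition first, so fewer records are shuffled.
from collections import defaultdict

# Hypothetical partition contents for illustration.
partitions = [[(7, 2), (7, 4)], [(3, 6), (1, 4)], [(3, 7)]]

# groupByKey: every (key, value) pair is shuffled as-is.
shuffled_group = sum(len(p) for p in partitions)   # 5 records shuffled

# reduceByKey: combine per partition first, then shuffle only one
# record per (partition, key).
combined = []
for part in partitions:
    local = defaultdict(int)
    for k, v in part:
        local[k] += v                              # map-side combine
    combined.extend(local.items())
shuffled_reduce = len(combined)                    # 4 records shuffled

# Merge the partial sums after the "shuffle".
totals = defaultdict(int)
for k, v in combined:
    totals[k] += v

print(shuffled_group, shuffled_reduce)  # 5 4
print(dict(totals))                     # {7: 6, 3: 13, 1: 4}
```

The saving grows with the number of duplicate keys per partition; with mostly unique keys the two approaches shuffle similar amounts of data.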