Distributed Computing 3 | RDD Transformation Operations and RDD Actions

Series: Distributed Computing

Distributed Computing 3 | RDD Transformation Operations and RDD Actions

  1. RDD Transformation Operations

Transformation means to construct a new RDD from an existing RDD. They do not take place until an action is called.

(1) Narrow Transformation

Narrow transformations transform data without any shuffle involved. These transformations transform the data on a per-partition basis; that is to say, each element of the output RDD can be computed without involving any elements from different partitions. It includes,

  • Return a new distributed dataset formed by passing each element of the source through a function func.
rdd.map(func)
  • flatMap(func) is similar to map, but each input item can be mapped to 0 or more output items (so func should return a sequence rather than a single item).
rdd.flatMap(func)
  • Return a new dataset formed by selecting those elements of the source on which func returns true.
rdd.filter(func)

(2) Wide Transformation

Wide transformations involve a shuffle of the data between the partitions. It includes,

  • Return a new dataset that contains the distinct elements of the source dataset.
rdd.distinct()
  • Return a new dataset that contains the union of the elements in the sources dataset and the argument.
rdd.union(otherDataset)
  • Return a new RDD that contains the intersection of elements in the source database and the argument
rdd.intersection(otherDataset)
  • Return each value in self that is not contained in otherDataset
rdd.substract(otherDataset)
  • Return the Cartesian product of the RDD and otherDataset
rdd.cartesian(otherDataset)
  • Sorts the present RDD by the given func.
rdd.sortBy(func, ascending=True)

2. RDD Actions

The actions are the triggers of the operations, and they are going to return non-RDD datatypes like int, string, list, or else. The actions operate on the data known in the memory and it will be wrapped out if we restart.

(1) Simple RDD Actions

  • Return all the elements of the dataset as an array at the driver program.
rdd.collect()
  • Return the first element of the dataset (similar to take(1)).
rdd.first()
  • Return an array with the first n elements of the dataset.
rdd.take(n)
  • Return the top n elements of the RDD by descending order
rdd.top(n)
  • Return the last n elements of the RDD by descending order
rdd.takeOrdered(n)
  • Return the number of elements in the dataset.
rdd.count()
  • Return the number of times occurs in the RDD in a dictionary.
rdd.countByValue()
  • Return the number of times each element occurs in the RDD in tuples.
rdd.countByValue().item()

(2) Math RDD Actions

  • Return the mean of the RDD’s elements.
rdd.mean()
  • Add up the elements in the RDD.
rdd.sum()
  • Return the maximum item in the RDD.
rdd.max()
  • Return the minimum item in the RDD.
rdd.min()
  • Return the variance of the RDD’s elements.
rdd.variance()
  • Return the standard deviation of the RDD’s elements.
rdd.stdev()

(3) Complicated RDD Actions

  • Combine the elements of the RDD together in parallel by func.
rdd.reduce(func)
  • Same as reduce(), but with the provided zeroValue (don’t necessarily have to be 0) to start within each partition. This is safer because we won’t have errors if there is no element in a partition.
rdd.fold(zeroValue, func)

For example, if we have a sum function of lambda x, y: x+y , then we are going to have an added value of zeroValue * (numberOfPartition + 1) . Having this numberOfPartition + 1 because the program starts with zeroValue in each partition, and after calculating the result of each partition, the program treats all the values in a partition and it will also start from zeroValue.

Now, let’s see some common examples of reduce and fold

  • Return the sum of all elements in all the partitions with non-empty RDDs
rdd.reduce(lambda x, y: x + y)
  • Safely return the sum of all elements in all the partitions in an RDD. When the RDD is empty, return 0.
rdd.fold(0, lambda x, y: x + y)
  • Return the product of all elements in all the partitions with non-empty RDDs
rdd.reduce(lambda x, y: x * y)
  • Safely return the product of all elements in all the partitions in an RDD. When the RDD is empty, return 1.
rdd.fold(1, lambda x, y: x * y)
  • Return the maximum value in a non-empty RDD by reduce
rdd.reduce(lambda x, y: x if x > y else y)
  • Return the minimum value in a non-empty RDD by reduce
rdd.reduce(lambda x, y: x if x < y else y)
  • Return the string combined with all the words in a non-empty RDD separated by " "
rdd.reduce(lambda x, y: x + " " + y)

(5) reduce and fold Problem: Counting

If we have only reduce and fold , we can only do little things. For example, we are not able to count the elements in the RDD. If we try,

rdd = sc.parallelize([5,2,7,4,3,6,1], 3)
rdd.fold(0, lambda x, y: x + 1)

The result will probably be,

3

This is because we will start from the first partition which has elements [5, 2] . Then the lambda function will count the values in this function to 2. It will also be the same for the other two partitions. As a result, the result in each partition will be,

[2, 2, 3]

However, when the lambda starts to work on this list, it will again start by 0 and it will give us the number of partitions instead of the number of the elements. So what we want here is to have a different last step.

(6) RDD aggregate Action

Finally, let’s see the most complicated RDD action called aggregate . This is very useful when we want to have a different step in the end when combining the value in all the partitions.

rdd.aggregate(zeroValue, SeqOp, CombOp)

where,

  • SeqOp: this is the operation inside each partition
  • CombOp: this is the operation when combining the returned value in all the partitions in the last step

(7) RDD aggregate Action Example 1: Counting

Now, let’s think about counting the array again. In the last step, we definitely don’t want to add 1 to the zeroValue iteratively. Instead, we would like to sum the result returned from all the partitions. Therefore,

  • SeqOp = lambda x, y: x + 1 (counting in each partition)
  • CombOp = lambda x, y: x + y (sum all the results)

So the action should be,

rdd.aggregate(0, (lambda x, y: x + 1), (lambda x, y: x + y))

(8) RDD aggregate Action Example 2: Safely Return String

We have given an example about how to join a list of words separated by the space character " ". However, this method is not safe, especially when we have an empty RDD. So the safer idea is that we can start from an empty string, and if there’s no word, we can return an empty string "".

  • SeqOp = lambda x, y: x + " " + y(joining in each partition)
  • CombOp = lambda x, y: x + y (join all the results)

So the action should be,

rdd.aggregate("", (lambda x, y: x + " " + y), (lambda x, y: x + y))