Distributed Computing 6 | Improving PySpark Performance

Series: Distributed Computing

1. Load and Save Data

  • Load all the files located in a folder as a single RDD. Each line should be one element in the RDD.
sc.textFile(folder_name)
  • Load all the files located in a folder as a paired RDD. Each file becomes one (filename, content) pair in the RDD.
sc.wholeTextFiles(dir_name)
  • Save an RDD as text files in a directory, writing one output file per partition.
rdd.saveAsTextFile(dir_name)
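
Putting the three calls above together, here is a minimal load-and-save sketch; the data/input and data/output paths and the app name are hypothetical.

from pyspark import SparkContext

sc = SparkContext(appName="load-save-demo")

# Each line of every file under data/input/ becomes one RDD element.
lines = sc.textFile("data/input/")

# One (filename, file_content) pair per file.
files = sc.wholeTextFiles("data/input/")

# saveAsTextFile writes one part-XXXXX file per partition into data/output/.
lines.map(lambda line: line.upper()).saveAsTextFile("data/output/")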

2. Memory-Disk Persistence

  • Select a persistence level for an RDD
from pyspark.storagelevel import StorageLevel
rdd.persist(StorageLevel.<persistence_level>)
  • Store RDDs in memory. If an RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
rdd.persist(StorageLevel.MEMORY_ONLY)

or

rdd.cache()
  • Store RDDs on disk only
rdd.persist(StorageLevel.DISK_ONLY)
  • If an RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
rdd.persist(StorageLevel.MEMORY_AND_DISK)
  • Same as the levels above, but replicate each partition on two cluster nodes.
StorageLevel.MEMORY_ONLY_2
StorageLevel.MEMORY_AND_DISK_2
...
  • Manually remove the persistence level, evicting the RDD from memory and disk
rdd.unpersist()
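
A minimal persistence sketch, assuming a SparkContext named sc already exists; the data/input path is hypothetical.

from pyspark.storagelevel import StorageLevel

# A hypothetical RDD that is expensive to recompute.
expensive = sc.textFile("data/input/").map(lambda line: line.split(","))

# Keep it in memory; partitions that do not fit spill to disk.
expensive.persist(StorageLevel.MEMORY_AND_DISK)

expensive.count()       # the first action materializes and caches the RDD
expensive.take(5)       # later actions reuse the cached partitions

expensive.unpersist()   # release the storage when it is no longer needed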

3. Parallelism Level

  • Return the number of partitions in the RDD.
rdd.getNumPartitions()
  • Return an RDD created by coalescing all elements within each partition into a list.
rdd.glom().collect()
  • Specify the number of partitions to cut the RDD into when calling a transformation that accepts a numPartitions argument (e.g. reduceByKey, join).
rdd.<transformation>(..., numPartitions)
  • Define how the elements of a paired RDD are partitioned by key. The default hash partitioner maps each key to a partition ID from 0 to numPartitions - 1; we can also supply a range-based or custom partitioning function.
rdd.partitionBy(numPartitions, partitionFunc)
  • Shuffle data across the network to create a new set of partitions.
rdd.repartition(numPartitions)
  • An optimized version of repartition() that avoids a full shuffle when reducing the number of RDD partitions.
rdd.coalesce(numPartitions, shuffle=False)
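
A combined sketch of the partitioning calls in this section, assuming a SparkContext named sc; the sample data is made up for illustration.

pairs = sc.parallelize([("a", 1), ("b", 2), ("c", 3), ("a", 4)], 4)

print(pairs.getNumPartitions())   # 4
print(pairs.glom().collect())     # elements grouped per partition

# Hash-partition by key into 2 partitions; equal keys land in the same partition.
by_key = pairs.partitionBy(2)

# repartition shuffles to change the partition count up or down;
# coalesce merges partitions without a full shuffle and only reduces the count.
wider = by_key.repartition(8)
narrower = wider.coalesce(2)
print(narrower.getNumPartitions())   # 2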

4. RDD Dependencies (RDD Lineage)

  • RDD Dependency: every time a transformation is performed on an RDD, a dependency between the old and the new RDD is created.
  • RDD Resilience: because Spark records the lineage of each RDD, any RDD can be reconstructed to the state it was in at the time of a failure.
  • Narrow Dependency: created when no data shuffle between partitions is required (e.g. map, filter).
  • Wide Dependency: created when a transformation requires a shuffle across partitions (e.g. groupByKey, join).
  • Show a textual representation of the RDD's lineage (its dependencies).
rdd.toDebugString().decode('utf-8')
  • Set a checkpoint to truncate the RDD lineage, removing its parents' information; the checkpoint is only written once an action runs.
sc.setCheckpointDir("dir")
rdd.checkpoint()
rdd.<action>()
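
A minimal checkpointing sketch, assuming a SparkContext named sc; the checkpoints/ directory is hypothetical.

sc.setCheckpointDir("checkpoints/")

rdd = sc.parallelize(range(100)) \
        .map(lambda x: x * 2) \
        .filter(lambda x: x % 3 == 0)

# Lineage before checkpointing shows the map/filter parents.
print(rdd.toDebugString().decode("utf-8"))

rdd.checkpoint()   # mark the RDD to be saved under the checkpoint directory
rdd.count()        # the checkpoint is written when an action runs

# After the action, the lineage is truncated at the checkpoint.
print(rdd.toDebugString().decode("utf-8"))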

5. Shared Variables

  • Create an accumulator for communicating with Spark executors, starting from initial_value.
accum = sc.accumulator(initial_value)
  • Use the foreach action to add to the accumulator from the executors.
rdd.foreach(lambda x: accum.add(1))
  • Get the value of the accumulator
accum.value
  • Create a read-only broadcast variable set by the driver
broadcast_var = sc.broadcast(value)
  • The unpersist method removes the broadcast variable's copies from the executor nodes. The value still stays on the driver node, so it can be re-broadcast later.
broadcast_var.unpersist()
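
A short sketch combining an accumulator and a broadcast variable, assuming a SparkContext named sc; the words and the lookup table are made up for illustration.

# Accumulator: executors add to it, the driver reads the total.
line_count = sc.accumulator(0)

words = sc.parallelize(["spark", "pyspark", "rdd"])
words.foreach(lambda w: line_count.add(1))
print(line_count.value)   # 3, read back on the driver

# Broadcast: ship a read-only lookup table to every executor once.
lookup = sc.broadcast({"spark": 1, "pyspark": 2, "rdd": 3})
ids = words.map(lambda w: lookup.value.get(w, -1)).collect()
print(ids)   # [1, 2, 3]

lookup.unpersist()   # remove executor copies; the driver keeps the value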