
Advanced Data Partitioning in Spark

In this post we take a look at advanced data partitioning in Spark.  Partitioning is the process that distributes your data across the Spark worker nodes.  If you process large amounts of data, it is important to choose your partitioning strategy carefully, because communication is expensive in a distributed program.

Partitioning only applies to Spark key-value pair RDDs, and although you cannot force a record with a specific key onto a specific node, you can ensure that records with the same key are processed together on the same node.  This matters because you may need to join one dataset against another, and if the same keys reside on the same node for both datasets, Spark does not need to shuffle data across nodes.
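To make this concrete, here is a minimal sketch (the pairs RDD and its contents are purely illustrative) of assigning a HashPartitioner to a pair RDD and checking which partitioner Spark has recorded for it:

import org.apache.spark.HashPartitioner

// A small, hypothetical pair RDD keyed by user ID.
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))

// Hash the keys into 4 partitions so equal keys end up in the same partition,
// and persist so the layout is kept for later operations.
val partitioned = pairs.partitionBy(new HashPartitioner(4)).persist()

println(pairs.partitioner)         // None — Spark knows nothing about the layout
println(partitioned.partitioner)   // Some(HashPartitioner) — reusable by later joins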

An example is a program that analyzes click events for a set of users.  In your code, you might define `userData` and `events` RDDs and join them as follows:


// userData: user profiles keyed by UserID; events: click events from one log file
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…").persist()
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)   // joins the two datasets by UserID


However, this approach is inefficient: if you perform the join in a loop, say once for each new log file, Spark hashes the keys of both datasets and sends elements with matching keys across the network to perform the join every time, even though userData does not change between calls.
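Sketched out (reusing the UserID and LinkInfo placeholder types from the snippet above, with logFileNames standing in for a hypothetical list of log file paths), the inefficient pattern looks like this:

// Inefficient: userData is hashed and shuffled on every call, because Spark
// has no record of how it is partitioned.
def processNewLogs(logFileName: String): Unit = {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events)   // reshuffles userData each time
  // ... work with joined here ...
}

logFileNames.foreach(processNewLogs)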

To prevent this performance issue, partition the user dataset before you persist it and then use it as the first dataset in the join.  If you do so, you can leave the join statement above unchanged and ensure that only the event data is sent across the network to pair up with the user data already persisted across the worker nodes:


import org.apache.spark.HashPartitioner

// Partition userData into 100 partitions by hashing the UserID,
// then persist so the shuffled layout is reused by every later join.
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")
  .partitionBy(new HashPartitioner(100))
  .persist()

def processNewLogs(logFileName: String): Unit = {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events)   // only the events side is shuffled now
}


By partitioning the larger reference dataset before persisting it, and then using it as the first dataset in a join, you can reduce network traffic and improve the performance of your Spark code.  For more detailed information see “Learning Spark”, Chapter 4.
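One closing note, sketched with hypothetical data: once a pair RDD is partitioned, operations such as mapValues preserve its partitioner (the keys cannot change), whereas map discards it, so downstream joins would shuffle again:

import org.apache.spark.HashPartitioner

val scores = sc.parallelize(Seq(("alice", 1), ("bob", 2)))
  .partitionBy(new HashPartitioner(8))
  .persist()

// mapValues cannot alter keys, so the HashPartitioner is carried along
// and a later join on doubled still avoids reshuffling this side.
val doubled = scores.mapValues(_ * 2)
println(doubled.partitioner.isDefined)    // true

// map may change keys, so Spark drops the partitioner information.
val remapped = scores.map { case (k, v) => (k, v * 2) }
println(remapped.partitioner.isDefined)   // false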