Tuesday, January 6, 2015

Counters and Spark - Think Accumulators

A common pattern I see when reading Spark application code is a chain of transformations on an RDD followed by an action, sometimes preceded or followed by a count operation to find out the number of records. That count is usually costly when you have billions of records, and if the pattern repeats throughout your code it starts slowing the system down. I agree to a certain extent that the RDD may still be sitting in JVM memory and the operation might not be as costly as we think -- so make sure you profile your code.

For example:

val baseRDD = sc.textFile("mysamplefile.txt")
val firstElementRDD = baseRDD.map(elem => elem.split(",")(0))
firstElementRDD.saveAsObjectFile("path_to_object_file")
val count = firstElementRDD.count()
/* Something gets done with this count -- how can we eliminate it? */

We can declare a simple accumulator to do the same job:

val acc = sc.accumulator(0)
val baseRDD = sc.textFile("mysamplefile.txt")
val firstElementRDD = baseRDD.map { elem =>
  acc.add(1)                  // count each record as it is processed
  elem.split(",")(0)
}
firstElementRDD.saveAsObjectFile("path_to_object_file")
val count = acc.value         // read the value only after the action has run

This is a very common programming pattern for accumulators but is often overlooked.
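One caveat worth keeping in mind. Because map is lazy, the accumulator stays at zero until an action actually runs the job, and updates made inside transformations can be applied more than once if a task is retried, so treat the count as reliable only when the job completes cleanly. Here is a small sketch of that behaviour (same placeholder file paths as above):

val acc = sc.accumulator(0)
val firstElementRDD = sc.textFile("mysamplefile.txt").map { elem =>
  acc.add(1)                  // nothing is counted until an action triggers the job
  elem.split(",")(0)
}
println(acc.value)            // still 0 here -- no action has run yet
firstElementRDD.saveAsObjectFile("path_to_object_file")
println(acc.value)            // now reflects the number of records processed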




Friday, November 7, 2014

Spark 101 : How to test your new installation

You have installed Apache Spark (or some of the other distributions like Cloudera, MapR or Hortonworks) and are quite excited to start using it.

But wait -- How do you know you installed it correctly? There are 2 simple tests I want you to run (They take less than a minute, I promise!!)

Open the Spark shell (./spark-shell) and run:

sc.parallelize(1 to 1000).count()

What to check:
a. It runs and comes back with res0: Long = 1000 as the result.
b. The computation was distributed across the nodes of the cluster. This is evident from the TaskSetManager log lines you see while it runs (see the quick check after this list).
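If the shell happens to be running in local mode, that second check will never pass because everything runs in a single JVM. A quick way to make sure the shell is really talking to the cluster is to launch it against the master explicitly (the master URL below is just a placeholder for your own) and re-run the count with a few partitions:

./spark-shell --master spark://your-master-host:7077

sc.parallelize(1 to 1000, 8).count()   // 8 partitions, so tasks land on several executors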

The second quick test is the ubiquitous Pi computation:

sc.parallelize(1 to 1000).map { _ =>
  val x = Math.random()
  val y = Math.random()
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _) * 4 / 1000

It should come back with res1: Int = 3 as the result (integer division truncates the estimate). Check the logs to make sure the computation did not throw any exceptions and was distributed across multiple nodes of the cluster.
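If you want something that actually looks like Pi rather than the truncated integer 3, bump up the sample size and do the final division in doubles. A small variation (the 100000 below is an arbitrary choice) would be:

val n = 100000
val pi = sc.parallelize(1 to n).map { _ =>
  val x = Math.random()
  val y = Math.random()
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _) * 4.0 / n
// pi comes out around 3.14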