R+SNOW

Primer

Forms of parallel computing:
* Bit-level parallelism
* Instruction-level parallelism
* Data parallelism
* Task parallelism

Issues in programming parallel applications:
* Race conditions
* Communication between subtasks
* Synchronization between subtasks

http://en.wikipedia.org/wiki/Parallel_computing - Wikipedia link to parallel computing

Intro

R with SNOW can be considered a form of task parallelism. Using R with the Rmpi library directly can complicate your scripts if you are not familiar with MPI. SNOW is an Rmpi wrapper that simplifies the interface and gets you to a working parallel R environment with a gentler learning curve. R code often lends itself to parallelization, which makes it a great primer for parallel programming in general.

This is a simple guide to making a parallel application, and is by no means in-depth or complete. Many great references are out there and are worth looking at.

Theoretical Applications
The first transformation SNOW addresses is turning an iterative computation into a parallel one. Consider the following:

factorial.R

 #computes 9! iteratively
 i <- 1
 iprod <- 1
 while(i < 10) {
        iprod <- iprod * i
        i <- i + 1
 }

R has built-in functions for things like this, but we’ll ignore them for the moment. In this case, we have an iterative loop computing a factorial. Using SNOW’s parLapply, we can rewrite it as a parallel application:

factorial-par.R

library(snow)  
 c1 <- makeCluster(4, type="MPI")

 #map  
 iprod <- 1  
 imap <- parLapply(c1, 1:9, function(i) {  
        return(i)  
        }  
 )  
 #reduce  
 i <- 1  
 while(i <= 9) {  
        iprod <- iprod * imap[[i]]  
        i <- i + 1  
 }

 stopCluster(c1)

Discussion

There are a few things to consider with the above example:
* Data dependency / mutual exclusion
* Methods of data handling
* Transfer considerations

Data Dependency / MutEx

Most iterative-to-parallel transformations involve data dependency; that is, the state of one iteration depends on the state of the previous one. In this example, the value of iprod is its previous value times the index i. Also, if we used some of the lower-level MPI calls directly, we would have a mutual-exclusion issue with the variable i.

parLapply helps us out in a few ways. First, each iteration is expressed as a call to a function. The iteration value is the function’s first argument, i, and SNOW takes care of mutual exclusion and iterates it automatically: the first value is handed to the first node, the second to the second node, and so on. If there are more values than nodes, parLapply divides the input into roughly equal consecutive chunks, one per node (the sketch below shows how the input gets split).
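If you are curious how the input actually gets divided, snow’s clusterSplit shows the chunks each worker would receive. A quick check, assuming the cluster c1 from factorial-par.R is already running:

 #show how snow would split the sequence 1:9 across the workers in c1
 clusterSplit(c1, 1:9)
 #with 4 workers this returns four roughly equal, consecutive chunks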

Because the iterative body is a function, its results are collected in parLapply’s return value, which leads into the next point.

Methods of data handling
Next, parLapply communicates the per-iteration (per node) results through the function’s return value. Each time the function returns, its return value becomes one element of the list that parLapply returns, so the result is a single list holding all of the iterative results. If you were to print imap from our example, you would see:

[[1]]
[1] 1

[[2]]
[1] 2

...

[[9]]
[1] 9

So we could (perhaps slightly inaccurately) describe parLapply as [http://en.wikipedia.org/wiki/MapReduce#Map_function mapping] the work out to the nodes, and we then collect and reduce the results from the list it returns. In this case the results are simple integers, so the reduce step just multiplies them together and we get our factorial.
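Since imap is just a list of numbers, the reduce step can also be written with base R helpers instead of a while loop. A minimal sketch, assuming the imap list from factorial-par.R:

 #flatten the list returned by parLapply and multiply the entries together
 iprod <- prod(unlist(imap))
 #or, closer to the map/reduce wording
 iprod <- Reduce("*", imap)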

In this case the parallel version adds nothing but overhead; there is no speedup, although it does highlight how easy the parallelization is. Making an application parallel can sometimes be detrimental, so there are things to consider when transferring a sequential application to a parallel one, which leads to the next point.

Transfer Considerations

In our theoretical example we added nothing but overhead, because for simplicity we only used the iterative functionality of parLapply. A truly beneficial application would return a result that took some time to compute. In using SNOW, we incur the time it takes to spawn the MPI processes on the nodes, the time to map data out to the nodes, and the time it takes to reduce the information coming back from them. These are fairly substantial costs in themselves.

More formally, see [http://en.wikipedia.org/wiki/Parallel_computing#Amdahl.27s_law_and_Gustafson.27s_law Amdahl’s Law]. When considering parallelizing a process, consider the time it takes to send and then receive data through the network per element (say (send = .03 ms) + (receive = .03 ms) = .06 ms), plus the time it takes to reduce the result per element (say .03 ms), for a total overhead of .09 ms per element. Say processing takes .5 ms per element, and we have 10 nodes for 10 elements. The parallel processing time is still only .5 ms, since all 10 elements are processed at the same time, but we pay the overhead for every element: (processing time = .5 ms) + (overhead = .09 ms * 10 elements = .9 ms) = 1.4 ms total. The sequential version is simply .5 ms * 10 elements = 5 ms. Thanks to the per-element processing time, parallelizing is beneficial here. One could say to parallelize when:

Pt < St

Pt = (o * n) + (p * n/N)
St = p * n

where
Pt = parallel process time
St = sequential process time
o = overhead time per element (send and receive time over the network, reduce time, etc.)
n = number of elements in the data set (n iterations)
p = process time per element
N = number of nodes available to process in parallel

Among other things, this inequality highlights the loss in speedup when the number of elements exceeds the number of nodes.
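As a quick sanity check, the inequality can be evaluated directly in R. A small sketch using the made-up numbers from the example above:

 #estimate parallel vs. sequential time (all times in ms)
 o <- 0.09   #overhead per element (send + receive + reduce)
 p <- 0.5    #processing time per element
 n <- 10     #number of elements
 N <- 10     #number of nodes
 Pt <- (o * n) + (p * n / N)   #parallel estimate: 0.9 + 0.5 = 1.4 ms
 St <- p * n                   #sequential estimate: 5 ms
 Pt < St                       #TRUE, so parallelizing should pay off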

Practical Applications

Now that the example has you convinced that speedup is hard to achieve, consider a real-world example. This is an excerpt from a Brownian bridge algorithm that calculates spatial use probabilities:

      #Estimate Brownian bridge  
        #Note: 5 minute time step (dt interval in Eq.5 Horne et al. 2007) is used.  
        Time.Diff = c(diff(Time), NA)  
        T.Total = Time[n.locs] - Time[1]  
        probability = NULL  
        int = 0

        for(i in 1:(n.locs-1)){  
                if(Time.Diff[i] <= max.lag){  
                        theta = NULL  
                        tm = 0  
                        while(tm <= Time.Diff[i]){  
                                alpha = tm/Time.Diff[i]  
                                mu.x = X[i] + alpha*(X[i+1] - X[i])  
                                mu.y = Y[i] + alpha*(Y[i+1] - Y[i])  
                                sigma.2 = Time.Diff[i]*alpha*(1-alpha)*BMvar[i] +  
                                        ((1-alpha)^2)*(LocationError[i]^2) +  
                                        (alpha^2)*(LocationError[i+1]^2)  
                                ZTZ = (grid$x - mu.x)^2 + (grid$y - mu.y)^2  
                                theta = (1/sqrt(2*pi*sigma.2))*exp(-ZTZ/(2*sigma.2))  
                                int = int + theta  
                                tm = tm + 5  
                        }  
                }   
        }

theta, sigma.2, tm, and the ZTZ vector are recomputed on every pass, and the remaining inputs (X, Y, Time, BMvar, LocationError, grid, max.lag) are set elsewhere; the only value that accumulates from one iteration to the next is the int vector.

The motivation is the seemingly exponential increase in processing time with the number of locations (variable ‘loc’). At a sample size of 5K, processing time is around .05 ms per location on a (roughly) 1 Gflop/s processor. At a sample size of 140K, it is around 4.5 seconds!

A parallel app certainly fits the bill:

      #Estimate Brownian bridge  
        #Note: 5 minute time step (dt interval in Eq.5 Horne et al. 2007) is used.  
        Time.Diff = c(diff(Time), NA)  
        T.Total = Time[n.locs] - Time[1]  
        probability = NULL  
        int = 0  
        #map  
        intmap <- parLapply(c1, 1:(n.locs-1), function(i) {  
                if(Time.Diff[i] <= max.lag){  
                        theta = NULL  
                        tm = 0  
                        while(tm <= Time.Diff[i]){  
                                alpha = tm/Time.Diff[i]  
                                mu.x = X[i] + alpha*(X[i+1] - X[i])  
                                mu.y = Y[i] + alpha*(Y[i+1] - Y[i])  
                                sigma.2 = Time.Diff[i]*alpha*(1-alpha)*BMvar[i] +  
                                        ((1-alpha)^2)*(LocationError[i]^2) +  
                                        (alpha^2)*(LocationError[i+1]^2)  
                                ZTZ = (grid$x - mu.x)^2 + (grid$y - mu.y)^2  
                                theta = (1/sqrt(2*pi*sigma.2))*exp(-ZTZ/(2*sigma.2))  
                                int = int + theta  
                                tm = tm + 5
                        }
                        return(int)
                }
        }
        )

        #reduce  
        print("Start Reduce")  
        int = 0  
        i = 1  
        while(i <= length(intmap)) {  
              if(length(intmap[[i]]) != 0) {  
                int <- int + intmap[[i]]  
              }  
              i <- i + 1  
        }

A sequential processing time of 25 minutes is brought down to roughly 7 minutes using 3 nodes. In this example you can actually see the payoff of the time spent per iteration: since the processing time per element becomes so large, it is worthwhile to have a cluster of nodes chip away at all these iterations.
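One detail the excerpt glosses over: the worker processes do not automatically see objects such as Time.Diff, max.lag, X, Y, BMvar, LocationError, grid, or the initial int, which live in the master R session; presumably the full script handles that. A minimal sketch of one way to do it with snow’s clusterExport, assuming those objects are defined in the master session before the parLapply call:

 #make the inputs used inside the parLapply function available on every worker
 clusterExport(c1, c("Time.Diff", "max.lag", "X", "Y", "BMvar",
                     "LocationError", "grid", "int"))

An alternative is to initialize int to 0 as the first line of the function passed to parLapply, so the workers do not need it from the master at all.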

PBS/SGE Submission

This works for submitting to either PBS or SGE. IBEST now only uses PBS, so you can ignore the SGE directives (the lines beginning with ‘#$’) if you like.

To take advantage of firefly’s MPI interface for the cluster, we have to submit our '''factorial-par.R''' script to the batch system. The submission file looks like this:

#!/bin/sh

#$ -cwd  
#$ -pe orte 10  
#$ -l m_core=2  
#PBS -l nodes=5:ppn=2  
#$ -N R-test  
#PBS -N R-test  
#$ -o R-test.o  
#PBS -o R-test.o  
#$ -e R-test.e  
#PBS -e R-test.e  
#$ -V  
#PBS -V

. /usr/modules/init/bash  
module load openmpi

cd $PBS_O_WORKDIR

R < factorial-par.R --no-save

Note: Do not do something silly like run R with mpirun. This will cause your Rmpi library load to hang and strangeness to ensue.

The batch server queues and runs the job, and SNOW interfaces through the batch environment into MPI. The program is on its way to parallel computing.
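One pattern worth considering (an assumption on my part, not something the script above does) is to size the cluster from the MPI environment instead of hard-coding the worker count, so the R script always matches whatever the batch system granted:

 library(Rmpi)
 library(snow)
 #use every MPI slot the scheduler granted, keeping one for the master process
 c1 <- makeCluster(mpi.universe.size() - 1, type="MPI")

With the submission file above requesting 10 slots, this would typically spawn 9 workers without any edits to the R code.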

More

SNOW has a few good pages that are worth looking over and bookmarking:
* http://www.sfu.ca/~sblay/R/snow.html - snow Simplified, a good reference
* http://www.bepress.com/uwbiostat/paper193/ - Simple Parallel Statistical Computing in R, a great paper from the COBRA archive
* http://www.cs.uiowa.edu/~luke/talks/uiowa03.pdf - Simple Parallel Statistical Computing in R, accompanying slides and examples