Why does foreach %dopar% get slower with each additional node?

JustGettinStarted Source

I wrote a simple matrix multiplication to test out multithreading/parallelization capabilities of my network and I noticed that the computation was much slower than expected.

The Test is simple : multiply 2 matrices (4096x4096) and return the computation time. Neither the matrices nor results are stored. The computation time is not trivial (50-90secs depending on your processor).

The Conditions : I repeated this computation 10 times using 1 processor, split these 10 computations to 2 processors (5 each), then 3 processors, ... up to 10 processors (1 computation to each processor). I expected the total computation time to decrease in stages, and i expected 10 processors to complete the computations 10 times as fast as it takes one processor to do the same.

The Results : Instead what i got was only a 2 fold reduction in computation time which is 5 times SLOWER than expected.

enter image description here

When i computed the average computation time per node, i expected each processor to compute the test in the same amount of time (on average) regardless of the number of processors assigned. I was surprised to see that merely sending the same operation to multiple processor was slowing down the average computation time of each processor.

enter image description here

Can anyone explain why this is happening?

Note this is question is NOT a duplicate of these questions:

foreach %dopar% slower than for loop

or

Why is the parallel package slower than just using apply?

Because the test computation is not trivial (ie 50-90secs not 1-2secs), and because there is no communication between processors that i can see (i.e. no results are returned or stored other than the computation time).

I have attached the scripts and functions bellow for replication.

library(foreach); library(doParallel);library(data.table)
# functions adapted from
# http://www.bios.unc.edu/research/genomic_software/Matrix_eQTL/BLAS_Testing.html

Matrix.Multiplier <- function(Dimensions=2^12){
  # Creates a matrix of dim=Dimensions and runs multiplication
  #Dimensions=2^12
  m1 <- Dimensions; m2 <- Dimensions; n <- Dimensions;
  z1 <- runif(m1*n); dim(z1) = c(m1,n)
  z2 <- runif(m2*n); dim(z2) = c(m2,n)
  a <- proc.time()[3]
  z3 <- z1 %*% t(z2)
  b <- proc.time()[3]
  c <- b-a
  names(c) <- NULL
  rm(z1,z2,z3,m1,m2,n,a,b);gc()
  return(c)
}

Nodes <- 10
Results <- NULL
for(i in 1:Nodes){
  cl <- makeCluster(i)
  registerDoParallel(cl)
  ptm <- proc.time()[3]
  i.Node.times <- foreach(z=1:Nodes,.combine="c",.multicombine=TRUE, 
                          .inorder=FALSE) %dopar% {
                            t <- Matrix.Multiplier(Dimensions=2^12)
                          }
  etm <- proc.time()[3]
  i.TotalTime <- etm-ptm
  i.Times <- cbind(Operations=Nodes,Node.No=i,Avr.Node.Time=mean(i.Node.times),
                   sd.Node.Time=sd(i.Node.times),
                   Total.Time=i.TotalTime)
  Results <- rbind(Results,i.Times)
  rm(ptm,etm,i.Node.times,i.TotalTime,i.Times)
  stopCluster(cl)
}
library(data.table)
Results <- data.table(Results)
Results[,lower:=Avr.Node.Time-1.96*sd.Node.Time]
Results[,upper:=Avr.Node.Time+1.96*sd.Node.Time]
Exp.Total <- c(Results[Node.No==1][,Avr.Node.Time]*10,
               Results[Node.No==1][,Avr.Node.Time]*5,
               Results[Node.No==1][,Avr.Node.Time]*4,
               Results[Node.No==1][,Avr.Node.Time]*3,
               Results[Node.No==1][,Avr.Node.Time]*2,
               Results[Node.No==1][,Avr.Node.Time]*2,
               Results[Node.No==1][,Avr.Node.Time]*2,
               Results[Node.No==1][,Avr.Node.Time]*2,
               Results[Node.No==1][,Avr.Node.Time]*2,
               Results[Node.No==1][,Avr.Node.Time]*1)
Results[,Exp.Total.Time:=Exp.Total]

jpeg("Multithread_Test_TotalTime_Results.jpeg")
par(oma=c(0,0,0,0)) # set outer margin to zero
par(mar=c(3.5,3.5,2.5,1.5)) # number of lines per margin (bottom,left,top,right)
plot(x=Results[,Node.No],y=Results[,Total.Time],  type="o", xlab="", ylab="",ylim=c(80,900),
     col="blue",xaxt="n", yaxt="n", bty="l")
title(main="Time to Complete 10 Multiplications", line=0,cex.lab=3)
title(xlab="Nodes",line=2,cex.lab=1.2,
      ylab="Total Computation Time (secs)")
axis(2, at=seq(80, 900, by=100), tick=TRUE, labels=FALSE)
axis(2, at=seq(80, 900, by=100), tick=FALSE, labels=TRUE, line=-0.5)
axis(1, at=Results[,Node.No], tick=TRUE, labels=FALSE)
axis(1, at=Results[,Node.No], tick=FALSE, labels=TRUE, line=-0.5)
lines(x=Results[,Node.No],y=Results[,Exp.Total.Time], type="o",col="red")
legend('topright','groups',
       legend=c("Measured", "Expected"), bty="n",lty=c(1,1),
       col=c("blue","red"))
dev.off()

jpeg("Multithread_Test_PerNode_Results.jpeg")
par(oma=c(0,0,0,0)) # set outer margin to zero
par(mar=c(3.5,3.5,2.5,1.5)) # number of lines per margin (bottom,left,top,right)
plot(x=Results[,Node.No],y=Results[,Avr.Node.Time],  type="o", xlab="", ylab="",
     ylim=c(50,500),col="blue",xaxt="n", yaxt="n", bty="l")
title(main="Per Node Multiplication Time", line=0,cex.lab=3)
title(xlab="Nodes",line=2,cex.lab=1.2,
      ylab="Computation Time (secs) per Node")
axis(2, at=seq(50,500, by=50), tick=TRUE, labels=FALSE)
axis(2, at=seq(50,500, by=50), tick=FALSE, labels=TRUE, line=-0.5)
axis(1, at=Results[,Node.No], tick=TRUE, labels=FALSE)
axis(1, at=Results[,Node.No], tick=FALSE, labels=TRUE, line=-0.5)
abline(h=Results[Node.No==1][,Avr.Node.Time], col="red")
epsilon = 0.2
segments(Results[,Node.No],Results[,lower],Results[,Node.No],Results[,upper])
segments(Results[,Node.No]-epsilon,Results[,upper],
         Results[,Node.No]+epsilon,Results[,upper])
segments(Results[,Node.No]-epsilon, Results[,lower],
         Results[,Node.No]+epsilon,Results[,lower])
legend('topleft','groups',
       legend=c("Measured", "Expected"), bty="n",lty=c(1,1),
       col=c("blue","red"))
dev.off()

EDIT : Response @Hong Ooi's comment

I used lscpu in UNIX to get;

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                30
On-line CPU(s) list:   0-29
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             30
NUMA node(s):          4
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 63
Model name:            Intel(R) Xeon(R) CPU E5-2630 v3 @ 2.40GHz
Stepping:              2
CPU MHz:               2394.455
BogoMIPS:              4788.91
Hypervisor vendor:     VMware
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              20480K
NUMA node0 CPU(s):     0-7
NUMA node1 CPU(s):     8-15
NUMA node2 CPU(s):     16-23
NUMA node3 CPU(s):     24-29

EDIT : Response to @Steve Weston's comment.

I am using a virtual machine network (but I'm not the admin) with access to up to 30 clusters. I ran the test you suggested. Opened up 5 R sessions and ran the matrix multiplication on 1,2...5 simultaneously (or as quickly as i could tab over and execute). Got very similar results to before (re: each additional process slows down all individual sessions). Note i checked memory usage using top and htop and the usage never exceeded 5% of the network capacity (~2.5/64Gb).

enter image description here

CONCLUSIONS:

The problem seems to be R specific. When i run other multi-threaded commands with other software (e.g. PLINK) i don't run into this problem and parallel process run as expected. I have also tried running the above with Rmpi and doMPI with same (slower) results. The problem appears to be related R sessions/parallelized commands on virtual machine network. What i really need help on is how to pinpoint the problem. Similar problem seems to be pointed out here

rmultithreadingforeachparallel-processingdoparallel

Answers

answered 1 year ago Mandar #1

I guess it has been answered here already? foreach %dopar% slower than for loop

I mean its a bit of extrapolation of the same response. So basically concept remains constant.

Here is another example where the process happens in parallel (Sequentially or out of sequence). The answer has no expectation of to be combined. Its simple reusing the same outcome by discarding the same variable. (Exactly like simple for loop). But still the loop slows in doParallel Why is foreach() %do% sometimes slower than for?

answered 1 year ago Steve Weston #2

I find the per-node multiplication time very interesting because the timings don't include any of the overhead associated with the parallel loop, but only the time to perform the matrix multiplication, and they show that the time increases with the number of matrix multiplications executing in parallel on the same machine.

I can think of two reasons why that might happen:

  1. The memory bandwidth of the machine is saturated by the matrix multiplications before you run out of cores;
  2. The matrix multiplication is multi-threaded.

You can test for the first situation by starting multiple R sessions (I did this in multiple terminals), creating two matrices in each session:

> x <- matrix(rnorm(4096*4096), 4096)
> y <- matrix(rnorm(4096*4096), 4096)

and then executing a matrix multiplication in each of those sessions at about the same time:

> system.time(z <- x %*% t(y))

Ideally, this time will be the same regardless of the number of R sessions you use (up to the number of cores), but since matrix multiplication is a rather memory intensive operation, many machines will run out of memory bandwidth before they run out of cores, causing the times to increase.

If your R installation was built with a multi-threaded math library, such as MKL or ATLAS, then you could be using all of your cores with a single matrix multiplication, so you can't expect better performance by using multiple processes unless you use multiple computers.

You can use a tool such as "top" to see if you're using a multi-threaded math library.

Finally, the output from lscpu suggests that you're using a virtual machine. I've never done any performance testing on multi-core virtual machines, but that could also be a source of problems.


Update

I believe the reason that your parallel matrix multiplications run more slowly than a single matrix multiplication is that your CPU isn't able to read memory fast enough to feed more than about two cores at full speed, which I referred to as saturating your memory bandwidth. If your CPU had large enough caches, you might be able to avoid this problem, but it doesn't really have anything to do with the amount of memory that you have on your motherboard.

I think this is just a limitation of using a single computer for parallel computations. One of the advantages of using a cluster is that your memory bandwidth goes up as well as your total aggregate memory. So if you ran one or two matrix multiplications on each node of a multi-node parallel program, you wouldn't run into this particular problem.

Assuming you don't have access to a cluster, you could try benchmarking a multi-threaded math library such as MKL or ATLAS on your computer. It's very possible that you could get better performance running one multi-threaded matrix multiply than running them in parallel in multiple processes. But be careful when using both a multi-threaded math library and a parallel programming package.

You could also try using a GPU. They're obviously good at performing matrix multiplications.


Update 2

To see if the problem is R specific, I suggest that you benchmark the dgemm function, which is the BLAS function used by R to implement matrix multiplication.

Here's a simple Fortran program to benchmark dgemm. I suggest executing it from multiple terminals in the same way that I described for benchmarking %*% in R:

      program main
      implicit none
      integer n, i, j
      integer*8 stime, etime
      parameter (n=4096)
      double precision a(n,n), b(n,n), c(n,n)
      do i = 1, n
        do j = 1, n
          a(i,j) = (i-1) * n + j
          b(i,j) = -((i-1) * n + j)
          c(i,j) = 0.0d0
        end do
      end do
      stime = time8()
      call dgemm('N','N',n,n,n,1.0d0,a,n,b,n,0.0d0,c,n)
      etime = time8()
      print *, etime - stime
      end

On my Linux machine, one instance runs in 82 seconds, while four instances run in 116 seconds. This is consistent with the results that I see in R and with my guess that this is a memory bandwidth problem.

You can also link this against different BLAS libraries to see which implementation works better on your machine.

You might also get some useful information about the memory bandwidth of your virtual machine network using pmbw - Parallel Memory Bandwidth Benchmark, although I've never used it.

answered 1 year ago Alex W #3

I think the obvious answer here is the correct one. Matrix multiplication is not embarrassingly parallel. And you do not appear to have modified the serial multiplication code to parallelize it.

Instead, you are multiplying two matrices. Since the multiplication of each matrix is likely being handled by only a single core, every core in excess of two is simply idle overhead. The result is that you only see a speed improvement of 2x.

You could test this by running more than 2 matrix multiplications. But I'm not familiar with the foreach, doParallel framework (I use parallel framework) nor do I see where in your code to modify this to test it.

An alternative test is to do a parallelized version of matrix multiplication, which I borrow directly from Matloff's Parallel Computing for Data Science. Draft available here, see page 27

mmulthread <- function(u, v, w) {
  require(parallel)
  # determine which rows for this thread
  myidxs <- splitIndices(nrow(u), myinfo$nwrkrs ) [[ myinfo$id ]]
  # compute this thread's portion of the result
  w[myidxs, ] <- u [myidxs, ] %*% v [ , ]
  0 # dont return result -- expensive
}
# t e s t on snow c l u s t e r c l s
test <- function (cls,  n = 2^5) {
  # i n i t Rdsm
  mgrinit(cls)
  # shared variables
  mgrmakevar(cls, "a", n, n)
  mgrmakevar(cls, "b", n, n)
  mgrmakevar(cls, "c", n, n)
  # f i l l i n some t e s t data
  a [ , ] <- 1:n
  b [ , ] <- rep (1 ,n)

  # export function
  clusterExport(cls , "mmulthread" )
  # run function
  clusterEvalQ(cls , mmulthread (a ,b ,c ))
  #print ( c[ , ] ) # not p ri n t ( c ) !
}


library(parallel)
library(Rdsm)

c1 <- makeCluster(1)
c2 <- makeCluster (2)
c4 <- makeCluster(4)
c8 <- makeCluster(8)

library(microbenchmark)

microbenchmark(node1= test(c1, n= 2^10),
           node2= test(c2, n= 2^10),
           node4= test(c4, n= 2^10),
           node8= test(c8, n= 2^10))



 Unit: milliseconds
  expr      min       lq     mean   median       uq      max neval  cld
 node1 715.8722 780.9861 818.0487 817.6826 847.5353 922.9746   100    d
 node2 404.9928 422.9330 450.9016 437.5942 458.9213 589.1708   100   c 
 node4 255.3105 285.8409 309.5924 303.6403 320.8424 481.6833   100 a   
 node8 304.6386 328.6318 365.5114 343.0939 373.8573 836.2771   100  b  

As expected, by parallelizing the matrix multiplication, we do see the spend improvement we wanted, although parallel overhead is clearly extensive.

comments powered by Disqus