A Foreach Parallel Adaptor using Futures

Introduction

The future package provides a generic API for using futures in R. A future is a simple yet powerful mechanism to evaluate an R expression and retrieve its value at some point in time. Futures can be resolved in many different ways depending on which strategy is used. There are various types of synchronous and asynchronous futures to choose from in the future package. Additional futures are implemented in other packages. For instance, the future.batchtools package provides futures for any type of backend that the batchtools package supports. For an introduction to futures in R, please consult the vignettes of the future package.
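
For illustration, here is a minimal sketch of using the future package directly; it relies only on the future(), value(), and plan() functions described in the future vignettes, and the sequential strategy is just one of the available ones:

library("future")
plan(sequential)

## Create a future that evaluates the expression; with the sequential
## strategy it is resolved immediately in the current R session
f <- future({
  mean(rnorm(100))
})

## Retrieve the value of the future (blocks until it is resolved)
v <- value(f)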

The doFuture package provides a %dopar% adaptor for the foreach package that works with any type of future. The doFuture package is cross-platform, just like the future package.

Below is an example showing how to make %dopar% work with multiprocess futures. A multiprocess future will be evaluated in parallel using forked processes. If process forking is not supported by the operating system, then multiple background R sessions will instead be used to resolve the futures.

library("doFuture")
registerDoFuture()
plan(multiprocess)

mu <- 1.0
sigma <- 2.0
x <- foreach(i = 1:3, .export = c("mu", "sigma")) %dopar% {
  rnorm(i, mean = mu, sd = sigma)
}

Futures bring foreach to the HPC cluster

To do the same on a high-performance computing (HPC) cluster, the future.batchtools package can be used. Assuming batchtools has been configured correctly, the following foreach iterations will be submitted to the HPC job scheduler and distributed for evaluation on the compute nodes.

library("doFuture")
registerDoFuture()
library("future.batchtools")
plan(batchtools_slurm)

mu <- 1.0
sigma <- 2.0
x <- foreach(i = 1:3, .export = c("mu", "sigma")) %dopar% {
  rnorm(i, mean = mu, sd = sigma)
}

Futures for plyr

The plyr package uses foreach as a parallel backend. This means that, with doFuture, any type of future can be used for asynchronous (and synchronous) plyr processing, including multicore, multisession, MPI, ad hoc clusters, and HPC job schedulers. For example,

library("doFuture")
registerDoFuture()
plan(multiprocess)

library("plyr")
x <- list(a = 1:10, beta = exp(-3:3), logic = c(TRUE, FALSE, FALSE, TRUE))
y <- llply(x, quantile, probs = (1:3) / 4, .parallel = TRUE)
## $a
##  25%  50%  75%
## 3.25 5.50 7.75
##
## $beta
##       25%       50%       75%
## 0.2516074 1.0000000 5.0536690
##
## $logic
## 25% 50% 75%
## 0.0 0.5 1.0

Futures and BiocParallel

The BiocParallel package supports any %dopar% adaptor as a parallel backend. This means that with doFuture, BiocParallel supports any type of future. For example,

library("doFuture")
registerDoFuture()
plan(multiprocess)
library("BiocParallel")
register(DoparParam(), default = TRUE)

mu <- 1.0
sigma <- 2.0
x <- bplapply(1:3, mu = mu, sigma = sigma, function(i, mu, sigma) {
  rnorm(i, mean = mu, sd = sigma)
})

doFuture takes care of global variables for foreach

The foreach package has some support for automated handling of globals, but it does not work in all cases. Specifically, if foreach() is called from within a function, you need to export globals explicitly. For example, although globals mu and sigma are properly exported when we do

> library("doParallel")
> registerDoParallel(parallel::makeCluster(2))
> mu <- 1.0
> sigma <- 2.0
> x <- foreach(i = 1:3) %dopar% { rnorm(i, mean = mu, sd = sigma) }
> str(x)
List of 3
 $ : num -1.42
 $ : num [1:2] 3.12 -1.33
 $ : num [1:3] -0.0376 -0.1446 1.6368

it falls short as soon as we try

> foo <- function() foreach(i = 1:3) %dopar% { rnorm(i, mean = mu, sd = sigma) }
> x <- foo()
Error in { : task 1 failed - "object 'mu' not found"

The solution is to explicitly export global variables, e.g.

> foo <- function() {
+   foreach(i = 1:3, .export = c("mu", "sigma")) %dopar% {
+     rnorm(i, mean = mu, sd = sigma)
+   }
+ }
> x <- foo()

However, when using the %dopar% adaptor of doFuture, all of the future machinery comes into play, including automatic handling of global variables, e.g.

> library("doFuture")
> registerDoFuture()
> plan(cluster, workers = 2)
> mu <- 1.0
> sigma <- 2.0
> foo <- function() foreach(i = 1:3) %dopar% { rnorm(i, mean = mu, sd = sigma) }
> x <- foo()
> str(x)
List of 3
 $ : num 0.358
 $ : num [1:2] 3.317 -0.689
 $ : num [1:3] -0.104 1.237 2.474

Having said all this, in order to write foreach code that works everywhere, it is better to be conservative and not assume that all end users will use a doFuture backend. Because of this, it is still recommended to explicitly specify all objects that need to be exported whenever using the foreach API. The doFuture framework can help you identify what should go into the .export argument. By setting options(doFuture.foreach.export = ".export-and-automatic-with-warning"), doFuture will scan each foreach() %dopar% { ... } call for globals. If it detects global candidates not listed in .export, it will produce an informative warning suggesting that they be added, as in the sketch below.
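
A minimal sketch of enabling this check, using the multiprocess strategy as in the earlier examples (the exact wording of the warning may differ):

library("doFuture")
registerDoFuture()
plan(multiprocess)

## Ask doFuture to scan each foreach() call for globals and warn
## about candidates that are missing from .export
options(doFuture.foreach.export = ".export-and-automatic-with-warning")

mu <- 1.0
sigma <- 2.0
## 'mu' and 'sigma' are not listed in .export, so a warning suggesting
## to add them is expected here
x <- foreach(i = 1:3) %dopar% { rnorm(i, mean = mu, sd = sigma) }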

doFuture replaces existing doNnn packages

Due to the generic nature of futures, the doFuture package provides the same functionality as many of the existing doNnn packages combined, e.g. doMC, doParallel, doMPI, and doSNOW.

Below, each doNnn usage is followed by its doFuture alternative.

foreach (sequential):

library("foreach")
registerDoSEQ()

doFuture alternative:

library("doFuture")
registerDoFuture()
plan(sequential)

doMC:

library("doMC")
registerDoMC()

doFuture alternative:

library("doFuture")
registerDoFuture()
plan(multicore)

doParallel (default):

library("doParallel")
registerDoParallel()

doFuture alternative:

library("doFuture")
registerDoFuture()
plan(multiprocess)

doParallel (cluster):

library("doParallel")
cl <- makeCluster(4)
registerDoParallel(cl)

doFuture alternative:

library("doFuture")
registerDoFuture()
cl <- makeCluster(4)
plan(cluster, workers = cl)

doMPI:

library("doMPI")
cl <- startMPIcluster(count = 4)
registerDoMPI(cl)

doFuture alternative:

library("doFuture")
registerDoFuture()
cl <- makeCluster(4, type = "MPI")
plan(cluster, workers = cl)

doSNOW:

library("doSNOW")
cl <- makeCluster(4)
registerDoSNOW(cl)

doFuture alternative:

library("doFuture")
registerDoFuture()
cl <- makeCluster(4)
plan(cluster, workers = cl)

High-performance compute (HPC) schedulers, e.g. SGE, Slurm, and TORQUE / PBS (no doNnn equivalent):

doFuture alternative:

library("doFuture")
registerDoFuture()
library("future.batchtools")
plan(batchtools_sge)

doRedis:

library("doRedis")
registerDoRedis("jobs")
startLocalWorkers(n = 4, queue = "jobs")

There is currently no known Redis-based future backend and therefore no doFuture alternative to the doRedis package.


Copyright Henrik Bengtsson, 2016-2017