crew is a distributed computing framework with a centralized interface
and auto-scaling. A crew controller is an object in R which accepts
tasks, returns results, and launches workers. Workers can be local
processes, jobs on traditional clusters such as SLURM, or jobs on cloud
services such as AWS Batch, depending on the launcher plugin of the
controller.
A task is a piece of R code, such as an expression or a function call.
A worker is a non-interactive R process that runs one or more tasks.
When tasks run on workers, the local R session is free and responsive,
and work gets done faster. For example, this vignette shows how crew and
mirai work together to speed up Shiny apps.
First, create a controller object to manage tasks and workers.
library(crew)
controller <- crew_controller_local(
  name = "example",
  workers = 2,
  seconds_idle = 10
)
Next, start the controller to create the mirai client. Later, when you
are done with the controller, call controller$terminate() to clean up
the workers and dispatcher.
Use push() to submit a new task and pop() to return a completed task.
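The steps so far can be put together as one runnable sketch. The task name "get pid" is taken from the pop() output shown in this section, but the command Sys.getpid() is an assumption chosen for illustration, and the controller is created again here only so that the snippet runs on its own.

```r
library(crew)

# Same controller settings as above.
controller <- crew_controller_local(
  name = "example",
  workers = 2,
  seconds_idle = 10
)

# Start the controller so it can accept tasks.
controller$start()

# Submit one task. The command is captured unevaluated and runs on a worker.
# Sys.getpid() is an illustrative choice, not necessarily the original command.
controller$push(name = "get pid", command = Sys.getpid())
```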
As a side effect, methods push(), pop(), and scale() also launch workers
to run the tasks. If your controller uses transient workers and has a
backlog of tasks, you may need to loop over pop() or scale() multiple
times to make sure enough workers are always available.
controller$pop() # No workers started yet and the task is not done.
#> NULL
task <- controller$pop() # Worker started, task complete.
task
#> # A tibble: 1 × 12
#> name command result seconds seed algorithm error trace warnings
#> <chr> <chr> <list> <dbl> <int> <chr> <chr> <chr> <chr>
#> 1 get pid NA <int> 0 NA NA NA NA NA
#> # ℹ 3 more variables: launcher <chr>, worker <int>, instance <chr>
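The need to call pop() repeatedly with transient workers can be sketched as a polling loop. This is a minimal sketch under stated assumptions: the controller name, the three placeholder tasks, and tasks_max = 1 (used here to force each worker to exit after one task) are illustrative choices, and the time limit is only a safety net.

```r
library(crew)

# Assumption for illustration: tasks_max = 1 makes each worker exit after a
# single task, so workers are transient and must be relaunched.
demo <- crew_controller_local(name = "polling_demo", workers = 1, tasks_max = 1)
demo$start()

# Build a backlog of three tasks.
for (i in seq_len(3)) {
  demo$push(name = paste0("task", i), command = Sys.getpid())
}

# Poll with pop(): each call returns a finished task if one is ready and,
# as a side effect, launches workers when tasks are still waiting.
results <- list()
deadline <- Sys.time() + 120 # safety limit so the loop cannot run forever
while (length(results) < 3 && Sys.time() < deadline) {
  task <- demo$pop()
  if (is.null(task)) {
    Sys.sleep(0.1) # nothing ready yet, try again shortly
  } else {
    results[[length(results) + 1]] <- task
  }
}

demo$terminate()
```

Each element of results is a one-row tibble like the one printed above.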
Alternatively, wait() is a loop that repeatedly checks tasks and
launches workers until all tasks complete.
The return value of the task is in the result column.
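As a small self-contained sketch of wait() followed by extracting the return value (the controller name and the command 1 + 2 are assumptions for illustration):

```r
library(crew)

demo <- crew_controller_local(name = "wait_demo", workers = 1, seconds_idle = 10)
demo$start()
demo$push(name = "sum", command = 1 + 2)

# Block until every submitted task has finished.
demo$wait(mode = "all")

# pop() returns a one-row tibble, and result is a list column,
# so the return value is the first element of that list.
task <- demo$pop()
value <- task$result[[1]]
value
#> [1] 3

demo$terminate()
```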
Here is the full list of output in the task object returned by pop().
name: the task name if given.
command: a character string with the R command if save_command was set to TRUE in push().
result: a list containing the return value of the R command.
seconds: number of seconds that the task ran.
seed: the single integer originally supplied to push(), NA if seed was supplied as NULL.
algorithm: name of the pseudo-random number generator algorithm originally supplied to push(), NA if algorithm was supplied as NULL.
error: the first 2048 characters of the error message if the task threw an error, NA otherwise.
trace: the first 2048 characters of the text of the traceback if the task threw an error, NA otherwise.
warnings: the first 2048 characters of the text of warning messages that the task may have generated, NA otherwise.
launcher: name of the crew launcher where the task ran.
If seed and algorithm are both non-missing in the output, then you can
recover the pseudo-random number generator state of the task using
set.seed(seed = seed, kind = algorithm). However, it is recommended to
supply NULL to these arguments in push(), in which case you will observe
NA in the outputs. With seed and algorithm both NULL, the random number
generator defaults to the recommended widely spaced worker-specific
L’Ecuyer streams supported by mirai::nextstream(). See
vignette("parallel", package = "parallel") for details.
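For cases where an explicit seed is still wanted, the recovery step could look like the sketch below. The seed 123, the algorithm "Mersenne-Twister", the controller name, and the command runif(1) are all illustrative assumptions.

```r
library(crew)

demo <- crew_controller_local(name = "seed_demo", workers = 1, seconds_idle = 10)
demo$start()

# Illustrative values: an integer seed and an RNG kind accepted by RNGkind().
demo$push(
  name = "draw",
  command = runif(1),
  seed = 123L,
  algorithm = "Mersenne-Twister"
)
demo$wait(mode = "all")
task <- demo$pop()

# The seed and algorithm are reported back alongside the result.
task$seed
task$algorithm

# Restore the same generator state in the local session.
# Note: this also changes the RNG kind of the local session.
set.seed(seed = task$seed, kind = task$algorithm)

demo$terminate()
```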
The map() method of the controller supports functional programming
similar to purrr::map() and clustermq::Q(). The arguments of map() are
mostly the same as those of push(), but there is a new iterate argument
to define the inputs of individual tasks. map() submits a whole
collection of tasks, auto-scales the workers, waits for all the tasks to
finish, and returns the results in a tibble.
Below, map() submits one task to compute 1 + 2 + 5 + 6 and another task
to compute 3 + 4 + 5 + 6. The lists and vectors inside iterate vary from
task to task, while the elements of data and globals stay constant
across tasks.
results <- controller$map(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6)
)
results
#> # A tibble: 2 × 12
#> name command result seconds seed algorithm error trace warnings
#> <chr> <chr> <list> <dbl> <int> <chr> <chr> <chr> <chr>
#> 1 1 NA <dbl [1]> 0 NA NA NA NA NA
#> 2 2 NA <dbl [1]> 0 NA NA NA NA NA
#> # ℹ 3 more variables: launcher <chr>, worker <int>, instance <chr>
as.numeric(results$result)
#> [1] 14 18
If at least one task in map() throws an error, the default behavior is
to error out in the main session and not return the results. If that
happens, the results are available in controller$error. To return the
results instead of setting controller$error, regardless of error status,
set error = "warn" or "silent" in map(). To conserve memory, consider
setting controller$error <- NULL when you are done troubleshooting.
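A minimal sketch of the error = "silent" behavior follows. The controller name, the iterated variable x, and the deliberately failing command are assumptions made up for this example.

```r
library(crew)

demo <- crew_controller_local(name = "error_demo", workers = 2, seconds_idle = 10)
demo$start()

# The second task fails on purpose. With error = "silent", map() still
# returns the results instead of throwing an error in the main session.
results <- demo$map(
  command = if (x > 1) stop("x is too large") else x,
  iterate = list(x = c(1, 2)),
  error = "silent"
)

# error is NA for the successful task and holds the message for the failed one.
results[, c("name", "error")]

demo$terminate()
```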
The walk() method is just like map(), but it does not wait for any tasks
to complete. Instead, it returns control to the local R session
immediately and lets you do other things while the tasks run in the
background.
controller$walk(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6)
)
The collect() method pops all completed tasks. Put together, walk(),
wait(mode = "all"), and collect() have the same overall effect as map().
controller$wait(mode = "all")
controller$collect()
#> # A tibble: 2 × 12
#> name command result seconds seed algorithm error trace warnings
#> <chr> <chr> <list> <dbl> <int> <chr> <chr> <chr> <chr>
#> 1 1 NA <dbl [1]> 0 NA NA NA NA NA
#> 2 2 NA <dbl [1]> 0 NA NA NA NA NA
#> # ℹ 3 more variables: launcher <chr>, worker <int>, instance <chr>
However, there are subtle differences between the synchronous and
asynchronous functional programming methods:
1. map() requires an empty controller to start with (no prior tasks).
   But with walk(), the controller can have any number of running or
   unpopped tasks beforehand.
2. wait() does not show a progress bar because it would be misleading if
   there are a lot of prior tasks. Because map() requires the controller
   to be empty initially (i.e. (1)), it shows a progress bar while
   correctly representing the amount of work left to do.
The controller summary shows how many tasks each worker ran, how many
total seconds it spent running tasks, and how many tasks threw warnings
and errors.
controller$summary()
#> # A tibble: 2 × 6
#> controller worker tasks seconds errors warnings
#> <chr> <int> <int> <dbl> <int> <int>
#> 1 example 1 2 0.001 0 0
#> 2 example 2 1 0 0 0
The launcher summary counts the number of times each worker was
launched, and it shows the total number of assigned and completed tasks
from all past terminated instances of each worker. In addition, it shows
whether the current worker instance was actively connected (“online”) or
had connected at some point during its life cycle (“discovered”) as of
the last call to controller$launcher$tally().
controller$launcher$summary()
#> # A tibble: 2 × 6
#> worker launches online discovered assigned complete
#> <int> <int> <lgl> <lgl> <int> <int>
#> 1 1 2 TRUE TRUE 0 0
#> 2 2 1 TRUE TRUE 0 0
Finally, the client summary shows up-to-date worker status from
mirai::daemons().
Call terminate() on the controller after you finish using it.
terminate() tries to close the mirai dispatcher and any workers that may
still be running. It is important to free up these resources.
The mirai dispatcher process should exit on its own, but if not, you can
manually terminate the process ID at controller$client$dispatcher or
call crew_clean() to terminate any dispatchers from current or previous
R sessions.
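One way to make sure terminate() always runs is to register it with on.exit() inside a function. This is a sketch of that pattern, not something crew requires: run_one_task() is a hypothetical helper, and the controller name and command are illustrative.

```r
library(crew)

# Hypothetical helper: runs one task and always terminates the controller,
# even if an error occurs along the way.
run_one_task <- function() {
  controller <- crew_controller_local(name = "cleanup_demo", workers = 1)
  on.exit(controller$terminate(), add = TRUE)
  controller$start()
  controller$push(name = "square", command = 4^2)
  controller$wait(mode = "all")
  controller$pop()$result[[1]]
}

run_one_task()
#> [1] 16
```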
A crew controller creates different types of local processes. These
include:
Dispatchers: the mirai dispatcher process of each controller. mirai needs this process to orchestrate tasks.
Workers: the R processes that crew launches to run tasks. These may be local processes as in the case of crew_controller_local(), or they may be processes on different computers if you are using a third-party launcher plugin like crew.cluster or crew.aws.batch.
Daemons: R processes that mirai launches outside of crew to run tasks. Such processes may spawn automatically if you set the processes argument of e.g. crew.aws.batch::crew_controller_aws_batch() to a positive integer.
Usually these processes terminate themselves when the parent R session
exits or the controller terminates, but under rare circumstances they
may continue running. The “local monitor” in crew makes it easy to list
and terminate any of these processes which may be running on your local
computer. Example:
monitor <- crew_monitor_local()
monitor$dispatchers() # List PIDs of all local {mirai} dispatcher processes.
#> [1] 31215
monitor$daemons()
#> integer(0)
monitor$workers()
#> [1] 57001 57002
monitor$terminate(pid = c(57001, 57002))
monitor$workers()
#> integer(0)
crew_monitor_local() only manages processes running on your local
computer. To manage crew workers running on different computers, such as
SLURM or AWS Batch, please familiarize yourself with the given computing
platform, and consider using the monitor objects in the relevant
third-party plugin packages such as crew.cluster or crew.aws.batch.
Example: https://wlandau.github.io/crew.aws.batch/index.html#job-management.
As explained above, push(), pop(), and wait() launch new workers to run
tasks. The number of new workers depends on the number of tasks at the
time. In addition, workers can shut themselves down as work completes.
In other words, crew automatically raises and lowers the number of
workers in response to fluctuations in the task workload.
The most useful arguments for down-scaling, in order of importance, are:
seconds_idle: shut down a worker if it spends too long waiting for a task.
tasks_max: shut down a worker after it completes a certain number of tasks.
seconds_wall: soft wall time of a worker.
Please tune these arguments to achieve the desired balance for
auto-scaling. The two extremes of auto-scaling are clustermq-like
persistent workers and future-like transient workers, and each is
problematic in its own way.
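As a rough sketch of how these arguments move a controller toward one extreme or the other (the controller names and all numeric values below are illustrative assumptions, not recommendations):

```r
library(crew)

# Closer to persistent workers: a worker stays up through gaps in the
# workload and only exits after a long idle period.
persistent <- crew_controller_local(
  name = "persistent_like",
  workers = 2,
  seconds_idle = 3600
)

# Closer to transient workers: a worker exits soon after going idle, after a
# fixed number of tasks, or when it reaches its soft wall time.
transient <- crew_controller_local(
  name = "transient_like",
  workers = 2,
  seconds_idle = 5,
  tasks_max = 10,
  seconds_wall = 600
)
```

Neither controller launches any process until start() is called, so settings like these can be compared and adjusted cheaply.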
Some launchers support local processes to launch and terminate workers
asynchronously. For example, a cloud-based launcher may need to make
HTTP requests to launch and terminate workers on e.g. AWS Batch, and
these time-consuming requests should happen in the background.
Controllers that support this will have a processes argument to specify
the number of local R processes to churn through worker launches and
terminations. Set processes = NULL to disable async, which can be
helpful for troubleshooting.