mirai may be used as an asynchronous backend for plumber pipelines.
Example usage is provided below for different types of endpoint.
The plumber router code is run in a daemon process itself so that it does not block the interactive process.
The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the process ID of the process it is run on.
library(mirai)

# Launch a single daemon to host the plumber router. SIGINT is supplied as
# 'autoexit' so the plumber server is interrupted and exits cleanly when
# finished.
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

# The whole router is defined and run inside a mirai, i.e. on the daemon
# launched above, so the calling session is not blocked by pr_run().
m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # Four daemons: handles 4 requests simultaneously. It is more efficient not
  # to use dispatcher if all requests are similar length.
  daemons(4L, dispatcher = FALSE)

  # Handler for GET /echo.
  # Sends the work to a daemon as a mirai: sleep for 1 second, then build a
  # list holding the HTTP status and a body made of a timestamp, the 'msg'
  # request header and the process ID. The header value is passed to the
  # mirai explicitly as the named argument 'msg'.
  # When the mirai resolves, its status and body are copied onto 'res'.
  echo_handler <- function(req, res) {
    task <- mirai(
      {
        Sys.sleep(1L)
        list(
          status = 200L,
          body = list(
            time = format(Sys.time()),
            msg = msg,
            pid = Sys.getpid()
          )
        )
      },
      msg = req[["HEADERS"]][["msg"]]
    )
    task %...>% (function(result) {
      res$status <- result$status
      res$body <- result$body
    })
  }

  # Register the endpoint on a new router and start serving.
  router <- pr_get(pr(), "/echo", echo_handler)
  pr_run(router, host = "127.0.0.1", port = 8985)
})
The API can be queried using an async HTTP client such as nanonext::ncurl_aio().
Here, all 8 requests are submitted at once, but we note that the responses have differing timestamps, as only 4 can be processed at any one time (limited by the number of daemons set).
library(nanonext)

# Fire off 8 asynchronous GET requests to the /echo endpoint without waiting
# in between. Each request carries its own index (as a string) in the 'msg'
# header.
res <- lapply(
  as.character(seq_len(8L)),
  \(id) ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = id)
  )
)

# Wait for all requests and gather the results.
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-08-09 10:54:27\"],\"msg\":[\"1\"],\"pid\":[38085]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2024-08-09 10:54:27\"],\"msg\":[\"2\"],\"pid\":[38077]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2024-08-09 10:54:27\"],\"msg\":[\"3\"],\"pid\":[38082]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2024-08-09 10:54:27\"],\"msg\":[\"4\"],\"pid\":[38079]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2024-08-09 10:54:28\"],\"msg\":[\"5\"],\"pid\":[38079]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2024-08-09 10:54:28\"],\"msg\":[\"6\"],\"pid\":[38085]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2024-08-09 10:54:28\"],\"msg\":[\"7\"],\"pid\":[38082]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2024-08-09 10:54:28\"],\"msg\":[\"8\"],\"pid\":[38077]}"
# Reset daemons now that the example is finished. The daemon hosting the
# plumber router was launched with `autoexit = tools::SIGINT`, so the server
# is interrupted and exits cleanly.
daemons(0)
#> [1] 0
Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.
Note that req$postBody should always be accessed in the router process and passed in as an argument to the ‘mirai’, as this is retrieved using a connection that is not serializable.
library(mirai)

# Launch a single daemon to host the plumber router. SIGINT is supplied as
# 'autoexit' so the plumber server is interrupted and exits cleanly when
# finished.
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

# The whole router is defined and run inside a mirai, i.e. on the daemon
# launched above, so the calling session is not blocked by pr_run().
m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # Four daemons: handles 4 requests simultaneously. Dispatcher is used here,
  # which is suitable when requests take differing times to complete.
  daemons(4L, dispatcher = TRUE)

  # Handler for POST /echo.
  # The request body is read here, in the router process, and handed to the
  # mirai as the named argument 'data'. The mirai sleeps for 1 second, parses
  # the JSON and returns a list holding the HTTP status and a body made of a
  # timestamp, the parsed 'msg' field and the process ID.
  # When the mirai resolves, its status and body are copied onto 'res'.
  echo_handler <- function(req, res) {
    task <- mirai(
      {
        Sys.sleep(1L) # simulate expensive computation
        list(
          status = 200L,
          body = list(
            time = format(Sys.time()),
            msg = jsonlite::fromJSON(data)[["msg"]],
            pid = Sys.getpid()
          )
        )
      },
      data = req$postBody
    )
    task %...>% (function(result) {
      res$status <- result$status
      res$body <- result$body
    })
  }

  # Register the endpoint on a new router and start serving.
  router <- pr_post(pr(), "/echo", echo_handler)
  pr_run(router, host = "127.0.0.1", port = 8986)
})
Querying the endpoint produces the same set of outputs as the previous example.
library(nanonext)

# Fire off 8 asynchronous POST requests to the /echo endpoint without waiting
# in between. Each request sends a small JSON document whose 'msg' field is
# the request index.
res <- lapply(
  seq_len(8L),
  \(id) ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', id)
  )
)

# Wait for all requests and gather the results.
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-08-09 10:54:31\"],\"msg\":[\"1\"],\"pid\":[38347]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2024-08-09 10:54:31\"],\"msg\":[\"2\"],\"pid\":[38349]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2024-08-09 10:54:31\"],\"msg\":[\"3\"],\"pid\":[38355]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2024-08-09 10:54:31\"],\"msg\":[\"4\"],\"pid\":[38352]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2024-08-09 10:54:32\"],\"msg\":[\"5\"],\"pid\":[38347]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2024-08-09 10:54:32\"],\"msg\":[\"6\"],\"pid\":[38349]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2024-08-09 10:54:32\"],\"msg\":[\"7\"],\"pid\":[38355]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2024-08-09 10:54:32\"],\"msg\":[\"8\"],\"pid\":[38352]}"
# Reset daemons now that the example is finished. The daemon hosting the
# plumber router was launched with `autoexit = tools::SIGINT`, so the server
# is interrupted and exits cleanly.
daemons(0)
#> [1] 0