---
title: "mirai - Plumber Integration"
vignette: >
  %\VignetteIndexEntry{mirai - Plumber Integration}
  %\VignetteEngine{litedown::vignette}
  %\VignetteEncoding{UTF-8}
---


### Plumber Integration

`mirai` may be used as an asynchronous backend for [`plumber`](https://www.rplumber.io/) pipelines.

Example usage is provided below for different types of endpoint.

#### Example GET Endpoint

The plumber router code is run in a daemon process itself so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the 'msg' request header together with a timestamp and the process ID of the process it is run on.


``` r
library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # more efficient not to use dispatcher if all requests are similar length
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L)
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()), msg = msg, pid = Sys.getpid()
              )
            )
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})
```

The API can be queried using an async HTTP client such as `nanonext::ncurl_aio()`.

Here, all 8 requests are submitted at once, but we note that that responses have differing timestamps as only 4 can be processed at any one time (limited by the number of daemons set).

``` r
library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = as.character(i))
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"1\"],\"pid\":[30785]}"
#> 
#> [[2]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"2\"],\"pid\":[30777]}"
#> 
#> [[3]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"3\"],\"pid\":[30779]}"
#> 
#> [[4]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"4\"],\"pid\":[30785]}"
#> 
#> [[5]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"5\"],\"pid\":[30782]}"
#> 
#> [[6]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"6\"],\"pid\":[30782]}"
#> 
#> [[7]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"7\"],\"pid\":[30777]}"
#> 
#> [[8]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"8\"],\"pid\":[30779]}"

daemons(0)
#> [1] 0
```

#### Example POST Endpoint

Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that `req$postBody` should always be accessed in the router process and passed in as an argument to the 'mirai', as this is retrieved using a connection that is not serializable.

``` r
library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher - suitable when requests take differing times to complete
  daemons(4L) # handles 4 requests simultaneously

  pr() |>
    pr_post(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L) # simulate expensive computation
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()),
                msg = jsonlite::fromJSON(data)[["msg"]],
                pid = Sys.getpid()
              )
            )
          },
          data = req$postBody
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8986)
})
```

Querying the endpoint produces the same set of outputs as the previous example.

``` r
library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', i)
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"1\"],\"pid\":[31046]}"
#> 
#> [[2]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"2\"],\"pid\":[31048]}"
#> 
#> [[3]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"3\"],\"pid\":[31054]}"
#> 
#> [[4]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"4\"],\"pid\":[31051]}"
#> 
#> [[5]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"5\"],\"pid\":[31046]}"
#> 
#> [[6]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"6\"],\"pid\":[31048]}"
#> 
#> [[7]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"7\"],\"pid\":[31051]}"
#> 
#> [[8]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"8\"],\"pid\":[31054]}"

daemons(0)
#> [1] 0
```