ipc
provides methods for inter-process communication
useful when developing Shiny applications that use asynchronous
processing.
Perhaps the most important use of this package is to communicate with
a parent process from a child process executed using the
future
package. This is very easy to do. Simply create and
start a Queue
in the parent process. The child process can
then send messages and evaluate R code on the main process.
library(ipc)
library(future)
plan(multisession)
<- queue()
q
<- ""
value
<- future({
f # set value in the main process
$producer$fireEval({
q<- "Hello world"
value
})
})
Sys.sleep(.5)
# Evaluate signals
<- q$consumer$consume()
cons print(value)
## [1] "Hello world"
# Remove temporary files
$destroy() q
A Queue
object has a producer
field that is
used to send signals onto the Queue
and a
consumer
field that is used to read from the queue, and
process any signals written to the Queue
.
Messages can be consumed by calling the consumer’s
consume
method and will be handled by default using the
environment that consume is called in. The consumer’s start
method will execute consume at regular intervals provided the R process
is idle.
By default, a Consumer
object knows how to handle two
signals. The "eval"
handler will execute the signal’s data
as an expression in the evaluation environment (i.e. where
consume
is called. The "doCall"
handler will
call the function defined by the first element of the data with
parameters equal to the second element. Functions to handle various
signals can be added to the consumer using the addHandler
method.
Producer
objects has a built in method to signal for
evaluation. fireEval(expr, env)
will signal for
expr
to be evaluated, substituting in any value in
env
. For example, the following code will set the variable
val to 2 in the environment in which it is consumed.
<- 2
variable $producer$fireEval(val <- j, env=list(j=variable)) q
Signals can also be sent from the main process to a child. Here we will cause the child process to throw an error.
library(future)
library(promises)
plan(multisession)
<- queue()
q
<- future({
fut for(i in 1:1000){
Sys.sleep(.1)
$consumer$consume()
q
}
})
$producer$fireEval(stop("Stop that child"))
qcat(try(value(fut)))
## Error in eval(obj, envir = env) : Stop that child
## Error in eval(obj, envir = env) : Stop that child
$destroy() q
If errors occur during the consumption, all messages are processed,
and then the first error encountered is then thrown. Alternatively,
errors can be switched to warnings using
q$consumer$consume(throwErrors=FALSE)
.
A consumer’s start
method can be used to execute
comsume
at regular intervals (provided the R process is
idle).
library(future)
library(promises)
plan(multisession)
<- queue()
q
<- future({
fut for(i in 1:100){
Sys.sleep(.1)
$producer$fireEval(print(index), list(index=i))
q
}
})
$consumer$start()
q
# ... Later, stop consumption and clean up
# q$destroy()
By default, ipc
communication is backed by text files
(using TextFileSource
class, which wraps the
txtq
package). The files’ location is, again by default,
generated by the tempfile
function. These global defaults
can be overridden using the tempFileGenerator
and
defaultSource
functions.
For communication between processes on the same machine, the defaults
will generally suffice. If processes are running on multiple machines,
two strategies for sources may be used. First, if all machines have
access to a single file system, override the
tempFileGenerator
to point to generate files in the shared
file system. Alternately, ipc
provides the
RedisSource
class to back queues using a redis
database.
<- queue(RedisSource$new()) q
A major use case for this package is to support Shiny applications.
You can view three example applications using the
shinyExample
function. Taking advantage of inter-process
communication allows for more dynamic applications that are more
responsive to the user.
In Shiny apps, it is recommended that queues be created with the
shinyQueue
function. This will ensure that the queue is
properly destroyed on session end.
Reactive values can not be changed directly from within a future. Queues make it easy to signal the main process to assign a reactive value from within the body of a future.
The application below creates a future every time countdown is clicked, which assigns a value to the reactive value every second, counting down from 10 to 0. If you click the button multiple times, each future will compete to set the value, and the numbers will jump around.
library(shiny)
library(ipc)
library(future)
plan(multisession)
<- fluidPage(
ui
titlePanel("Countdown"),
sidebarLayout(
sidebarPanel(
actionButton('run', 'count down')
),
mainPanel(
tableOutput("result")
)
)
)
<- function(input, output) {
server
<- shinyQueue()
queue $consumer$start(100) # Execute signals every 100 milliseconds
queue
# A reactive value to hold output
<- reactiveVal()
result_val
# Handle button click
observeEvent(input$run,{
future({
for(i in 10:0){
Sys.sleep(1)
<- data.frame(count=i)
result # change value
$producer$fireAssignReactive("result_val",result)
queue
}
})
#Return something other than the future so we don't block the UI
NULL
})
# set output to reactive value
$result <- renderTable({
outputreq(result_val())
})
}
# Run the application
shinyApp(ui = ui, server = server)
AsyncProgress
is a drop in replacement for Shiny’s
Progress
class that allows you to update progress bars
within a future. The example below shows a minimal example of this. Note
how you can click run multiple times and get multiple progress bars.
library(shiny)
library(ipc)
library(future)
library(promises)
plan(multisession)
<- fluidPage(
ui
titlePanel("Countdown"),
sidebarLayout(
sidebarPanel(
actionButton('run', 'Run')
),
mainPanel(
tableOutput("result")
)
)
)
<- function(input, output) {
server
# A reactive value to hold output
<- reactiveVal()
result_val
# Handle button click
observeEvent(input$run,{
result_val(NULL)
# Create a progress bar
<- AsyncProgress$new(message="Complex analysis")
progress future({
for(i in 1:10){
Sys.sleep(1)
$inc(1/10) # Increment progress bar
progress
}$close() # Close the progress bar
progressdata.frame(result="Insightful result")
%...>% result_val # Assign result of future to result_val
})
# Return something other than the future so we don't block the UI
NULL
})
# Set output to reactive value
$result <- renderTable({
outputreq(result_val())
})
}
# Run the application
shinyApp(ui = ui, server = server)
When creating a UI with a process that take a significant amount of
times it is critical to provide the user a mechanism to cancel the
operation. The recommended mechanism to do this is to send a kill
message. The AsyncInterruptor
is an easy to use wrapper
around a Queue
object for this common use case.
library(future)
library(promises)
plan(multisession)
# A long running function. Having a callback (progressMonitor) is a way to
# allow for interrupts to be checked for without adding a dependency
# to the analysis function.
<- function(progressMonitor=function(i) cat(".")){
accessableAnalysisFunction for(i in 1:1000){
Sys.sleep(.1)
progressMonitor(i)
}data.frame(result="Insightful analysis")
}
<- AsyncInterruptor$new()
inter <- future({
fut accessableAnalysisFunction(progressMonitor = function(i) inter$execInterrupts())
})$interrupt("Stop that future")
intercat(try(value(fut)))
## Error : Stop that future
## Error : Stop that future
$destroy() inter
There are times where the Shiny developer does not have access to the
long running code in such a way that execInterrupts
can be
called as the computation progresses. In these cases, the only way to
terminate a running future is to kill it at the OS level.
The function stopMulticoreFuture
kills a future,
provided it is executed in a multicore plan. For mac and linux machines,
plan(multicore) results in a multicore execution plans. In windows it is
not possible to use a multicore execution plan.
The behavior of the execution plan after a kill signal has been sent
is technically undefined, but currently there are no large unintended
consequences of killing child processes. This may change in the future
however, which is why AsyncInterruptor
is strongly
preferred if at all possible.
library(shiny)
library(ipc)
library(future)
library(promises)
plan(multicore) # This will only work with multicore, which is unavailable on Windows
<- function(){
inaccessableAnalysisFunction Sys.sleep(10)
data.frame(result="Insightful analysis")
}
# Define UI for application that draws a histogram
<- fluidPage(
ui
# Application title
titlePanel("Cancelable Async Task"),
# Sidebar with a slider input for number of bins
sidebarLayout(
sidebarPanel(
actionButton('run', 'Run'),
actionButton('cancel', 'Cancel')
),
# Show a plot of the generated distribution
mainPanel(
tableOutput("result")
)
)
)
<- function(input, output) {
server
<- NULL
fut
<- reactiveVal()
result_val <- reactiveVal(FALSE)
running observeEvent(input$run,{
#Don't do anything if in the middle of a run
if(running())
return(NULL)
running(TRUE)
print("Starting Run")
result_val(NULL)
<<- future({
fut <- inaccessableAnalysisFunction()
result
})<- fut %...>% result_val
prom <- catch(fut,
prom function(e){
result_val(NULL)
print(e$message)
showNotification("Task Stopped")
})<- finally(prom, function(){
prom print("Done")
running(FALSE) #declare done with run
})
#Return something other than the future so we don't block the UI
NULL
})
# Kill future
observeEvent(input$cancel,{
#
# Use this method of stopping only if you don't have access to the
# internals of the long running process. If you are able, it is
# recommended to use AsyncInterruptor instead.
#
stopMulticoreFuture(fut)
})
$result <- renderTable({
outputreq(result_val())
})
}
# Run the application
shinyApp(ui = ui, server = server)
Shiny apps are not the only place where it is desirable to monitor
computationally complex tasks. Packages like ergm
and
rstan
use parallel computing when performing MCMC
simulations. ipc
can be used to report back intermediate
results and progress to the user. Here is a function that does parallel
(fake) MCMC, and provides both a progress bar and running trace plots
for the chains.
library(parallel)
mcmcTask <- function(){
on.exit(q$destroy())
nchains <- min(4, detectCores() - 1)
q <- queue()
prog <- 1
pb <- txtProgressBar(min = 0, max = nchains * 10, style=3)
res <- list()
chains <- list()
for(i in 1:nchains){
# Run each chain in parallel
res[[i]] <- mcparallel({
chain <- c()
for(j in 1:10){
chain <- c(chain, rnorm(100))
# Send the current chain to the main process
q$producer$fireEval(chains[[i]] <- chain, list(i=i,chain=chain))
# Update progress
q$producer$fireEval(prog <- prog + 1)
Sys.sleep(runif(1)*5)
}
chain
})
}
# Monitor progress
while(prog < nchains * 10){
q$consumer$consume()
setTxtProgressBar(pb, prog)
par(mfrow=c(2,ceiling(nchains / 2)))
for(j in seq_along(chains))
if(length(chains[[j]]) > 1)
plot(chains[[j]], main=paste("Trace Plot for MCMC chain", j))
Sys.sleep(2)
}
close(pb)
mccollect(res)
}
chains <- mcmcTask()