Parallel RNG usage

Ralf Stubner

2024-05-28

When you want to use random number generators (RNG) for parallel computations, you need to make sure that the sequences of random numbers used by the different processes do not overlap. There are two main approaches to this problem:1

The RNGs included in dqrng offer at least one of these methods for parallel RNG usage. When using the R or C++ interface independent streams can be accessed using the two argument dqset.seed(seed, stream) or dqset_seed(seed, stream) functions.

Threefry: usage from R

The Threefry engine uses internally a counter with \(2^{256}\) possible states, which can be split into different substreams. When used from R or C++ with the two argument dqset.seed or dqset_seed this counter space is split into \(2^{64}\) streams with \(2^{192}\) possible states each. This is equivalent to \(2^{64}\) streams with a period of \(2^{194}\) each.

In the following example a matrix with random numbers is generated in parallel using the parallel package. The resulting correlation matrix should be close to the identity matrix if the different streams are independent:

library(parallel)
cl <- parallel::makeCluster(2)
res <- clusterApply(cl, 1:8, function(stream, seed, N) {
  library(dqrng)
  dqRNGkind("Threefry")
  dqset.seed(seed, stream)
  dqrnorm(N)
}, 42, 1e6)
stopCluster(cl)

res <- matrix(unlist(res), ncol = 8)
symnum(x = cor(res), cutpoints = c(0.001, 0.003, 0.999),
       symbols = c(" ", "?", "!", "1"),
       abbr.colnames = FALSE, corr = TRUE)

Correlation matrix:

[1,] 1              
[2,]   1            
[3,]   ? 1          
[4,]   ? ? 1        
[5,] ?     ? 1      
[6,]     ?     1    
[7,]     ?       1  
[8,]         ?     1
attr(,"legend")
[1] 0 ‘ ’ 0.001 ‘?’ 0.003 ‘!’ 0.999 ‘1’ 1

As expected the correlation matrix for the different columns is almost equal to the identity matrix.

Xo(ro)shiro: jump ahead with OpenMP

The Xoshiro256+/++/** generators has a period of \(2^{256} -1\) and offers \(2^{128}\) sub-sequences that are \(2^{128}\) random draws apart as well as \(2^{64}\) streams that are \(2^{192}\) random draws apart. The Xoroshiro128+/++/** generators has a period of \(2^{128} -1\) and offers \(2^{64}\) sub-sequences that are \(2^{64}\) random draws apart as well as \(2^{32}\) streams that are \(2^{98}\) random draws apart. You can go from one sub-sequence to the next using the jump() or long_jump() method and the convenience wrapper jump(int n) or long_jump(int n), which advances to the nth sub-sequence. When used from R or C++ with the two argument dqset.seed and dqset_seed you get \(2^{64}\) streams that are \(2^{192}\) and \(2^{64}\) random draws apart for Xoshiro256+/++/** and Xoroshiro128+/++/**, respectively.

As an example using C++ we draw and sum a large number of uniformly distributed numbers. This is done several times sequentially as well as using OpenMP for parallelisation. Care has been taken to keep the global RNG rng usable outside of the parallel block.

#include <Rcpp.h>
// [[Rcpp::depends(dqrng, BH)]]
#include <dqrng_distribution.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>

// [[Rcpp::export]]
std::vector<double> random_sum(int n, int m) {
  dqrng::uniform_distribution dist(0.0, 1.0);               // Uniform distribution [0,1)
  auto rng = dqrng::generator<dqrng::xoshiro256plusplus>(); // seeded from R's RNG
  std::vector<double> res(m);
  for (int i = 0; i < m; ++i) {
    double lres(0);
    for (int j = 0; j < n; ++j) {
      lres += dist(*rng);
    }
    res[i] = lres / n;
  }
  return res;
}

// [[Rcpp::export]]
std::vector<double> parallel_random_sum(int n, int m, int ncores) {
  dqrng::uniform_distribution dist(0.0, 1.0);               // Uniform distribution [0,1)
  auto rng = dqrng::generator<dqrng::xoshiro256plusplus>(); // seeded from R's RNG
  std::vector<double> res(m);
  // ok to use rng here
  
  #pragma omp parallel num_threads(ncores)
  {
    // make thread local copy of rng and advance it by 1 ... ncores jumps
    auto lrng = rng->clone(omp_get_thread_num() + 1);

    #pragma omp for
    for (int i = 0; i < m; ++i) {
      double lres(0);
      for (int j = 0; j < n; ++j) {
        lres += dist(*lrng);
      }
      res[i] = lres / n;
    }
  }
  // ok to use rng here
  return res;
}

/*** R
bm <- bench::mark(
  parallel_random_sum(1e7, 8, 4),
  random_sum(1e7, 8),
  check = FALSE
)
bm[,1:4]
*/

Result:

  expression                            min   median `itr/sec`
  <bch:expr>                       <bch:tm> <bch:tm>     <dbl>
1 parallel_random_sum(1e+07, 8, 4)   98.3ms     99ms     10.1 
2 random_sum(1e+07, 8)              270.2ms    271ms      3.68

Note that the thread local RNG uses std::unique_ptr instead of Rcpp::Xptr since it is used in parallel code.

PCG: multiple streams with RcppParallel

From the PCG family we will look at pcg64, a 64-bit generator with a period of \(2^{128}\). It offers the function advance(int n), which is equivalent to n random draws but scales as \(O(ln(n))\) instead of \(O(n)\). In addition, it offers \(2^{127}\) separate streams that can be enabled via the function set_stream(int n) or the two argument constructor with seed and stream. When used from R or C++ with the two argument dqset.seed and dqset_seed you get \(2^{64}\) streams out of the possible \(2^{127}\) separate streams.

In the following example a matrix with random numbers is generated in parallel using RcppParallel. Instead of using the more traditional approach of generating the random numbers from a certain distribution, we are using the fast sampling methods from dqrng_sample.h. The resulting correlation matrix should be close to the identity matrix if the different streams are independent:

#include <Rcpp.h>
// [[Rcpp::depends(dqrng, BH)]]
#include <dqrng_generator.h>
#include <dqrng_sample.h>
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>

struct RandomFill : public RcppParallel::Worker {
  RcppParallel::RMatrix<int> output;
  uint64_t seed;

  RandomFill(Rcpp::IntegerMatrix output, const uint64_t seed) : output(output), seed(seed) {};

  void operator()(std::size_t begin, std::size_t end) {
    auto rng = dqrng::generator<pcg64>(seed, end);
    for (std::size_t col = begin; col < end; ++col) {
      auto sampled = dqrng::sample::sample<std::vector<int>, uint32_t>(*rng, 100000, output.nrow(), true);
      RcppParallel::RMatrix<int>::Column column = output.column(col);
      std::copy(sampled.begin(), sampled.end(), column.begin());
    }
  }
};

// [[Rcpp::export]]
Rcpp::IntegerMatrix parallel_random_matrix(const int n, const int m, const int ncores) {
  Rcpp::IntegerMatrix res(n, m);
  RandomFill randomFill(res, 42);
  RcppParallel::parallelFor(0, m, randomFill, m/ncores + 1);
  return res;
}

/*** R
res <- parallel_random_matrix(1e6, 8, 4)
head(res)
symnum(x = cor(res), cutpoints = c(0.001, 0.003, 0.999),
       symbols = c(" ", "?", "!", "1"),
       abbr.colnames = FALSE, corr = TRUE)
*/

Head of the random matrix:

      [,1]  [,2]  [,3]  [,4]  [,5]  [,6]  [,7]  [,8]
[1,] 67984 16279 69262  7126 21441 37720 51107 51045
[2,] 69310 21713 82885 81157 54051  5261 91165 17833
[3,] 76742 31232 78953  4626 94939 29416 85652 78296
[4,] 76349 47427  1770 37957 33888 59134 94591 65793
[5,] 85008 89224 43493  7925 60866  2464 14080 10763
[6,] 38017 88509 51195 73086  1883 68193 75259 62216

Correlation matrix:

[1,] 1              
[2,]   1            
[3,]   ? 1          
[4,]     ? 1        
[5,]         1      
[6,] ? ?     ? 1    
[7,]     ?       1  
[8,]     ?         1
attr(,"legend")
[1] 0 ‘ ’ 0.001 ‘?’ 0.003 ‘!’ 0.999 ‘1’ 1

So as expected the correlation matrix is almost equal to the identity matrix.

Using the global RNG

So far we have used our own RNG and either seeded it from R’s RNG or with an explicit seed. As an alternative, we can also make use of dqrng’s global RNG. This is exemplified in the template function parallel_generate<>() defined in the header file dqrng_extra/parallel_generate.h. A simple way to use this template would be the following functions which generate random variates according to the uniform, normal, or exponential distribution:

#include <Rcpp.h>
// [[Rcpp::depends(dqrng, BH, RcppParallel)]]
// [[Rcpp::plugins(openmp)]]
#include <dqrng_extra/parallel_generate.h>
#include <dqrng_distribution.h>
using dqrng::extra::parallel_generate;

// [[Rcpp::export]]
Rcpp::NumericVector runif_para(std::size_t n, double min = 0, double max = 1,
                               std::size_t threads = 2, std::size_t streams = 24) {
  return parallel_generate<dqrng::uniform_distribution>(n, threads, streams, min, max);
}

// [[Rcpp::export]]
Rcpp::NumericVector rnorm_para(std::size_t n, double mean = 0, double sd = 1,
                               std::size_t threads = 2, std::size_t streams = 24) {
  return parallel_generate<dqrng::normal_distribution>(n, threads, streams, mean, sd);
}

// [[Rcpp::export]]
Rcpp::NumericVector rexp_para(std::size_t n, double rate = 1,
                              std::size_t threads = 2, std::size_t streams = 24) {
  return parallel_generate<dqrng::exponential_distribution>(n, threads, streams, rate);
}

Generating n numbers is split into (about) equal streams streams that are processed by threads threads. For an efficient distribution of the workload it is important that streams is a multiple of threads, since then every thread gets to process the same number of streams. The default streams = 24 can be used together with 1, 2, 3, 4, 6, 8, 12, and 24 threads. The result for a given number of streams is independent of the number of threads:

dqset.seed(42); norm1 <- rnorm_para(22, threads = 1)
dqset.seed(42); norm2 <- rnorm_para(22, threads = 4)
identical(norm1, norm2)
#> [1] TRUE

At the same time, the serial version is almost as fast as using the normal dqrng::dqr<dist> function and therefore much faster than the stats::r<dist> function. Using multiple threads gives a clear speed up.

n <- 1e6
bench::mark(stats::runif(n),
            dqrng::dqrunif(n),
            runif_para(n, threads = 2L),
            runif_para(n, threads = 1L),
            check = FALSE)[, 1:6]
bench::mark(stats::rnorm(n),
            dqrng::dqrnorm(n),
            rnorm_para(n, threads = 2L),
            rnorm_para(n, threads = 1L),
            check = FALSE)[, 1:6]
bench::mark(stats::rexp(n),
            dqrng::dqrexp(n),
            rexp_para(n, threads = 2L),
            rexp_para(n, threads = 1L),
            check = FALSE)[, 1:6]

Here the actual implementation of the template function:

// Copyright 2024 Ralf Stubner
// Copyright 2024 Philippe Grosjean
//
// This file is part of dqrng.
//
// dqrng is free software: you can redistribute it and/or modify it
// under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// dqrng is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with dqrng.  If not, see <http://www.gnu.org/licenses/>.

#include <sstream>
#include <dqrng.h>
#include <RcppParallel/RVector.h>
#include <omp.h>

namespace dqrng {
namespace extra {
template<typename Dist, typename... Params>
Rcpp::NumericVector parallel_generate(std::size_t n,
                                      std::size_t threads,
                                      std::size_t streams,
                                      Params&&... params) {
  if (n < streams)
    streams = n;
  std::size_t stream_size = n / streams;
  std::size_t remainder = n % streams;

  // use RcppParallel::RVector as thread safe accessor
  Rcpp::NumericVector res(Rcpp::no_init(n));
  RcppParallel::RVector<double> work(res);

  // use global RNG from dqrng
  dqrng::random_64bit_accessor rng{};
  std::stringstream buffer;

#ifdef _OPENMP
  std::size_t maxthreads = omp_get_num_procs();
  if (threads > maxthreads)
    threads = maxthreads;
  // No need for more threads than there are streams
  if (threads > streams)
    threads = streams;
#endif

#pragma omp parallel num_threads(threads)
{
  std::size_t start,end;

#pragma omp for schedule(static,1)
  for (std::size_t i = 0; i < streams; ++i) {
    if (i < remainder) {
      start = i * stream_size + i;
      end = start + stream_size + 1;
    } else {
      start = i * stream_size + remainder;
      end = start + stream_size;
    }
    // private RNG in each stream; RNG with i == 0 is identical to global RNG
    auto prng = rng.clone(i);
    prng->generate<Dist>(std::begin(work) + start, std::begin(work) + end,
                         std::forward<Params>(params)...);
    if (i == 0) {// Save the state of the global RNG's clone
      buffer << *prng;
    }
  }
}
  // Make sure that the global RNG advances as well by applying the state
  // of the global RNG's clone to the global RNG
  buffer >> rng;
  return res;
}
} // namespace extra
} // namespace dqrng

Besides the distribution of the work into (about) equal streams, the pattern for RNG access is similar to the OpenMP example above with the important difference that the local RNGs are not thread- but stream-specific. This way, the result becomes independent of the used amount of parallelism. However, one has to consider one important aspect: After the parallel for loop, the global RNG has not advanced at all. Calling the function repeatedly would lead to identical results. To circumvent this, one of the stream specific RNGs is an exact clone of the global RNG (clone(stream=0)) and the state of this RNG after processing its stream is saved and used to overwrite the global RNG’s state.2 This way, the global RNG’s state advances as if it had processed one of the streams and successive calls to parallel_generate() produce different random numbers as expected.

dqset.seed(153)
runif_para(30)
#>   [1] 0.87693642 0.14323366 0.33129746 0.07856319 0.80991119 0.37524485
#>  [7] 0.90387542 0.38746776 0.30473153 0.01102334 0.21272306 0.11975609
#> [13] 0.98440547 0.13373340 0.82823735 0.87196225 0.14920422 0.27723804
#> [19] 0.59308120 0.07853078 0.63040483 0.21707435 0.25876379 0.81296194
#> [25] 0.53645030 0.29976254 0.37159454 0.38683266 0.03737063 0.03359113
runif_para(30) # Different values, as expected
#>  [1] 0.90407135 0.73543499 0.09026296 0.90321975 0.66162669 0.51716146
#>  [7] 0.74186366 0.41125413 0.17581202 0.68547734 0.11766549 0.82316789
#> [13] 0.40565668 0.44854610 0.95477820 0.64388593 0.31991691 0.02239872
#> [19] 0.13687388 0.32476719 0.67093851 0.05564081 0.76817620 0.49502455
#> [25] 0.07459706 0.25493312 0.14019729 0.89704659 0.40548199 0.53800443

Pitfall: should you use parallel_generate() in concurrent threads or streams, make sure to seed each of them with enough separate streams to avoid overlap. For instance, with parallel_generate(..., stream = 4), you could seed first thread with something like dqset.seed(546, 1), but you must seed second thread at least on stream 5 (previous stream plus the number of streams you reserve for parallel_generate()). If you use dqset.seed(546, 2) on the second thread, there will be an overlap like this:

# Seed used in the first thread
dqset.seed(546, 1); (v1 <- rnorm_para(8, streams = 4))
#> [1]  0.01904358  0.57750157  0.39156879 -1.72594164  1.24949453 -0.87535133
#> [7] -0.49878776  0.26077249
# Seed used in the second thread
dqset.seed(546, 2); (v2 <- rnorm_para(8, streams = 4))
#> [1]  0.3915688 -1.7259416  1.2494945 -0.8753513 -0.4987878  0.2607725
#> [7]  1.2018189 -0.1060487

Note how values 1-6 of v2 are identical to values 3-8 of v1 because of an overlap of the streams consumed in each call to parallel_generate(). You will get the same result if the two lines of code were run in separate threads or even, separate R processes. Here, if second dqset.seed() is shifted by 4 streams or more from first one, then there is no overlap any more:

# Seed used in the first thread
dqset.seed(546, 1); (v1 <- rnorm_para(8, streams = 4))
#> [1]  0.01904358  0.57750157  0.39156879 -1.72594164  1.24949453 -0.87535133
#> [7] -0.49878776  0.26077249
# Seed used in the second thread
dqset.seed(546, 5); (v2 <- rnorm_para(8, streams = 4))
#> [1]  1.2018189 -0.1060487 -0.8532641  0.6531933 -0.8304053 -0.4745548
#> [7] -0.4211618 -0.5871540

  1. See for example https://www.pcg-random.org/posts/critiquing-pcg-streams.html.↩︎

  2. Note that here in contrast to the default PCG implementation, streams are counted from the current stream, i.e. setting stream to 0 will give the same RNG. This brings PCG in line with the other RNGs.↩︎