1#ifndef TATAMI_R_PARALLELIZE_HPP
2#define TATAMI_R_PARALLELIZE_HPP
7#ifdef TATAMI_R_PARALLELIZE_UNKNOWN
13#include "sanisizer/sanisizer.hpp"
88template<
class Function_,
class Index_>
89void parallelize(Function_ fun, Index_ ntasks,
int nthreads) {
94 if (nthreads <= 1 || ntasks == 1) {
99 Index_ tasks_per_worker = ntasks / nthreads;
100 int remainder = ntasks % nthreads;
101 if (tasks_per_worker == 0) {
102 tasks_per_worker = 1;
107 auto& mexec = executor();
108 mexec.initialize(nthreads,
"failed to execute R command");
110 std::vector<std::thread> runners;
111 runners.reserve(nthreads);
112 auto errors = sanisizer::create<std::vector<std::exception_ptr> >(nthreads);
115 for (
int w = 0; w < nthreads; ++w) {
116 Index_ length = tasks_per_worker + (w < remainder);
118 runners.emplace_back(
119 [&](
int id, Index_ s, Index_ l) ->
void {
123 errors[id] = std::current_exception();
125 mexec.finish_thread();
136 for (
auto& x : runners) {
140 for (
const auto& err : errors) {
142 std::rethrow_exception(err);
void parallelize(Function_ fun, Index_ tasks, int threads)