tatami_r
R bindings to tatami matrices
Loading...
Searching...
No Matches
parallelize.hpp
Go to the documentation of this file.
1#ifndef TATAMI_R_PARALLELIZE_HPP
2#define TATAMI_R_PARALLELIZE_HPP
3
7#ifdef TATAMI_R_PARALLELIZE_UNKNOWN
13#include "sanisizer/sanisizer.hpp"
14
15#include <thread>
16#include <cmath>
17#include <vector>
18#include <string>
19#include <stdexcept>
20#include <algorithm>
21
28namespace tatami_r {
29
33inline manticore::Executor* executor_ptr = NULL;
46inline manticore::Executor& executor() {
47 if (executor_ptr) {
48 return *executor_ptr;
49 } else {
50 // In theory, this should end up resolving to a single instance, even across dynamically linked libraries:
51 // https://stackoverflow.com/questions/52851239/local-static-variable-linkage-in-a-template-class-static-member-function
52 // In practice, this doesn't seem to be the case on a Mac, requiring us to use `set_executor()`.
53 static manticore::Executor mexec;
54 return mexec;
55 }
56}
57
65inline void set_executor(manticore::Executor* ptr) {
66 executor_ptr = ptr;
67}
68
88template<class Function_, class Index_>
89void parallelize(Function_ fun, Index_ ntasks, int nthreads) {
90 if (ntasks == 0) {
91 return;
92 }
93
94 if (nthreads <= 1 || ntasks == 1) {
95 fun(0, 0, ntasks);
96 return;
97 }
98
99 Index_ tasks_per_worker = ntasks / nthreads;
100 int remainder = ntasks % nthreads;
101 if (tasks_per_worker == 0) {
102 tasks_per_worker = 1;
103 remainder = 0;
104 nthreads = ntasks;
105 }
106
107 auto& mexec = executor();
108 mexec.initialize(nthreads, "failed to execute R command");
109
110 std::vector<std::thread> runners;
111 runners.reserve(nthreads);
112 auto errors = sanisizer::create<std::vector<std::exception_ptr> >(nthreads);
113
114 Index_ start = 0;
115 for (int w = 0; w < nthreads; ++w) {
116 Index_ length = tasks_per_worker + (w < remainder);
117
118 runners.emplace_back(
119 [&](int id, Index_ s, Index_ l) -> void {
120 try {
121 fun(id, s, l);
122 } catch (...) {
123 errors[id] = std::current_exception();
124 }
125 mexec.finish_thread();
126 },
127 w,
128 start,
129 length
130 );
131
132 start += length;
133 }
134
135 mexec.listen();
136 for (auto& x : runners) {
137 x.join();
138 }
139
140 for (const auto& err : errors) {
141 if (err) {
142 std::rethrow_exception(err);
143 }
144 }
145}
146
147}
148
152#endif
157#endif
void parallelize(Function_ fun, Index_ tasks, int threads)