tatami_r
R bindings to tatami matrices
Loading...
Searching...
No Matches
parallelize.hpp
Go to the documentation of this file.
1#ifndef TATAMI_R_PARALLELIZE_HPP
2#define TATAMI_R_PARALLELIZE_HPP
3
7#ifdef TATAMI_R_PARALLELIZE_UNKNOWN
13#include <thread>
14#include <cmath>
15#include <vector>
16#include <string>
17#include <stdexcept>
18#include <algorithm>
19
26namespace tatami_r {
27
35 // This should end up resolving to a single instance, even across dynamically linked libraries:
36 // https://stackoverflow.com/questions/52851239/local-static-variable-linkage-in-a-template-class-static-member-function
37 static manticore::Executor mexec;
38 return mexec;
39}
40
60template<class Function_, class Index_>
61void parallelize(Function_ fun, Index_ ntasks, int nthreads) {
62 if (ntasks == 0) {
63 return;
64 }
65
66 if (nthreads <= 1 || ntasks == 1) {
67 fun(0, 0, ntasks);
68 return;
69 }
70
71 Index_ tasks_per_worker = ntasks / nthreads;
72 int remainder = ntasks % nthreads;
73 if (tasks_per_worker == 0) {
74 tasks_per_worker = 1;
75 remainder = 0;
76 nthreads = ntasks;
77 }
78
79 auto& mexec = executor();
80 mexec.initialize(nthreads, "failed to execute R command");
81
82 std::vector<std::thread> runners;
83 runners.reserve(nthreads);
84 std::vector<std::exception_ptr> errors(nthreads);
85
86 Index_ start = 0;
87 for (int w = 0; w < nthreads; ++w) {
88 Index_ length = tasks_per_worker + (w < remainder);
89
90 runners.emplace_back([&](int id, Index_ s, Index_ l) {
91 try {
92 fun(id, s, l);
93 } catch (...) {
94 errors[id] = std::current_exception();
95 }
96 mexec.finish_thread();
97 }, w, start, length);
98
99 start += length;
100 }
101
102 mexec.listen();
103 for (auto& x : runners) {
104 x.join();
105 }
106
107 for (const auto& err : errors) {
108 if (err) {
109 std::rethrow_exception(err);
110 }
111 }
112}
113
114}
115
119#endif
124#endif
tatami bindings for arbitrary R matrices.
void parallelize(Function_ fun, Index_ ntasks, int nthreads)
Definition parallelize.hpp:61
manticore::Executor & executor()
Definition parallelize.hpp:34