tatami_hdf5
tatami bindings for HDF5-backed matrices
Loading...
Searching...
No Matches
write_compressed_sparse_matrix.hpp
Go to the documentation of this file.
1#ifndef TATAMI_WRITE_SPARSE_MATRIX_TO_HDF5_HPP
2#define TATAMI_WRITE_SPARSE_MATRIX_TO_HDF5_HPP
3
4#include "tatami/tatami.hpp"
5#include "utils.hpp"
6
7#include "H5Cpp.h"
8
9#include <cstdint>
10#include <string>
11#include <vector>
12#include <cmath>
13#include <limits>
14
20namespace tatami_hdf5 {
21
29 WriteCompressedSparseMatrixOptions() : data_name("data"), index_name("indices"), ptr_name("indptr") {}
38 std::string data_name;
39
44 std::string index_name;
45
50 std::string ptr_name;
51
56 WriteStorageLayout columnar = WriteStorageLayout::AUTOMATIC;
57
62 WriteStorageType data_type = WriteStorageType::AUTOMATIC;
63
70 bool force_integer = false;
71
76 WriteStorageType index_type = WriteStorageType::AUTOMATIC;
77
82
86 hsize_t chunk_size = sanisizer::cap<hsize_t>(100000);
87
93 int num_threads = 1;
94};
95
99inline H5::DataSet create_1d_compressed_hdf5_dataset(H5::Group& location, const H5::DataType& dtype, const std::string& name, hsize_t length, int deflate_level, hsize_t chunk) {
100 H5::DataSpace dspace(1, &length);
101 H5::DSetCreatPropList plist;
102
103 if (deflate_level >= 0 && length) {
104 plist.setDeflate(deflate_level);
105 if (chunk > length) {
106 plist.setChunk(1, &length);
107 } else {
108 plist.setChunk(1, &chunk);
109 }
110 }
111
112 return location.createDataSet(name, dtype, dspace, plist);
113}
114
115inline H5::DataSet create_1d_compressed_hdf5_dataset(H5::Group& location, WriteStorageType type, const std::string& name, hsize_t length, int deflate_level, hsize_t chunk) {
116 const H5::PredType* dtype;
117 switch (type) {
118 case WriteStorageType::INT8:
119 dtype = &(H5::PredType::NATIVE_INT8);
120 break;
121 case WriteStorageType::UINT8:
122 dtype = &(H5::PredType::NATIVE_UINT8);
123 break;
124 case WriteStorageType::INT16:
125 dtype = &(H5::PredType::NATIVE_INT16);
126 break;
127 case WriteStorageType::UINT16:
128 dtype = &(H5::PredType::NATIVE_UINT16);
129 break;
130 case WriteStorageType::INT32:
131 dtype = &(H5::PredType::NATIVE_INT32);
132 break;
133 case WriteStorageType::UINT32:
134 dtype = &(H5::PredType::NATIVE_UINT32);
135 break;
136 case WriteStorageType::DOUBLE:
137 dtype = &(H5::PredType::NATIVE_DOUBLE);
138 break;
139 default:
140 throw std::runtime_error("automatic HDF5 output type must be resolved before creating a HDF5 dataset");
141 }
142 return create_1d_compressed_hdf5_dataset(location, *dtype, name, length, deflate_level, chunk);
143}
144
// Sign-safe comparison of possibly mixed signed/unsigned integer types,
// returning whether 'l <= r' holds mathematically (no implicit conversion surprises).
template<typename Left_, typename Right_>
bool is_less_than_or_equal(Left_ l, Right_ r) {
    constexpr bool left_is_signed = std::is_signed<Left_>::value;
    constexpr bool right_is_signed = std::is_signed<Right_>::value;
    if constexpr(left_is_signed == right_is_signed) {
        // Same signedness: the built-in comparison is already correct.
        return l <= r;
    } else if constexpr(left_is_signed) {
        // Signed vs unsigned: any non-positive left value is trivially <=,
        // otherwise compare safely in unsigned space.
        if (l <= 0) {
            return true;
        }
        return static_cast<typename std::make_unsigned<Left_>::type>(l) <= r;
    } else {
        // Unsigned vs signed: a negative right value can never be >=,
        // otherwise compare safely in unsigned space.
        if (r < 0) {
            return false;
        }
        return l <= static_cast<typename std::make_unsigned<Right_>::type>(r);
    }
}
157
// Check that 'max' does not exceed the largest value representable by Native_.
template<typename Native_, typename Max_>
bool fits_upper_limit(Max_ max) {
    constexpr auto limit = std::numeric_limits<Native_>::max();
    if constexpr(!std::is_integral<Max_>::value) {
        // Floating-point input: compare in double precision.
        return max <= static_cast<double>(limit);
    } else {
        // Integral input (Native_ is integral by construction), so use the
        // sign-safe integer comparison.
        return is_less_than_or_equal(max, limit);
    }
}
167
// Check that 'min' is not below the smallest value representable by Native_.
template<typename Native_, typename Min_>
bool fits_lower_limit(Min_ min) {
    constexpr auto limit = std::numeric_limits<Native_>::min();
    if constexpr(!std::is_integral<Min_>::value) {
        // Floating-point input: compare in double precision.
        return min >= static_cast<double>(limit);
    } else {
        // Integral input, so use the sign-safe integer comparison.
        return is_less_than_or_equal(limit, min);
    }
}
184template<typename Value_, typename Index_>
185struct WriteSparseHdf5Statistics {
186 Value_ lower_data = 0;
187 Value_ upper_data = 0;
188 Index_ upper_index = 0;
189 hsize_t non_zeros = 0;
190 bool non_integer = false;
191
192 void add_value(Value_ val) {
193 if constexpr(!std::is_integral<Value_>::value) {
194 if (std::trunc(val) != val || !std::isfinite(val)) {
195 non_integer = true;
196 }
197 }
198
199 if (val < lower_data) {
200 lower_data = val;
201 } else if (val > upper_data) {
202 upper_data = val;
203 }
204 }
205
206 void add_index(Index_ idx) {
207 if (idx > upper_index) {
208 upper_index = idx;
209 }
210 }
211};
212
213template<typename Value_, typename Index_>
214void update_hdf5_stats(const tatami::SparseRange<Value_, Index_>& extracted, WriteSparseHdf5Statistics<Value_, Index_>& output, bool infer_value, bool infer_index) {
215 // We need to protect the addition just in case it overflows from having too many non-zero elements.
216 output.non_zeros = sanisizer::sum<hsize_t>(output.non_zeros, extracted.number);
217
218 if (infer_value) {
219 for (Index_ i = 0; i < extracted.number; ++i) {
220 output.add_value(extracted.value[i]);
221 }
222 }
223
224 if (infer_index) {
225 for (Index_ i = 0; i < extracted.number; ++i) {
226 output.add_index(extracted.index[i]);
227 }
228 }
229}
230
231template<typename Value_, typename Index_>
232void update_hdf5_stats(const Value_* extracted, Index_ n, WriteSparseHdf5Statistics<Value_, Index_>& output) {
233 Index_ local_nonzero = 0;
234 for (Index_ i = 0; i < n; ++i) {
235 auto val = extracted[i];
236 if (val == 0) {
237 continue;
238 }
239 ++local_nonzero;
240 output.add_value(val);
241 output.add_index(i);
242 }
243
244 // Checking that there aren't overflows, but doing so outside of the hot loop for perf.
245 output.non_zeros = sanisizer::sum<hsize_t>(output.non_zeros, local_nonzero);
246}
247
// Single pass over the matrix to gather the statistics needed for choosing
// HDF5 storage types and dataset sizes: the data value range, the maximum
// structural index, the total non-zero count, and whether any value is
// non-integral. Value/index scans are skipped when the corresponding type
// is already fixed by the caller. Work is parallelized over the matrix's
// preferred dimension, each thread accumulating locally before merging.
template<typename Value_, typename Index_>
WriteSparseHdf5Statistics<Value_, Index_> write_sparse_hdf5_statistics(const tatami::Matrix<Value_, Index_>& mat, bool infer_value, bool infer_index, int nthreads) {
    const auto NR = mat.nrow(), NC = mat.ncol();

    // Thread 0 writes directly into 'output'; threads 1..n-1 get a slot in 'collected'.
    WriteSparseHdf5Statistics<Value_, Index_> output;
    auto collected = sanisizer::create<std::vector<WriteSparseHdf5Statistics<Value_, Index_> > >(nthreads - 1); // nthreads had better be >= 1.
    int num_used;

    if (mat.sparse()) {
        // Only extract the parts we actually need to inspect.
        tatami::Options opt;
        opt.sparse_extract_index = infer_index;
        opt.sparse_extract_value = infer_value;

        if (mat.prefer_rows()) {
            num_used = tatami::parallelize([&](int t, Index_ start, Index_ len) -> void {
                WriteSparseHdf5Statistics<Value_, Index_> current_output;

                auto wrk = tatami::consecutive_extractor<true>(mat, true, start, len, opt);
                std::vector<Value_> xbuffer(NC);
                std::vector<Index_> ibuffer(NC);
                for (Index_ r = start, end = start + len; r < end; ++r) {
                    auto extracted = wrk->fetch(r, xbuffer.data(), ibuffer.data());
                    update_hdf5_stats(extracted, current_output, infer_value, infer_index);
                }

                // Only move to the result buffer at the end, to avoid false sharing between threads.
                (t ? collected[t - 1] : output) = std::move(current_output);
            }, NR, nthreads);

        } else {
            num_used = tatami::parallelize([&](int t, Index_ start, Index_ len) -> void {
                WriteSparseHdf5Statistics<Value_, Index_> current_output;

                auto wrk = tatami::consecutive_extractor<true>(mat, false, start, len, opt);
                std::vector<Value_> xbuffer(NR);
                std::vector<Index_> ibuffer(NR);
                for (Index_ c = start, end = start + len; c < end; ++c) {
                    auto extracted = wrk->fetch(c, xbuffer.data(), ibuffer.data());
                    update_hdf5_stats(extracted, current_output, infer_value, infer_index);
                }

                // Only move to the result buffer at the end, to avoid false sharing between threads.
                (t ? collected[t - 1] : output) = std::move(current_output);
            }, NC, nthreads);
        }

    } else {
        // Dense extraction: the per-element scan decides what counts as a non-zero.
        if (mat.prefer_rows()) {
            num_used = tatami::parallelize([&](int t, Index_ start, Index_ len) -> void {
                WriteSparseHdf5Statistics<Value_, Index_> current_output;

                auto wrk = tatami::consecutive_extractor<false>(mat, true, start, len);
                std::vector<Value_> xbuffer(NC);
                for (Index_ r = start, end = start + len; r < end; ++r) {
                    auto extracted = wrk->fetch(r, xbuffer.data());
                    update_hdf5_stats(extracted, NC, current_output);
                }

                // Only move to the result buffer at the end, to avoid false sharing between threads.
                (t ? collected[t - 1] : output) = std::move(current_output);
            }, NR, nthreads);

        } else {
            num_used = tatami::parallelize([&](int t, Index_ start, Index_ len) -> void {
                WriteSparseHdf5Statistics<Value_, Index_> current_output;

                auto wrk = tatami::consecutive_extractor<false>(mat, false, start, len);
                std::vector<Value_> xbuffer(NR);
                for (Index_ c = start, end = start + len; c < end; ++c) {
                    auto extracted = wrk->fetch(c, xbuffer.data());
                    update_hdf5_stats(extracted, NR, current_output);
                }

                // Only move to the result buffer at the end, to avoid false sharing between threads.
                (t ? collected[t - 1] : output) = std::move(current_output);
            }, NC, nthreads);
        }
    }

    // Merge the per-thread statistics; 'num_used' is the number of workers
    // actually launched, which may be less than 'nthreads'.
    for (int i = 1; i < num_used; ++i) {
        auto& current = collected[i - 1];
        output.lower_data = std::min(output.lower_data, current.lower_data);
        output.upper_data = std::max(output.upper_data, current.upper_data);
        output.upper_index = std::max(output.upper_index, current.upper_index);
        output.non_zeros = sanisizer::sum<hsize_t>(output.non_zeros, current.non_zeros);
        output.non_integer = output.non_integer || current.non_integer;
    }

    return output;
}
355template<typename Value_, typename Index_>
357 auto data_type = params.data_type;
358 auto index_type = params.index_type;
359 auto use_auto_data_type = (data_type == WriteStorageType::AUTOMATIC);
360 auto use_auto_index_type = (index_type == WriteStorageType::AUTOMATIC);
361 auto stats = write_sparse_hdf5_statistics(mat, use_auto_data_type, use_auto_index_type, params.num_threads);
362
363 // Choosing the types.
364 if (use_auto_data_type) {
365 if (stats.non_integer && !params.force_integer) {
366 data_type = WriteStorageType::DOUBLE;
367 } else {
368 auto lower_data = stats.lower_data;
369 auto upper_data = stats.upper_data;
370 if (lower_data < 0) {
371 if (fits_lower_limit<std::int8_t>(lower_data) && fits_upper_limit<std::int8_t>(upper_data)) {
372 data_type = WriteStorageType::INT8;
373 } else if (fits_lower_limit<std::int16_t>(lower_data) && fits_upper_limit<std::int16_t>(upper_data)) {
374 data_type = WriteStorageType::INT16;
375 } else {
376 data_type = WriteStorageType::INT32;
377 }
378 } else {
379 if (fits_upper_limit<std::uint8_t>(upper_data)) {
380 data_type = WriteStorageType::UINT8;
381 } else if (fits_upper_limit<std::uint16_t>(upper_data)) {
382 data_type = WriteStorageType::UINT16;
383 } else {
384 data_type = WriteStorageType::UINT32;
385 }
386 }
387 }
388 }
389
390 if (use_auto_index_type) {
391 auto upper_index = stats.upper_index;
392 if (fits_upper_limit<std::uint8_t>(upper_index)) {
393 index_type = WriteStorageType::UINT8;
394 } else if (fits_upper_limit<std::uint16_t>(upper_index)) {
395 index_type = WriteStorageType::UINT16;
396 } else {
397 index_type = WriteStorageType::UINT32;
398 }
399 }
400
401 // Choosing the layout.
402 auto layout = params.columnar;
403 if (layout == WriteStorageLayout::AUTOMATIC) {
404 if (mat.prefer_rows()) {
405 layout = WriteStorageLayout::ROW;
406 } else {
407 layout = WriteStorageLayout::COLUMN;
408 }
409 }
410
411 // And then saving it. This time we have no choice but to iterate by the desired dimension.
412 auto non_zeros = stats.non_zeros;
413 H5::DataSet data_ds = create_1d_compressed_hdf5_dataset(location, data_type, params.data_name, non_zeros, params.deflate_level, params.chunk_size);
414 H5::DataSet index_ds = create_1d_compressed_hdf5_dataset(location, index_type, params.index_name, non_zeros, params.deflate_level, params.chunk_size);
415 hsize_t offset = 0;
416 H5::DataSpace inspace(1, &non_zeros);
417 H5::DataSpace outspace(1, &non_zeros);
418 const auto& dstype = define_mem_type<Value_>();
419 const auto& ixtype = define_mem_type<Index_>();
420
421 Index_ NR = mat.nrow(), NC = mat.ncol();
422 std::vector<hsize_t> ptrs;
423
424 auto fill_datasets = [&](const Value_* vptr, const Index_* iptr, hsize_t count) -> void {
425 if (count) {
426 inspace.setExtentSimple(1, &count);
427 outspace.selectHyperslab(H5S_SELECT_SET, &count, &offset);
428 data_ds.write(vptr, dstype, inspace, outspace);
429 index_ds.write(iptr, ixtype, inspace, outspace);
430 offset += count;
431 }
432 };
433
434 if (mat.sparse()) {
435 if (layout == WriteStorageLayout::ROW) {
436 ptrs.resize(sanisizer::sum<decltype(ptrs.size())>(NR, 1));
439
440 auto wrk = tatami::consecutive_extractor<true>(mat, true, static_cast<Index_>(0), NR);
441 for (Index_ r = 0; r < NR; ++r) {
442 auto extracted = wrk->fetch(r, xbuffer.data(), ibuffer.data());
443 fill_datasets(extracted.value, extracted.index, extracted.number);
444 ptrs[r+1] = ptrs[r] + extracted.number;
445 }
446
447 } else {
448 ptrs.resize(sanisizer::sum<decltype(ptrs.size())>(NC, 1));
451
452 auto wrk = tatami::consecutive_extractor<true>(mat, false, static_cast<Index_>(0), NC);
453 for (Index_ c = 0; c < NC; ++c) {
454 auto extracted = wrk->fetch(c, xbuffer.data(), ibuffer.data());
455 fill_datasets(extracted.value, extracted.index, extracted.number);
456 ptrs[c+1] = ptrs[c] + extracted.number;
457 }
458 }
459
460 } else {
461 std::vector<Value_> sparse_xbuffer;
462 std::vector<Index_> sparse_ibuffer;
463 auto fill_datasets_from_dense = [&](const Value_* extracted, Index_ n) -> hsize_t {
464 sparse_xbuffer.clear();
465 sparse_ibuffer.clear();
466 for (Index_ i = 0; i < n; ++i) {
467 if (extracted[i]) {
468 sparse_xbuffer.push_back(extracted[i]);
469 sparse_ibuffer.push_back(i);
470 }
471 }
472
473 hsize_t count = sparse_xbuffer.size();
474 fill_datasets(sparse_xbuffer.data(), sparse_ibuffer.data(), count);
475 return count;
476 };
477
478 if (layout == WriteStorageLayout::ROW) {
479 ptrs.resize(sanisizer::sum<decltype(ptrs.size())>(NR, 1));
481 auto wrk = tatami::consecutive_extractor<false>(mat, true, static_cast<Index_>(0), NR);
482 for (Index_ r = 0; r < NR; ++r) {
483 auto extracted = wrk->fetch(r, dbuffer.data());
484 auto count = fill_datasets_from_dense(extracted, NC);
485 ptrs[r+1] = ptrs[r] + count;
486 }
487
488 } else {
489 ptrs.resize(sanisizer::sum<decltype(ptrs.size())>(NC, 1));
491 auto wrk = tatami::consecutive_extractor<false>(mat, false, static_cast<Index_>(0), NC);
492 for (Index_ c = 0; c < NC; ++c) {
493 auto extracted = wrk->fetch(c, dbuffer.data());
494 auto count = fill_datasets_from_dense(extracted, NR);
495 ptrs[c+1] = ptrs[c] + count;
496 }
497 }
498 }
499
500 // Saving the pointers.
501 auto ptr_len = sanisizer::cast<hsize_t>(ptrs.size());
502 H5::DataSet ptr_ds = create_1d_compressed_hdf5_dataset(location, H5::PredType::NATIVE_HSIZE, params.ptr_name, ptr_len, params.deflate_level, params.chunk_size);
503 H5::DataSpace ptr_space(1, &ptr_len);
504 ptr_ds.write(ptrs.data(), H5::PredType::NATIVE_HSIZE, ptr_space);
505
506 return;
507}
508
518template<typename Value_, typename Index_>
521 write_compressed_sparse_matrix(mat, location, params);
522 return;
523}
524
528template<typename Value_, typename Index_>
529void write_compressed_sparse_matrix(const tatami::Matrix<Value_, Index_>* mat, H5::Group& location, const WriteCompressedSparseMatrixOptions& params) {
530 return write_compressed_sparse_matrix(*mat, location, params);
531}
532
533template<typename Value_, typename Index_>
534void write_compressed_sparse_matrix(const tatami::Matrix<Value_, Index_>* mat, H5::Group& location) {
535 return write_compressed_sparse_matrix(*mat, location);
536}
541}
542
543#endif
virtual Index_ ncol() const=0
virtual Index_ nrow() const=0
virtual bool prefer_rows() const=0
virtual std::unique_ptr< MyopicSparseExtractor< Value_, Index_ > > sparse(bool row, const Options &opt) const=0
Representations for matrix data in HDF5 files.
Definition CompressedSparseMatrix.hpp:24
WriteStorageLayout
Definition utils.hpp:24
WriteStorageType
Definition utils.hpp:29
void write_compressed_sparse_matrix(const tatami::Matrix< Value_, Index_ > &mat, H5::Group &location, const WriteCompressedSparseMatrixOptions &params)
Definition write_compressed_sparse_matrix.hpp:356
int parallelize(Function_ fun, const Index_ tasks, const int workers)
Container_ create_container_of_Index_size(const Index_ x, Args_ &&... args)
auto consecutive_extractor(const Matrix< Value_, Index_ > &matrix, const bool row, const Index_ iter_start, const Index_ iter_length, Args_ &&... args)
bool sparse_extract_index
bool sparse_extract_value
const Value_ * value
const Index_ * index
Parameters for write_compressed_sparse_matrix().
Definition write_compressed_sparse_matrix.hpp:25
std::string ptr_name
Definition write_compressed_sparse_matrix.hpp:50
WriteStorageType index_type
Definition write_compressed_sparse_matrix.hpp:76
bool force_integer
Definition write_compressed_sparse_matrix.hpp:70
int deflate_level
Definition write_compressed_sparse_matrix.hpp:81
std::string data_name
Definition write_compressed_sparse_matrix.hpp:38
std::string index_name
Definition write_compressed_sparse_matrix.hpp:44
WriteStorageType data_type
Definition write_compressed_sparse_matrix.hpp:62
hsize_t chunk_size
Definition write_compressed_sparse_matrix.hpp:86
int num_threads
Definition write_compressed_sparse_matrix.hpp:93
WriteStorageLayout columnar
Definition write_compressed_sparse_matrix.hpp:56
Utilities for HDF5 extraction.