tatami_hdf5
tatami bindings for HDF5-backed matrices
Loading...
Searching...
No Matches
write_compressed_sparse_matrix.hpp
Go to the documentation of this file.
1#ifndef TATAMI_WRITE_SPARSE_MATRIX_TO_HDF5_HPP
2#define TATAMI_WRITE_SPARSE_MATRIX_TO_HDF5_HPP
3
#include "tatami/tatami.hpp"
#include "utils.hpp"

#include "H5Cpp.h"

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
20namespace tatami_hdf5 {
21
29 WriteCompressedSparseMatrixOptions() : data_name("data"), index_name("indices"), ptr_name("indptr") {}
38 std::string data_name;
39
44 std::string index_name;
45
50 std::string ptr_name;
51
56 WriteStorageLayout columnar = WriteStorageLayout::AUTOMATIC;
57
62 WriteStorageType data_type = WriteStorageType::AUTOMATIC;
63
70 bool force_integer = false;
71
76 WriteStorageType index_type = WriteStorageType::AUTOMATIC;
77
82
86 size_t chunk_size = 100000;
87
93 int num_threads = 1;
94};
95
99inline H5::DataSet create_1d_compressed_hdf5_dataset(H5::Group& location, const H5::DataType& dtype, const std::string& name, hsize_t length, int deflate_level, hsize_t chunk) {
100 H5::DataSpace dspace(1, &length);
101 H5::DSetCreatPropList plist;
102
103 if (deflate_level >= 0 && length) {
104 plist.setDeflate(deflate_level);
105 if (chunk > length) {
106 plist.setChunk(1, &length);
107 } else {
108 plist.setChunk(1, &chunk);
109 }
110 }
111
112 return location.createDataSet(name, dtype, dspace, plist);
113}
114
115inline H5::DataSet create_1d_compressed_hdf5_dataset(H5::Group& location, WriteStorageType type, const std::string& name, hsize_t length, int deflate_level, hsize_t chunk) {
116 const H5::PredType* dtype;
117 switch (type) {
118 case WriteStorageType::INT8:
119 dtype = &(H5::PredType::NATIVE_INT8);
120 break;
121 case WriteStorageType::UINT8:
122 dtype = &(H5::PredType::NATIVE_UINT8);
123 break;
124 case WriteStorageType::INT16:
125 dtype = &(H5::PredType::NATIVE_INT16);
126 break;
127 case WriteStorageType::UINT16:
128 dtype = &(H5::PredType::NATIVE_UINT16);
129 break;
130 case WriteStorageType::INT32:
131 dtype = &(H5::PredType::NATIVE_INT32);
132 break;
133 case WriteStorageType::UINT32:
134 dtype = &(H5::PredType::NATIVE_UINT32);
135 break;
136 case WriteStorageType::DOUBLE:
137 dtype = &(H5::PredType::NATIVE_DOUBLE);
138 break;
139 default:
140 throw std::runtime_error("automatic HDF5 output type must be resolved before creating a HDF5 dataset");
141 }
142 return create_1d_compressed_hdf5_dataset(location, *dtype, name, length, deflate_level, chunk);
143}
144
/**
 * Check whether `max` does not exceed the largest value representable by
 * `Native` (assumed to be an integer type no wider than 64 bits).
 */
template<typename Native>
bool fits_upper_limit(int64_t max) {
    constexpr int64_t cap = std::numeric_limits<Native>::max();
    return max <= cap;
}
150
/**
 * Check whether `min` is no smaller than the lowest value representable by
 * `Native` (assumed to be an integer type no wider than 64 bits).
 */
template<typename Native>
bool fits_lower_limit(int64_t min) {
    constexpr int64_t floor_value = std::numeric_limits<Native>::min();
    return min >= floor_value;
}
163template<typename Value_, typename Index_>
164struct WriteSparseHdf5Statistics {
165 Value_ lower_data = 0;
166 Value_ upper_data = 0;
167 Index_ upper_index = 0;
168 hsize_t non_zeros = 0;
169 bool non_integer = false;
170
171 void add_value(Value_ val) {
172 if constexpr(!std::is_integral<Value_>::value) {
173 if (std::trunc(val) != val || !std::isfinite(val)) {
174 non_integer = true;
175 }
176 }
177
178 if (val < lower_data) {
179 lower_data = val;
180 } else if (val > upper_data) {
181 upper_data = val;
182 }
183 }
184
185 void add_index(Index_ idx) {
186 if (idx > upper_index) {
187 upper_index = idx;
188 }
189 }
190};
191
192template<typename Value_, typename Index_>
193void update_hdf5_stats(const tatami::SparseRange<Value_, Index_>& extracted, WriteSparseHdf5Statistics<Value_, Index_>& output, bool infer_value, bool infer_index) {
194 output.non_zeros += extracted.number;
195
196 if (infer_value) {
197 for (Index_ i = 0; i < extracted.number; ++i) {
198 output.add_value(extracted.value[i]);
199 }
200 }
201
202 if (infer_index) {
203 for (Index_ i = 0; i < extracted.number; ++i) {
204 output.add_index(extracted.index[i]);
205 }
206 }
207}
208
209template<typename Value_, typename Index_>
210void update_hdf5_stats(const Value_* extracted, Index_ n, WriteSparseHdf5Statistics<Value_, Index_>& output) {
211 for (Index_ i = 0; i < n; ++i) {
212 auto val = extracted[i];
213 if (val == 0) {
214 continue;
215 }
216 ++output.non_zeros;
217 output.add_value(val);
218 output.add_index(i);
219 }
220}
221
/**
 * Single pass over the matrix to collect the statistics needed to choose
 * the output storage types: value range, maximum index, non-zero count and
 * whether any value is non-integral. Value/index scans that the caller does
 * not need are skipped via infer_value/infer_index.
 */
template<typename Value_, typename Index_>
WriteSparseHdf5Statistics<Value_, Index_> write_sparse_hdf5_statistics(const tatami::Matrix<Value_, Index_>* mat, bool infer_value, bool infer_index, int nthreads) {
    auto NR = mat->nrow(), NC = mat->ncol();

    // One accumulator per thread so workers never need to synchronize;
    // results are merged serially at the end.
    std::vector<WriteSparseHdf5Statistics<Value_, Index_> > collected(nthreads);

    if (mat->sparse()) {
        // Only pull out the components we actually intend to inspect.
        tatami::Options opt;
        opt.sparse_extract_index = infer_index;
        opt.sparse_extract_value = infer_value;

        if (mat->prefer_rows()) {
            // Iterate along the matrix's preferred dimension (rows here).
            tatami::parallelize([&](size_t t, Index_ start, Index_ len) -> void {
                auto wrk = tatami::consecutive_extractor<true>(mat, true, start, len, opt);
                std::vector<Value_> xbuffer(NC);
                std::vector<Index_> ibuffer(NC);
                for (size_t r = start, end = start + len; r < end; ++r) {
                    auto extracted = wrk->fetch(r, xbuffer.data(), ibuffer.data());
                    update_hdf5_stats(extracted, collected[t], infer_value, infer_index);
                }
            }, NR, nthreads);

        } else {
            tatami::parallelize([&](size_t t, Index_ start, Index_ len) -> void {
                auto wrk = tatami::consecutive_extractor<true>(mat, false, start, len, opt);
                std::vector<Value_> xbuffer(NR);
                std::vector<Index_> ibuffer(NR);
                for (size_t c = start, end = start + len; c < end; ++c) {
                    auto extracted = wrk->fetch(c, xbuffer.data(), ibuffer.data());
                    update_hdf5_stats(extracted, collected[t], infer_value, infer_index);
                }
            }, NC, nthreads);
        }

    } else {
        // Dense underlying matrix: extract full vectors and scan for
        // non-zero entries ourselves.
        if (mat->prefer_rows()) {
            tatami::parallelize([&](size_t t, Index_ start, Index_ len) -> void {
                auto wrk = tatami::consecutive_extractor<false>(mat, true, start, len);
                std::vector<Value_> xbuffer(NC);
                for (size_t r = start, end = start + len; r < end; ++r) {
                    auto extracted = wrk->fetch(r, xbuffer.data());
                    update_hdf5_stats(extracted, NC, collected[t]);
                }
            }, NR, nthreads);

        } else {
            tatami::parallelize([&](size_t t, Index_ start, Index_ len) -> void {
                auto wrk = tatami::consecutive_extractor<false>(mat, false, start, len);
                std::vector<Value_> xbuffer(NR);
                for (size_t c = start, end = start + len; c < end; ++c) {
                    auto extracted = wrk->fetch(c, xbuffer.data());
                    update_hdf5_stats(extracted, NR, collected[t]);
                }
            }, NC, nthreads);
        }
    }

    // Merge all per-thread accumulators into the first one.
    auto& first = collected.front();
    for (int i = 1; i < nthreads; ++i) {
        auto& current = collected[i];
        first.lower_data = std::min(first.lower_data, current.lower_data);
        first.upper_data = std::max(first.upper_data, current.upper_data);
        first.upper_index = std::max(first.upper_index, current.upper_index);
        first.non_zeros += current.non_zeros;
        first.non_integer = first.non_integer || current.non_integer;
    }

    return std::move(first); // better be at least one thread.
}
307template<typename Value_, typename Index_>
309 auto data_type = params.data_type;
310 auto index_type = params.index_type;
311 auto use_auto_data_type = (data_type == WriteStorageType::AUTOMATIC);
312 auto use_auto_index_type = (index_type == WriteStorageType::AUTOMATIC);
313 auto stats = write_sparse_hdf5_statistics(mat, use_auto_data_type, use_auto_index_type, params.num_threads);
314
315 // Choosing the types.
316 if (use_auto_data_type) {
317 if (stats.non_integer && !params.force_integer) {
318 data_type = WriteStorageType::DOUBLE;
319 } else {
320 auto lower_data = stats.lower_data;
321 auto upper_data = stats.upper_data;
322 if (lower_data < 0) {
323 if (fits_lower_limit<int8_t>(lower_data) && fits_upper_limit<int8_t>(upper_data)) {
324 data_type = WriteStorageType::INT8;
325 } else if (fits_lower_limit<int16_t>(lower_data) && fits_upper_limit<int16_t>(upper_data)) {
326 data_type = WriteStorageType::INT16;
327 } else {
328 data_type = WriteStorageType::INT32;
329 }
330 } else {
331 if (fits_upper_limit<uint8_t>(upper_data)) {
332 data_type = WriteStorageType::UINT8;
333 } else if (fits_upper_limit<uint16_t>(upper_data)) {
334 data_type = WriteStorageType::UINT16;
335 } else {
336 data_type = WriteStorageType::UINT32;
337 }
338 }
339 }
340 }
341
342 if (use_auto_index_type) {
343 auto upper_index = stats.upper_index;
344 if (fits_upper_limit<uint8_t>(upper_index)) {
345 index_type = WriteStorageType::UINT8;
346 } else if (fits_upper_limit<uint16_t>(upper_index)) {
347 index_type = WriteStorageType::UINT16;
348 } else {
349 index_type = WriteStorageType::UINT32;
350 }
351 }
352
353 // Choosing the layout.
354 auto layout = params.columnar;
355 if (layout == WriteStorageLayout::AUTOMATIC) {
356 if (mat->prefer_rows()) {
357 layout = WriteStorageLayout::ROW;
358 } else {
359 layout = WriteStorageLayout::COLUMN;
360 }
361 }
362
363 // And then saving it. This time we have no choice but to iterate by the desired dimension.
364 auto non_zeros = stats.non_zeros;
365 H5::DataSet data_ds = create_1d_compressed_hdf5_dataset(location, data_type, params.data_name, non_zeros, params.deflate_level, params.chunk_size);
366 H5::DataSet index_ds = create_1d_compressed_hdf5_dataset(location, index_type, params.index_name, non_zeros, params.deflate_level, params.chunk_size);
367 hsize_t offset = 0;
368 H5::DataSpace inspace(1, &non_zeros);
369 H5::DataSpace outspace(1, &non_zeros);
370 const auto& dstype = define_mem_type<Value_>();
371 const auto& ixtype = define_mem_type<Index_>();
372
373 size_t NR = mat->nrow(), NC = mat->ncol(); // use size_t to avoid overflow on +1.
374 std::vector<hsize_t> ptrs;
375
376 auto fill_datasets = [&](const Value_* vptr, const Index_* iptr, hsize_t count) -> void {
377 if (count) {
378 inspace.setExtentSimple(1, &count);
379 outspace.selectHyperslab(H5S_SELECT_SET, &count, &offset);
380 data_ds.write(vptr, dstype, inspace, outspace);
381 index_ds.write(iptr, ixtype, inspace, outspace);
382 offset += count;
383 }
384 };
385
386 if (mat->sparse()) {
387 if (layout == WriteStorageLayout::ROW) {
388 ptrs.resize(NR + 1);
389 std::vector<Value_> xbuffer(NC);
390 std::vector<Index_> ibuffer(NC);
391
392 auto wrk = tatami::consecutive_extractor<true>(mat, true, static_cast<Index_>(0), static_cast<Index_>(NR));
393 for (size_t r = 0; r < NR; ++r) {
394 auto extracted = wrk->fetch(r, xbuffer.data(), ibuffer.data());
395 fill_datasets(extracted.value, extracted.index, extracted.number);
396 ptrs[r+1] = ptrs[r] + extracted.number;
397 }
398
399 } else {
400 ptrs.resize(NC + 1);
401 std::vector<Value_> xbuffer(NR);
402 std::vector<Index_> ibuffer(NR);
403
404 auto wrk = tatami::consecutive_extractor<true>(mat, false, static_cast<Index_>(0), static_cast<Index_>(NC));
405 for (size_t c = 0; c < NC; ++c) {
406 auto extracted = wrk->fetch(c, xbuffer.data(), ibuffer.data());
407 fill_datasets(extracted.value, extracted.index, extracted.number);
408 ptrs[c+1] = ptrs[c] + extracted.number;
409 }
410 }
411
412 } else {
413 std::vector<Value_> sparse_xbuffer;
414 std::vector<Index_> sparse_ibuffer;
415 auto fill_datasets_from_dense = [&](const Value_* extracted, Index_ n) -> hsize_t {
416 sparse_xbuffer.clear();
417 sparse_ibuffer.clear();
418 for (Index_ i = 0; i < n; ++i) {
419 if (extracted[i]) {
420 sparse_xbuffer.push_back(extracted[i]);
421 sparse_ibuffer.push_back(i);
422 }
423 }
424
425 hsize_t count = sparse_xbuffer.size();
426 fill_datasets(sparse_xbuffer.data(), sparse_ibuffer.data(), count);
427 return count;
428 };
429
430 if (layout == WriteStorageLayout::ROW) {
431 ptrs.resize(NR + 1);
432 std::vector<Value_> dbuffer(NC);
433 auto wrk = tatami::consecutive_extractor<false>(mat, true, static_cast<Index_>(0), static_cast<Index_>(NR));
434 for (size_t r = 0; r < NR; ++r) {
435 auto extracted = wrk->fetch(r, dbuffer.data());
436 auto count = fill_datasets_from_dense(extracted, NC);
437 ptrs[r+1] = ptrs[r] + count;
438 }
439
440 } else {
441 ptrs.resize(NC + 1);
442 std::vector<Value_> dbuffer(NR);
443 auto wrk = tatami::consecutive_extractor<false>(mat, false, static_cast<Index_>(0), static_cast<Index_>(NC));
444 for (size_t c = 0; c < NC; ++c) {
445 auto extracted = wrk->fetch(c, dbuffer.data());
446 auto count = fill_datasets_from_dense(extracted, NR);
447 ptrs[c+1] = ptrs[c] + count;
448 }
449 }
450 }
451
452 // Saving the pointers.
453 hsize_t ptr_len = ptrs.size();
454 H5::DataSet ptr_ds = create_1d_compressed_hdf5_dataset(location, H5::PredType::NATIVE_HSIZE, params.ptr_name, ptr_len, params.deflate_level, params.chunk_size);
455 H5::DataSpace ptr_space(1, &ptr_len);
456 ptr_ds.write(ptrs.data(), H5::PredType::NATIVE_HSIZE, ptr_space);
457
458 return;
459}
460
472template<typename Value_, typename Index_>
475 write_compressed_sparse_matrix(mat, location, params);
476 return;
477}
478
479}
480
481#endif
virtual Index_ ncol() const=0
virtual Index_ nrow() const=0
virtual bool prefer_rows() const=0
virtual std::unique_ptr< MyopicSparseExtractor< Value_, Index_ > > sparse(bool row, const Options &opt) const=0
Representations for matrix data in HDF5 files.
Definition CompressedSparseMatrix.hpp:23
WriteStorageLayout
Definition utils.hpp:24
WriteStorageType
Definition utils.hpp:29
void write_compressed_sparse_matrix(const tatami::Matrix< Value_, Index_ > *mat, H5::Group &location, const WriteCompressedSparseMatrixOptions &params)
Definition write_compressed_sparse_matrix.hpp:308
void parallelize(Function_ fun, Index_ tasks, int threads)
bool sparse_extract_index
bool sparse_extract_value
const Value_ * value
const Index_ * index
Parameters for write_compressed_sparse_matrix().
Definition write_compressed_sparse_matrix.hpp:25
std::string ptr_name
Definition write_compressed_sparse_matrix.hpp:50
WriteStorageType index_type
Definition write_compressed_sparse_matrix.hpp:76
bool force_integer
Definition write_compressed_sparse_matrix.hpp:70
int deflate_level
Definition write_compressed_sparse_matrix.hpp:81
std::string data_name
Definition write_compressed_sparse_matrix.hpp:38
std::string index_name
Definition write_compressed_sparse_matrix.hpp:44
WriteStorageType data_type
Definition write_compressed_sparse_matrix.hpp:62
int num_threads
Definition write_compressed_sparse_matrix.hpp:93
WriteStorageLayout columnar
Definition write_compressed_sparse_matrix.hpp:56
size_t chunk_size
Definition write_compressed_sparse_matrix.hpp:86
Utilities for HDF5 extraction.