Citlali
Loading...
Searching...
No Matches
lali.h
Go to the documentation of this file.
1#pragma once
2
3#include <mutex>
4
6
10
11// selects the type of TCData
12using timestream::TCDataKind;
13
14class Lali: public Engine {
15public:
16 // initial setup for each obs
17 void setup();
18
19 // main grppi pipeline
20 template <class KidsProc, class RawObs>
21 void pipeline(KidsProc &, RawObs &);
22
23 // run the reduction for the obs
24 auto run();
25
26 // output files
27 template <mapmaking::MapType map_type>
28 void output();
29};
30
32 // run obsnum setup
34}
35
36template <class KidsProc, class RawObs>
37void Lali::pipeline(KidsProc &kidsproc, RawObs &rawobs) {
39
40 // initialize number of completed scans
41 n_scans_done = 0;
42
43 // declare random number generator
44 boost::random::mt19937 eng;
45
46 // boost random number generator (0,1)
47 boost::random::uniform_int_distribution<> rands{0,1};
48
49 // progress bar
50 tula::logging::progressbar pb(
51 [&](const auto &msg) { logger->info("{}", msg); }, 100, "citlali progress ");
52
53 // grppi generator function. gets time chunk data from files sequentially and passes them to grppi::farm
54 grppi::pipeline(tula::grppi_utils::dyn_ex(parallel_policy),
55 [&]() -> std::optional<tuple_t> {
56 // variable to hold current scan
57 static int scan = 0;
58 // loop through scans
59 while (scan < telescope.scan_indices.cols()) {
60 // update progress bar
61 pb.count(telescope.scan_indices.cols(), 1);
62
63 // create rtcdata
64 TCData<TCDataKind::RTC, Eigen::MatrixXd> rtcdata;
65 // get scan indices
66 rtcdata.scan_indices.data = telescope.scan_indices.col(scan);
67 // current scan
68 rtcdata.index.data = scan;
69
70 // populate noise matrix (do outside of parallelized region for thread safety)
71 if (run_noise) {
72 if (omb.randomize_dets) {
73 // n_noise x n_dets
74 rtcdata.noise.data = Eigen::Matrix<int, Eigen::Dynamic, Eigen::Dynamic>::Zero(omb.n_noise, calib.n_dets)
75 .unaryExpr([&](int dummy){ return 2 * rands(eng) - 1; });
76 } else {
77 // n_noise
78 rtcdata.noise.data = Eigen::Matrix<int, Eigen::Dynamic, 1>::Zero(omb.n_noise)
79 .unaryExpr([&](int dummy){ return 2 * rands(eng) - 1; });
80 }
81 }
82 // vector to store kids data
83 std::vector<kids::KidsData<kids::KidsDataKind::RawTimeStream>> scan_rawobs;
84 // get kids data
85 if (!interp_over_gaps) {
86 scan_rawobs = kidsproc.load_rawobs(rawobs, scan, telescope.scan_indices, start_indices, end_indices);
87 }
88 else {
89 scan_rawobs = kidsproc.load_rawobs_gaps(rawobs, scan, telescope.scan_indices, start_indices,
90 t_common, nw_times, 1 / (2 * telescope.fsmp));
91 }
92 // current length of outer scans
93 Eigen::Index sl = rtcdata.scan_indices.data(3) - rtcdata.scan_indices.data(2) + 1;
94
95 // get raw tod from files
96 if (!interp_over_gaps) {
97 rtcdata.scans.data = kidsproc.populate_rtc(scan_rawobs, sl, calib.n_dets, tod_type);
98 }
99 else {
100 rtcdata.scans.data = kidsproc.populate_rtc_gaps(scan_rawobs, t_common, nw_times, masks, scan, 1 / (2 * telescope.fsmp),
102 }
103 // try and clear input vector
104 std::vector<kids::KidsData<kids::KidsDataKind::RawTimeStream>>().swap(scan_rawobs);
105
106 // increment scan
107 scan++;
108 // return rtcdata
109 return rtcdata;
110 }
111 // reset scan to zero for each obs
112 scan = 0;
113 return {};
114 },
115
116 // run the farm
117 run());
118
119 if (run_mapmaking) {
120 // normalize maps
121 logger->info("normalizing maps");
122 if (map_method != "maximum_likelihood") {
123 if (rtcproc.run_polarization) {
124 omb.normalize_polarized_maps();
125 }
126 else {
127 omb.normalize_maps();
128 }
129 }
130 // calculate map psds
131 logger->info("calculating map psd");
132 omb.calc_map_psd();
133 // calculate map histograms
134 logger->info("calculating map histogram");
135 omb.calc_map_hist();
136 // calculate mean error
137 omb.calc_median_err();
138 // calculate mean rms
139 omb.calc_median_rms();
140
141 // write map summary
142 if (verbose_mode) {
143 write_map_summary(omb);
144 }
145 }
146}
147
148auto Lali::run() {
150
151 auto farm = grppi::farm(n_threads,[&](input_t &rtcdata) {
152 // starting index for scan
153 Eigen::Index si = rtcdata.scan_indices.data(2);
154 // current length of outer scans
155 Eigen::Index sl = rtcdata.scan_indices.data(3) - rtcdata.scan_indices.data(2) + 1;
156
157 // copy scan's telescope vectors
158 for (const auto& x: telescope.tel_data) {
159 rtcdata.tel_data.data[x.first] = telescope.tel_data[x.first].segment(si,sl);
160 }
161
162 // copy pointing offsets
163 for (const auto& [axis,offset]: pointing_offsets_arcsec) {
164 rtcdata.pointing_offsets_arcsec.data[axis] = offset.segment(si,sl);
165 }
166
167 // get hwpr
169 if (calib.run_hwpr) {
170 rtcdata.hwpr_angle.data = calib.hwpr_angle.segment(si + hwpr_start_indices, sl);
171 }
172 }
173
174 // set up flags
175 rtcdata.flags.data.resize(rtcdata.scans.data.rows(), rtcdata.scans.data.cols());
176 rtcdata.flags.data.setConstant(0);
177
178 if (interp_over_gaps) {
179 int i = 0;
180 for (auto const& [key, val] : calib.nw_limits) {
181 auto& mask = masks[i];
182
183 Eigen::Index start = std::get<0>(calib.nw_limits[key]);
184 Eigen::Index end = std::get<1>(calib.nw_limits[key]) - 1;
185
186 for (int j = 0; j < rtcdata.flags.data.rows(); ++j) {
187 int start_index = j;
188 int size = 1;
190 start_index = std::max(j, static_cast<int>(j - rtcproc.filter.n_terms));
191 int end_index = std::min(j + rtcproc.filter.n_terms, rtcdata.flags.data.rows() - 1);
192 size = end_index - start_index + 1;
193 }
194 if (mask(j + si) == 0) {
195 rtcdata.flags.data.block(start_index, start, size, end - start + 1).setOnes();
196 }
197 }
198 logger->debug("{}/{} gaps flagged", rtcdata.flags.data.col(start).template cast<int>().sum(), rtcdata.flags.data.rows());
199 i++;
200 }
201 }
202
203 // create PTCData
205
206 logger->info("starting scan {}. {}/{} scans completed", rtcdata.index.data + 1, n_scans_done,
207 telescope.scan_indices.cols());
208
209 // run rtcproc
210 logger->info("raw time chunk processing for scan {}", rtcdata.index.data + 1);
211 auto map_indices = rtcproc.run(rtcdata, ptcdata, calib, telescope, omb.pixel_size_rad, map_grouping);
212
213 // remove flagged detectors
215
216 // remove outliers before cleaning
217 auto calib_scan = rtcproc.remove_bad_dets(ptcdata, calib, map_grouping);
218
219 // remove duplicate tones
220 if (!telescope.sim_obs) {
221 calib_scan = rtcproc.remove_nearby_tones(ptcdata, calib, map_grouping);
222 }
223
224 // write rtc timestreams
225 if (run_tod_output && !tod_filename.empty()) {
226 if (tod_output_type == "rtc" || tod_output_type == "both") {
227 logger->info("writing raw time chunk");
229 ptcdata.pointing_offsets_arcsec.data, calib);
230 }
231 }
232
233 // if running fruit loops and a map has been read in
234 if (ptcproc.run_fruit_loops && !ptcproc.tod_mb.signal.empty()) {
235 logger->info("subtracting map from tod");
236 // subtract map
238 map_indices, telescope.pixel_axes,
240 }
241
242 // run cleaning
243 logger->info("processed time chunk processing for scan {}", ptcdata.index.data + 1);
244 ptcproc.run(ptcdata, ptcdata, calib, telescope.pixel_axes, map_grouping);
245
246 // if running fruit loops and a map has been read in
247 if (ptcproc.run_fruit_loops && !ptcproc.tod_mb.signal.empty()) {
248 if (run_mapmaking && run_noise) {
249 // calculate weights
250 logger->info("calculating weights");
252
253 // reset weights to median
254 auto calib_scans = ptcproc.reset_weights(ptcdata, calib, map_grouping);
255
256 // populate noise maps only
257 bool run_omb = false;
258 logger->info("populating noise maps");
259 if (map_method=="naive") {
260 naive_mm.populate_maps_naive(ptcdata, omb, cmb, map_indices, telescope.pixel_axes,
261 calib.apt, telescope.d_fsmp, run_omb, run_noise);
262 }
263 else if (map_method=="jinc") {
264 jinc_mm.populate_maps_jinc(ptcdata, omb, cmb, map_indices, telescope.pixel_axes,
265 calib.apt, telescope.d_fsmp, run_omb, run_noise);
266 }
267 }
268 logger->info("adding map to tod");
269 // add map back
271 map_indices, telescope.pixel_axes,
273 }
274
275 // remove outliers after cleaning
276 calib_scan = ptcproc.remove_bad_dets(ptcdata, calib, map_grouping);
277
278 // calculate weights
279 logger->info("calculating weights");
281
282 // reset weights to median
283 calib_scan = ptcproc.reset_weights(ptcdata, calib, map_grouping);
284
285 // write ptc timestreams
286 if (run_tod_output && !tod_filename.empty()) {
287 if (tod_output_type == "ptc" || tod_output_type == "both") {
288 logger->info("writing processed time chunk");
290 ptcdata.pointing_offsets_arcsec.data, calib);
291 }
292 }
293
294 // write out chunk summary
295 if (verbose_mode) {
296 write_chunk_summary(ptcdata);
297 }
298
299 // write stats
300 logger->debug("calculating stats");
301 diagnostics.calc_stats(ptcdata);
302
303 // populate maps
304 if (run_mapmaking) {
305 // make signal, weight, kernel, and coverage maps
306 bool run_omb = true;
307 bool run_noise_fruit = run_noise;
308
309 // if running fruit loops, noise maps are made on source
310 // subtracted timestreams so don't make them here
311 if (ptcproc.run_fruit_loops && !ptcproc.tod_mb.signal.empty()) {
312 run_noise_fruit = false;
313 }
314
315 // populate maps with current time chunk
316 logger->info("populating maps");
317 if (map_method=="naive") {
318 naive_mm.populate_maps_naive(ptcdata, omb, cmb, map_indices, telescope.pixel_axes,
319 calib.apt, telescope.d_fsmp, run_omb, run_noise_fruit);
320 }
321 else if (map_method=="jinc") {
322 jinc_mm.populate_maps_jinc(ptcdata, omb, cmb, map_indices, telescope.pixel_axes,
323 calib.apt, telescope.d_fsmp, run_omb, run_noise_fruit);
324 }
325 else if (map_method=="maximum_likelihood") {
326 ml_mm.populate_maps_ml(ptcdata, omb, cmb, map_indices, telescope.pixel_axes,
327 calib, telescope.d_fsmp, run_omb, run_noise_fruit);
328 }
329 }
330
331 // increment number of completed scans
332 n_scans_done++;
333 logger->info("done with scan {}. {}/{} scans completed", ptcdata.index.data + 1, n_scans_done, telescope.scan_indices.cols());
334 });
335
336 return farm;
337}
338
339template <mapmaking::MapType map_type>
341 // pointer to map buffer
342 mapmaking::MapBuffer* mb = nullptr;
343 // pointer to data file fits vector
344 std::vector<fitsIO<file_type_enum::write_fits, CCfits::ExtHDU*>>* f_io = nullptr;
345 // pointer to noise file fits vector
346 std::vector<fitsIO<file_type_enum::write_fits, CCfits::ExtHDU*>>* n_io = nullptr;
347
348 // directory name
349 std::string dir_name;
350
351 // set common variables depending on map_type
352 if constexpr (map_type == mapmaking::RawObs || map_type == mapmaking::FilteredObs) {
353 mb = &omb;
354 dir_name = obsnum_dir_name + (map_type == mapmaking::RawObs ? "raw/" : "filtered/");
355 f_io = (map_type == mapmaking::RawObs) ? &fits_io_vec : &filtered_fits_io_vec;
357
358 if constexpr (map_type == mapmaking::RawObs) {
359 // write stats file
360 write_stats();
361 if (run_tod_output && !tod_filename.empty()) {
362 // add tod header information
364 }
365 }
366 }
367 else if constexpr (map_type == mapmaking::RawCoadd || map_type == mapmaking::FilteredCoadd) {
368 mb = &cmb;
369 dir_name = coadd_dir_name + (map_type == mapmaking::RawCoadd ? "raw/" : "filtered/");
372 }
373
374 if (run_mapmaking) {
375 // wiener filtered maps write before this and are deleted from the vector.
376 if (!f_io->empty()) {
377 {
378 // progress bar
379 tula::logging::progressbar pb(
380 [&](const auto &msg) { logger->info("{}", msg); }, 100, "output progress ");
381
382 for (Eigen::Index i=0; i<f_io->size(); ++i) {
383 // get the array for the given map
384 // add primary hdu
385 logger->debug("adding primary header to file {}",i);
386 add_phdu(f_io, mb, i);
387
388 // add primary hdu to noise maps
389 if (!mb->noise.empty()) {
390 logger->debug("adding primary header to noise file {}",i);
391 add_phdu(n_io, mb, i);
392 }
393 }
394
395 logger->debug("done adding primary headers");
396
397 // write the maps
398 for (Eigen::Index i=0; i<n_maps; ++i) {
399 // update progress bar
400 pb.count(n_maps, 1);
401 write_maps(f_io,n_io,mb,i);
402 }
403 }
404
405 logger->info("maps have been written to:");
406 for (Eigen::Index i=0; i<f_io->size(); ++i) {
407 logger->info("{}.fits",f_io->at(i).filepath);
408 }
409 }
410
411 // clear fits file vectors to ensure its closed.
412 std::vector<fitsIO<file_type_enum::write_fits, CCfits::ExtHDU*>>().swap(*f_io);
413 std::vector<fitsIO<file_type_enum::write_fits, CCfits::ExtHDU*>>().swap(*n_io);
414
415 // write psd and histogram files
416 logger->debug("writing psds");
417 write_psd<map_type>(mb, dir_name);
418 logger->debug("writing histograms");
419 write_hist<map_type>(mb, dir_name);
420
421 // write source table
422 if (run_source_finder) {
423 logger->debug("writing source table");
424 write_sources<map_type>(mb, dir_name);
425 }
426 }
427}
Definition engine.h:155
Eigen::Index hwpr_start_indices
Definition engine.h:208
void obsnum_setup()
Definition engine.h:353
std::string obsnum_dir_name
Definition engine.h:184
void write_stats()
Definition engine.h:2251
std::string parallel_policy
Definition engine.h:196
std::map< std::string, Eigen::VectorXd > pointing_offsets_arcsec
Definition engine.h:241
std::vector< fitsIO< file_type_enum::write_fits, CCfits::ExtHDU * > > filtered_coadd_fits_io_vec
Definition engine.h:251
std::vector< Eigen::Index > start_indices
Definition engine.h:205
std::vector< Eigen::VectorXd > nw_times
Definition engine.h:166
std::vector< fitsIO< file_type_enum::write_fits, CCfits::ExtHDU * > > noise_fits_io_vec
Definition engine.h:246
std::vector< fitsIO< file_type_enum::write_fits, CCfits::ExtHDU * > > coadd_noise_fits_io_vec
Definition engine.h:250
std::string tod_output_type
Definition engine.h:223
std::string coadd_dir_name
Definition engine.h:184
std::vector< fitsIO< file_type_enum::write_fits, CCfits::ExtHDU * > > filtered_noise_fits_io_vec
Definition engine.h:247
bool verbose_mode
Definition engine.h:172
int n_maps
Definition engine.h:229
std::vector< fitsIO< file_type_enum::write_fits, CCfits::ExtHDU * > > filtered_fits_io_vec
Definition engine.h:247
int n_scans_done
Definition engine.h:199
void add_tod_header()
Definition engine.h:1054
std::vector< Eigen::Index > end_indices
Definition engine.h:205
Eigen::VectorXd t_common
Definition engine.h:164
std::vector< fitsIO< file_type_enum::write_fits, CCfits::ExtHDU * > > filtered_coadd_noise_fits_io_vec
Definition engine.h:251
void add_phdu(fits_io_type &, map_buffer_t &, Eigen::Index)
Definition engine.h:1737
std::vector< Eigen::VectorXi > masks
Definition engine.h:165
std::string map_grouping
Definition engine.h:226
std::shared_ptr< spdlog::logger > logger
Definition engine.h:161
std::map< std::string, std::string > tod_filename
Definition engine.h:187
std::string tod_type
Definition engine.h:211
std::vector< fitsIO< file_type_enum::write_fits, CCfits::ExtHDU * > > coadd_fits_io_vec
Definition engine.h:250
void write_chunk_summary(TCData< tc_t, Eigen::MatrixXd > &)
Definition engine.h:1529
std::vector< fitsIO< file_type_enum::write_fits, CCfits::ExtHDU * > > fits_io_vec
Definition engine.h:246
int n_threads
Definition engine.h:193
std::string map_method
Definition engine.h:226
void write_maps(fits_io_type &, fits_io_type &, map_buffer_t &, Eigen::Index)
Definition engine.h:2030
Definition lali.h:14
void setup()
Definition lali.h:31
void output()
Definition lali.h:340
auto run()
Definition lali.h:148
void pipeline(KidsProc &, RawObs &)
Definition lali.h:37
bool run_hwpr
Definition calib.h:41
std::map< std::string, Eigen::VectorXd > apt
Definition calib.h:22
Eigen::VectorXd hwpr_angle
Definition calib.h:24
std::map< Eigen::Index, std::tuple< Eigen::Index, Eigen::Index > > nw_limits
Definition calib.h:56
Eigen::Index n_dets
Definition calib.h:47
void calc_stats(timestream::TCData< tcdata_kind, Eigen::MatrixXd > &)
Definition diagnostics.h:61
bool sim_obs
Definition telescope.h:20
std::map< std::string, Eigen::VectorXd > tel_data
Definition telescope.h:54
double fsmp
Definition telescope.h:42
double d_fsmp
Definition telescope.h:42
Eigen::MatrixXI scan_indices
Definition telescope.h:51
std::string pixel_axes
Definition telescope.h:59
void populate_maps_jinc(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, map_buffer_t &, map_buffer_t &, Eigen::DenseBase< Derived > &, std::string &, apt_t &, double, bool, bool)
Definition jinc_mm.h:211
void populate_maps_ml(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, map_buffer_t &, map_buffer_t &, Eigen::DenseBase< Derived > &, std::string &, calib_t &, double, bool, bool)
Definition ml_mm.h:37
Definition map.h:45
std::vector< Eigen::MatrixXd > signal
Definition map.h:78
std::vector< Eigen::Tensor< double, 3 > > noise
Definition map.h:81
double pixel_size_rad
Definition map.h:69
void populate_maps_naive(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, map_buffer_t &, map_buffer_t &, Eigen::DenseBase< Derived > &, std::string &, apt_t &, double, bool, bool)
Definition naive_mm.h:95
Eigen::Index n_terms
Definition filter.h:19
Definition ptcproc.h:19
void append_to_netcdf(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, std::string, std::string, std::string &, pointing_offset_t &, calib_t &)
Definition ptcproc.h:498
void calc_weights(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, apt_type &, tel_type &)
Definition ptcproc.h:300
void run(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, TCData< TCDataKind::PTC, Eigen::MatrixXd > &, calib_type &, std::string, std::string)
Definition ptcproc.h:173
auto reset_weights(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, calib_t &, std::string)
Definition ptcproc.h:383
Definition rtcproc.h:18
void remove_flagged_dets(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, apt_t &)
Definition rtcproc.h:493
bool run_polarization
Definition rtcproc.h:23
timestream::Filter filter
Definition rtcproc.h:35
bool run_tod_filter
Definition rtcproc.h:26
auto remove_nearby_tones(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, calib_t &, std::string)
Definition rtcproc.h:516
void append_to_netcdf(TCData< TCDataKind::PTC, Eigen::MatrixXd > &, std::string, std::string, std::string &, pointing_offset_t &, calib_t &)
Definition rtcproc.h:553
auto run(TCData< TCDataKind::RTC, Eigen::MatrixXd > &, TCData< TCDataKind::PTC, Eigen::MatrixXd > &, calib_t &, telescope_t &, double, std::string)
Definition rtcproc.h:261
void map_to_tod(mb_t &, TCData< tcdata_t, Eigen::MatrixXd > &, calib_t &, Eigen::DenseBase< Derived > &, std::string, std::string)
Definition timestream.h:624
auto remove_bad_dets(TCData< tcdata_t, Eigen::MatrixXd > &, calib_t &, std::string)
Definition timestream.h:703
@ NegativeMap
Definition timestream.h:272
@ Map
Definition timestream.h:271
bool run_fruit_loops
Definition timestream.h:279
mapmaking::MapBuffer tod_mb
Definition timestream.h:296
@ FilteredObs
Definition map.h:19
@ FilteredCoadd
Definition map.h:21
@ RawObs
Definition map.h:18
@ RawCoadd
Definition map.h:20
int run(const rc_t &rc)
Definition main.cpp:104
The raw obs struct This represents a single observation that contains a set of data items and calibra...
Definition io.h:50
mapmaking::NaiveMapmaker naive_mm
Definition engine.h:109
engine::Telescope telescope
Definition engine.h:96
mapmaking::MLMapmaker ml_mm
Definition engine.h:111
engine::Diagnostics diagnostics
Definition engine.h:98
timestream::PTCProc ptcproc
Definition engine.h:105
mapmaking::JincMapmaker jinc_mm
Definition engine.h:110
mapmaking::MapBuffer omb
Definition engine.h:108
mapmaking::MapBuffer cmb
Definition engine.h:108
timestream::RTCProc rtcproc
Definition engine.h:102
engine::Calib calib
Definition engine.h:95
bool run_noise
Definition engine.h:86
bool interp_over_gaps
Definition engine.h:73
bool run_source_finder
Definition engine.h:90
bool run_tod_output
Definition engine.h:81
bool run_mapmaking
Definition engine.h:84
TC data class.
Definition timestream.h:55