Citlali
Loading...
Searching...
No Matches
despike.h
Go to the documentation of this file.
1#pragma once
2
3#include <boost/random.hpp>
4#include <boost/random/random_device.hpp>
5
6#include <Eigen/Core>
7
8#include <tula/logging.h>
9
10#include <tula/algorithm/mlinterp/mlinterp.hpp>
12
13namespace timestream {
14
15class Despiker {
16public:
17 // get logger
18 std::shared_ptr<spdlog::logger> logger = spdlog::get("citlali_logger");
19
20 // spike sigma, time constant, sample rate
22 double fsmp;
23
24 // for window size
25 bool run_filter = false;
26
27 // size of region to merge flags
28 int size = 10;
29
30 // standard deviation limit
31 double sigest_lim = 0;//1e-8;
32
33 // grouping for replacing spikes
34 std::string grouping;
35
36 // use all detectors in replacing flagged scans
37 bool use_all_det = true;
38
39 // the main despiking routine
40 template <typename DerivedA, typename DerivedB, typename apt_t>
41 void despike(Eigen::DenseBase<DerivedA> &, Eigen::DenseBase<DerivedB> &, apt_t &);
42
43 // replace flagged data with interpolation
44 template<typename DerivedA, typename DerivedB, typename apt_t>
45 void replace_spikes(Eigen::DenseBase<DerivedA> &, Eigen::DenseBase<DerivedB>&, apt_t &,
46 Eigen::Index);
47
48private:
49 // this function loops through the delta timestream values and finds the
50 // spikes setting the corresponding flags to zero and calculating n_spikes
51 template <typename DerivedA, typename DerivedB, typename DerivedC>
52 void spike_finder(Eigen::DenseBase<DerivedA> &flags,
53 Eigen::DenseBase<DerivedB> &delta,
54 Eigen::DenseBase<DerivedC> &diff, int &n_spikes,
55 double &cutoff) {
56
57 Eigen::Index n_pts = flags.size();
58
59 // set flag matrix to zero if scan delta is above the cutoff
60 flags.segment(1,n_pts - 2) =
61 (diff.segment(1,n_pts - 2).array() > cutoff).select(1, flags.segment(1,n_pts - 2));
62
63 // set corresponding delta values to zero
64 delta.segment(1,n_pts - 2) =
65 (diff.segment(1,n_pts - 2).array() > cutoff).select(0, delta.segment(1,n_pts - 2));
66
67 // update difference vector and cutoff
68 diff = abs(delta.derived().array() - delta.mean());
70 }
71
72 template <typename Derived>
73 auto make_window(Eigen::DenseBase<Derived> &spike_loc, int n_spikes,
74 Eigen::Index n_pts) {
75 // window first index, last index, and size
76 int win_index_0, win_index_1, win_size;
77
78 // find biggest spike-free window
79 // first deal with the harder case of multiple spikes
80 // remember that there are n_spikes + 1 possible windows
81 // only do if more than one spike
82 if (n_spikes > 1) {
83 Eigen::Matrix<int, Eigen::Dynamic, 1> delta_spike_loc(n_spikes + 1);
84 // first element is the distance to first spike
85 delta_spike_loc(0) = spike_loc(0);
86 // last element is the distance to last spike
87 delta_spike_loc(n_spikes) = n_pts - spike_loc(n_spikes - 1);
88 // populate the delta spike location vector
89 delta_spike_loc.segment(1,n_spikes - 1) =
90 spike_loc.tail(n_spikes - 1) - spike_loc.head(n_spikes - 1);
91
92 // get the maximum and index of the delta spike locations
93 Eigen::Index mx_window_index;
94 delta_spike_loc.maxCoeff(&mx_window_index);
95
96 logger->info("delta_spike_loc {} fsmp {} mx_window_index {}",delta_spike_loc,fsmp,mx_window_index);
97 logger->info("spike_loc(mx_window_index) {} spike_loc(mx_window_index-1) {}",spike_loc(mx_window_index),
98 spike_loc(mx_window_index-1));
99
100 // set the starting and ending indices for the window
101 if (mx_window_index == 0) {
102 win_index_0 = 0;
103 win_index_1 = spike_loc(1);// - fsmp;
104 }
105 else {
106 if (mx_window_index == n_spikes) {
107 win_index_0 = spike_loc(n_spikes - 1);
108 win_index_1 = n_pts;
109 }
110 // leave a 2 second region after the spike beginning the
111 // window and a 1 second region before the spike ending the window
112 else {
113 win_index_0 = spike_loc(mx_window_index - 1);// + 2 * fsmp;
114 win_index_1 = spike_loc(mx_window_index);// - fsmp;
115 }
116 }
117 }
118 else {
119 if (n_pts - spike_loc(0) > spike_loc(0)) {
120 win_index_0 = spike_loc(0) + 2 * fsmp;
121 win_index_1 = n_pts - 1;
122 }
123 else {
124 win_index_0 = 0;
125 win_index_1 = spike_loc(0) - fsmp;
126 }
127 }
128
129 logger->info("win_index_0 {}", win_index_0);
130 logger->info("win_index_1 {}", win_index_1);
131
132 // limit the maximum window size
133 if ((win_index_1 - win_index_0 - 1) / fsmp > size) {
134 win_index_1 = win_index_0 + size * fsmp + 1;
135 }
136
137 win_size = win_index_1 - win_index_0 - 1;
138
139 if (win_index_0 > win_index_1) {
140 logger->error("despike failed: win_index_0 > win_index_1");
141 logger->error("scan size {}", n_pts);
142 logger->error("spike_loc {}", spike_loc.derived());
143 logger->error("win_index_0 {}", win_index_0);
144 logger->error("win_index_1 {}", win_index_1);
145 logger->error("win_size {}", win_size);
146
147 std::exit(EXIT_FAILURE);
148 }
149
150 return std::tuple<int, int, int>(win_index_0, win_index_1, win_size);
151 }
152};
153
154template <typename DerivedA, typename DerivedB, typename apt_t>
155void Despiker::despike(Eigen::DenseBase<DerivedA> &scans,
156 Eigen::DenseBase<DerivedB> &flags,
157 apt_t &apt) {
158 Eigen::Index n_pts = scans.rows();
159 Eigen::Index n_dets = scans.cols();
160
161 // loop through detectors
162 for (Eigen::Index det=0; det<n_dets; det++) {
163 // only run if detector is good
164 if (apt["flag"](det)==0) {
165 // get detector's flags
166 Eigen::Matrix<bool, Eigen::Dynamic, 1> det_flags = flags.col(det);
167
168 // total number of spikes
169 int n_spikes = 0;
170
171 // array of delta's between adjacent points
172 Eigen::VectorXd delta = scans.col(det).tail(n_pts - 1) - scans.col(det).head(n_pts - 1);
173
174 // minimum amplitude of spike
175 double cutoff = min_spike_sigma * engine_utils::calc_std_dev(delta);
176
177 // mean subtracted delta array
178 Eigen::VectorXd diff = abs(delta.array() - delta.mean());
179
180 // run the spike finder,
181 spike_finder(det_flags, delta, diff, n_spikes, cutoff);
182
183 // variable to control spike_finder while loop
184 bool new_spikes_found = ((diff.segment(1,n_pts - 2).array() > cutoff).count() > 0) ? 1 : 0;
185
186 // keep despiking recursively to remove effects on the mean from large spikes
187 while (new_spikes_found) {
188 // if no new spikes found, set new_found to zero to end while loop
189 new_spikes_found = ((diff.segment(1,n_pts - 2).array() > cutoff).count() > 0) ? 1 : 0;
190
191 // only run if there are spikes
192 if (new_spikes_found) {
193 spike_finder(det_flags, delta, diff, n_spikes, cutoff);
194 }
195 }
196
197 // count up the number of spikes
198 n_spikes = (det_flags.head(n_pts - 1).array() == 1).count();
199
200 // if there are other spikes within set number of samples after a spike, set only the
201 // center value to be a spike
202 for (Eigen::Index i = 0; i < n_pts; ++i) {
203 if (det_flags(i) == 1) {
204 int size_loop = size;
205 // check the size of the region to set un_flagged if a flag is found.
206 if ((n_pts - i - 1) < size_loop) {
207 logger->info("rng {} {} {}", (n_pts - i - 1), size,i );
208 size_loop = n_pts - i - 1;
209 }
210
211 // count up the flags in the region
212 int c = (det_flags.segment(i + 1, size_loop).array() == 1).count();
213
214 // if flags are found
215 if (c > 0) {
216 // remove those flags from the total count
217 n_spikes -= c;
218 // set region to un_flagged
219 det_flags.segment(i + 1, size_loop).setZero();
220
221 // is this a bug? if n_pts - i <= size/2, i + size/2 >= n_pts
222 // for now, let's limit it to i + size/2 < n_pts since the
223 // start and end of the scan are not used due to
224 // filtering
225 if ((i + size_loop/2) < n_pts) {
226 det_flags(i + size_loop/2) = 1;
227 }
228 }
229
230 // increment so we go to the next sample region
231 i = i + size_loop - 1;
232 }
233 }
234
235 // now loop through if spikes were found
236 if (n_spikes > 0) {
237 // recount spikes
238 n_spikes = (det_flags.head(n_pts - 1).array() == 1).count();
239
240 logger->info("n_spikes 3 {} {}", n_spikes, det);
241
242 // vector for spike indices
243 Eigen::Matrix<int, Eigen::Dynamic, 1> spike_loc(n_spikes);
244 // amplitude of spike
245 Eigen::VectorXd spike_vals(n_spikes);
246
247 // populate scan location and amplitude vectors
248 int count = 0;
249 for (Eigen::Index i = 0; i < n_pts - 1; ++i) {
250 if (det_flags(i) == 1) {
251 spike_loc(count) = i + 1;
252 spike_vals(count) = scans(det,i+1) - scans(det,i);
253 count++;
254 }
255 }
256
257 // get the largest window that is without spikes
258 auto [win_index_0, win_index_1, win_size] =
259 make_window(spike_loc, n_spikes, n_pts);
260
261 // create a sub-array with values from the largest spike-free window
262 Eigen::VectorXd sub_vals =
263 scans.col(det).segment(win_index_0, win_size);
264
265 // copy of the sub-array for smoothing
266 Eigen::VectorXd smoothed_sub_vals = Eigen::VectorXd::Zero(win_size);
267
268 // smooth the sub-array with a box-car filter
269 engine_utils::smooth<engine_utils::SmoothType::boxcar>(sub_vals, smoothed_sub_vals, size);
270
271 // estimate the standard deviation
272 sub_vals -= smoothed_sub_vals;
273 auto sigest = engine_utils::calc_std_dev(sub_vals);
274 if (sigest < sigest_lim) {
275 sigest = sigest_lim;
276 }
277
278 // calculate the decay length of all the spikes
279 Eigen::VectorXd decay_length = -fsmp * time_constant_sec *
280 ((sigest / spike_vals.array()).abs()).log();
281
282 // if a decay length is less than 6, set it to 6
283 decay_length = (decay_length.array() < 6.).select(6., decay_length);
284
285
286 // exit if the decay length is too large
287 if ((decay_length.array() > size * fsmp).any()) {
288 logger->error("decay length is longer than {} * fsmp. mission "
289 "failed, we'll get em next time.",size);
290 std::exit(EXIT_FAILURE);
291 }
292
293 // due to filtering later, we flag a region with the size of the filter
294 // around each spike
295 if (run_filter) {
296 // half of the total despike window - 1
297 int decay_window = (window_size - 1) / 2;
298
299 for (Eigen::Index i=0; i<n_spikes; ++i) {
300 if (spike_loc(i) - decay_window >= 0 &&
301 spike_loc(i) + decay_window + 1 < n_pts) {
302 det_flags
303 .segment(spike_loc(i) - decay_window, 2*window_size + 1)
304 .setOnes();
305 }
306 }
307 }
308
309 // if lowpassing/highpassing skipped, use the decay length instead
310 else {
311 for (Eigen::Index i=0; i<n_spikes; ++i) {
312 if (spike_loc(i) - decay_length(i) >= 0 &&
313 spike_loc(i) + decay_length(i) + 1 < n_pts) {
314 det_flags
315 .segment(spike_loc(i) - decay_length(i), 2*decay_length(i) + 1)
316 .setOnes();
317 }
318 }
319 }
320
321 } // end of "if (n_spikes > 0)" loop
322 flags.col(det) = det_flags;
323 } // end of apt["flag"] loop
324 } // end of "for (Eigen::Index det = 0; det < n_dets; det++)" loop
325}
326
327template<typename DerivedA, typename DerivedB, typename apt_t>
328void Despiker::replace_spikes(Eigen::DenseBase<DerivedA> &scans, Eigen::DenseBase<DerivedB> &flags,
329 apt_t &apt, Eigen::Index start_det) {
330
331 // declare random number generator
332 thread_local boost::random::mt19937 eng;
333
334 Eigen::Index n_dets = flags.cols();
335 Eigen::Index n_pts = flags.rows();
336
337 // figure out if there are any flag-free detectors
338 Eigen::Index n_flagged = 0;
339
340 // if spike_free(detector) == 1, it contains a spike
341 // otherwise none found
342 auto spike_free = flags.colwise().maxCoeff();
343 n_flagged = n_dets - spike_free.template cast<int>().sum();
344
345 logger->info("has spikes {}", spike_free);
346 logger->info("n_flagged {}", n_flagged);
347
348 for (Eigen::Index det = 0; det < n_dets; det++) {
349 if (apt["flag"](det + start_det)==0) {
350 if (spike_free(det)) {
351 // condition flags so that if there is a spike we can make
352 // one long flagged or un-flagged region.
353 // first do spikes from 0 to 1
354 for (Eigen::Index j = 1; j < n_pts - 1; ++j) {
355 if (flags(j, det) == 0 && flags(j - 1, det) == 1 && flags(j + 1, det) == 1) {
356 flags(j, det) = 0;
357 }
358 }
359 // now do spikes from 1 to 0
360 for (Eigen::Index j = 1; j < n_pts - 1; ++j) {
361 if (flags(j, det) == 1 && flags(j - 1, det) == 0 && flags(j + 1, det) == 0) {
362 flags(j, det) = 1;
363 }
364 }
365 // and the first and last samples
366 flags(0, det) = flags(1, det);
367 flags(n_pts - 1, det) = flags(n_pts - 2, det);
368
369 // count up the number of flagged regions of data in the scan
370 Eigen::Index n_flagged_regions = 0;
371
372 if (flags(n_pts - 1, det) == 1) {
373 n_flagged_regions++;
374 }
375
376 n_flagged_regions
377 += ((flags.col(det).tail(n_pts - 1) - flags.col(det).head(n_pts - 1)).array() < 0)
378 .count()/ 2;
379 if (n_flagged_regions == 0) {
380 break;
381 }
382
383 logger->info("n_flagged_regions {}",n_flagged_regions);
384
385 // find the start and end index for each flagged region
386 Eigen::Matrix<int, Eigen::Dynamic, 1> si_flags(n_flagged_regions);
387 Eigen::Matrix<int, Eigen::Dynamic, 1> ei_flags(n_flagged_regions);
388
389 si_flags.setConstant(-1);
390 ei_flags.setConstant(-1);
391
392 int count = 0;
393 Eigen::Index j = 0;
394
395 while (j < n_pts) {
396 if (flags(j, det) == 1) {
397 int jstart = j;
398 int samp_count = 0;
399
400 while (flags(j, det) == 1 && j <= n_pts - 1) {
401 samp_count++;
402 j++;
403 }
404 if (samp_count > 1) {
405 si_flags(count) = jstart;
406 ei_flags(count) = j - 1;
407 count++;
408 } else {
409 j++;
410 }
411 } else {
412 j++;
413 }
414 }
415
416 logger->info("count {}", count);
417
418 // now loop on the number of flagged regions for the fix
419 Eigen::VectorXd xx(2);
420 Eigen::VectorXd yy(2);
421 Eigen::Matrix<Eigen::Index, 1, 1> tn_pts;
422 tn_pts << 2;
423
424 for (Eigen::Index j = 0; j < n_flagged_regions; ++j) {
425 // determine the linear baseline for flagged region
426 //but use flat level if flagged at endpoints
427 Eigen::Index n_flags = ei_flags(j) - si_flags(j);
428 Eigen::VectorXd lin_offset(n_flags);
429
430 if (si_flags(j) == 0) {
431 lin_offset.setConstant(scans(ei_flags(j) + 1, det));
432 }
433
434 else if (ei_flags(j) == n_pts - 1) {
435 lin_offset.setConstant(scans(si_flags(j) - 1, det));
436 }
437
438 else {
439 // linearly interpolate between the before and after good samples
440 xx(0) = si_flags(j) - 1;
441 xx(1) = ei_flags(j) + 1;
442 yy(0) = scans(si_flags(j) - 1, det);
443 yy(1) = scans(ei_flags(j) + 1, det);
444
445 Eigen::VectorXd xlin_offset =
446 Eigen::VectorXd::LinSpaced(n_flags, si_flags(j), si_flags(j) + n_flags - 1);
447
448 mlinterp::interp(tn_pts.data(), n_flags, yy.data(), lin_offset.data(), xx.data(),
449 xlin_offset.data());
450 logger->info("xlin_offset {}", xlin_offset);
451 }
452
453 logger->info("xx {}", xx);
454 logger->info("yy {}", yy);
455 logger->info("lin_offset {}", lin_offset);
456
457 // all non-flagged detectors repeat for all detectors without spikes
458 // count up spike-free detectors and store their values
459 int det_count = 0;
460 if (use_all_det) {
461 det_count = (apt["flag"].segment(start_det,n_dets).array()==0).count();
462 }
463 else {
464 for (Eigen::Index ii=0;ii<n_dets;ii++) {
465 if (!spike_free(ii) && apt["flag"](ii + start_det)==0) {
466 det_count++;
467 }
468 }
469 //det_count = spike_free.template cast<int>().sum();
470 }
471
472 logger->info("det_count {}", det_count);
473
474 Eigen::MatrixXd detm(n_flags, det_count);
475 detm.setConstant(-99);
476 Eigen::VectorXd res(det_count);
477
478 logger->info("si {}", si_flags);
479 int c = 0;
480 for (Eigen::Index ii = 0; ii < n_dets; ii++) {
481 if ((spike_free(ii) || use_all_det) && apt["flag"](ii + start_det)==0) {
482 detm.col(c) =
483 scans.block(si_flags(j), ii, n_flags, 1);
484 res(c) = apt["responsivity"](c + start_det);
485 c++;
486 }
487 }
488
489 detm.transposeInPlace();
490
491 logger->info("detm {}", detm);
492
493 // for each of these go through and redo the offset
494 Eigen::MatrixXd lin_offset_others(det_count, n_flags);
495
496 // first sample in scan is flagged so offset is flat
497 // with the value of the last sample in the flagged region
498 if (si_flags(j) == 0) {
499 lin_offset_others = detm.col(0).replicate(1, n_flags);
500 }
501
502 // last sample in scan is flagged so offset is flat
503 // with the value of the first sample in the flagged region
504 else if (ei_flags(j) == n_pts - 1) {
505 lin_offset_others = detm.col(n_flags - 1).replicate(1, n_flags);
506 }
507
508 else {
509 Eigen::VectorXd tmp_vec(n_flags);
510 Eigen::VectorXd xlin_offset =
511 Eigen::VectorXd::LinSpaced(n_flags, si_flags(j), si_flags(j) + n_flags - 1);
512
513 xx(0) = si_flags(j) - 1;
514 xx(1) = ei_flags(j) + 1;
515 // do we need this loop?
516 for (Eigen::Index ii = 0; ii < det_count; ii++) {
517 yy(0) = detm(ii, 0);
518 yy(1) = detm(ii, n_flags - 1);
519
520 mlinterp::interp(tn_pts.data(), n_flags, yy.data(), tmp_vec.data(), xx.data(),
521 xlin_offset.data());
522 lin_offset_others.row(ii) = tmp_vec;
523 }
524
525 logger->info("xlin_offset {}", xlin_offset);
526 }
527
528 logger->info("xx {}", xx);
529 logger->info("yy {}", yy);
530 logger->info("lin_offset_others {}", lin_offset_others);
531
532 detm.noalias() = detm - lin_offset_others;
533
534 logger->info("detm {}", detm);
535
536 // scale det by responsivities and average to make sky model
537 Eigen::VectorXd sky_model = Eigen::VectorXd::Zero(n_flags);
538
539 //sky_model = sky_model.array() + (detm.array().colwise() / res.array()).rowwise().sum();
540 //sky_model /= det_count;
541
542 for (Eigen::Index ii=0; ii<det_count; ii++) {
543 for (Eigen::Index l=0; l<n_flags; l++) {
544 sky_model(l) += detm(ii,l)/res(ii);
545 }
546 }
547
548 sky_model = sky_model/det_count;
549
550 logger->info("sky_model {}",sky_model);
551
552 Eigen::VectorXd std_dev_ff = Eigen::VectorXd::Zero(det_count);
553
554 for (Eigen::Index ii = 0; ii < det_count; ii++) {
555 Eigen::VectorXd tmp_vec = detm.row(ii).array() / res.array() - sky_model.transpose().array();
556
557 double tmp_mean = tmp_vec.mean();
558
559 std_dev_ff(ii) = (tmp_vec.array() - tmp_mean).pow(2).sum();
560 std_dev_ff(ii) = (n_flags == 1.) ? std_dev_ff(ii) / n_flags
561 : std_dev_ff(ii) / (n_flags - 1.);
562
563 }
564
565 logger->info("std_dev_ff {}",std_dev_ff);
566
567 double mean_std_dev = (std_dev_ff.array().sqrt()).sum() / det_count;
568
569 // add noise to the fake signal
570 //mean_std_dev *= apt["responsivity"](det + start_det); // not used
571
572 // boost random number generator
573 boost::random::normal_distribution<> rands{0, mean_std_dev};
574
575 Eigen::VectorXd error =
576 Eigen::VectorXd::Zero(n_flags).unaryExpr([&](double dummy){return rands(eng);});
577
578 logger->info("error {}", error);
579
580 // the noiseless fake data is then the sky model plus the
581 // flagged detectors linear offset
582 //Eigen::VectorXd fake = sky_model.array() * apt["responsivity"](det + start_det) + lin_offset.array() + error.array();
583 Eigen::VectorXd fake = sky_model.array() + lin_offset.array() + error.array();
584
585 logger->info("fake {}", fake);
586
587 logger->info("mean std dev {}", mean_std_dev);
588
589 scans.col(det).segment(si_flags(j), n_flags) = fake;
590 } // flagged regions
591 } // if it has spikes
592 } // apt flag
593 } // main detector loop
594}
595
596} // namespace timestream
Definition despike.h:15
int size
Definition despike.h:28
bool use_all_det
Definition despike.h:37
double fsmp
Definition despike.h:22
void despike(Eigen::DenseBase< DerivedA > &, Eigen::DenseBase< DerivedB > &, apt_t &)
Definition despike.h:155
std::string grouping
Definition despike.h:34
void replace_spikes(Eigen::DenseBase< DerivedA > &, Eigen::DenseBase< DerivedB > &, apt_t &, Eigen::Index)
Definition despike.h:328
double sigest_lim
Definition despike.h:31
double time_constant_sec
Definition despike.h:21
std::shared_ptr< spdlog::logger > logger
Definition despike.h:18
double window_size
Definition despike.h:21
bool run_filter
Definition despike.h:25
double min_spike_sigma
Definition despike.h:21
auto make_window(Eigen::DenseBase< Derived > &spike_loc, int n_spikes, Eigen::Index n_pts)
Definition despike.h:73
void spike_finder(Eigen::DenseBase< DerivedA > &flags, Eigen::DenseBase< DerivedB > &delta, Eigen::DenseBase< DerivedC > &diff, int &n_spikes, double &cutoff)
Definition despike.h:52
auto calc_std_dev(Eigen::DenseBase< DerivedA > &data)
Definition utils.h:336
Definition clean.h:10