Accessing stellar spectra

Define a base Frame object to interface with the observations.

This keeps ASTRA fully agnostic to the instrument in use, as long as the data is properly loaded by the child classes of the Frame object.

It also provides a common set of names for commonly used header values.

Frame

Bases: Spectrum, Spectral_Modelling, Spectral_Normalization

Base Class for the different instruments.

Providing a shared interface to spectral data and header information.

This class defines a set of Keywords, consistent for all ASTRA-supported Instruments, which can be accessed through the proper methods. The internal keywords are initialized to a default value, which the Frame will use if the instrument does not provide that metric/value. Furthermore, all RV-related metrics are returned as astropy.Quantity objects (or lists of such objects). For such cases, one can use ASTRAutils.units.convert_data to convert data to different units and/or to floats.

The supported keywords and their default initialization values are:

Internal KW name | Default initialization
---------------- | ----------------------
BERV | np.nan * kilometer_second
previous_SBART_RV | np.nan * kilometer_second
previous_SBART_RV_ERR | np.nan * kilometer_second
DRS_CCF_MASK | ""
DRS_FLUX_CORRECTION_TEMPLATE | ""
DRS_RV | np.nan * kilometer_second
DRS_RV_ERR | np.nan * kilometer_second
drift | np.nan * kilometer_second
drift_ERR | np.nan * kilometer_second
relative_humidity | np.nan (used by telfit)
ambient_temperature | np.nan [Kelvin] (used by telfit)
airmass | np.nan
orderwise_SNRs | []
OBJECT | None
MAX_BERV | 35 * kilometer_second
BJD | None
MJD | None
DRS-VERSION | None
MD5-CHECK | None
ISO-DATE | None
CONTRAST | 0
CONTRAST_ERR | 0
FWHM | 0 (stored as km/s)
FWHM_ERR | 0 (stored as km/s)
BIS SPAN | 0 (stored as km/s)
BIS SPAN_ERR | 0 (stored as km/s)
EXPTIME | 0
RA | None
DEC | None
SPEC_TYPE | "None" (loaded from the CCF mask)
DET_BINX | None
DET_BINY | None
seeing | None
MOON PHASE | 0
MOON DISTANCE | 0
INS MODE | "None"
INS NAME | "None"
PROG ID | "None"
DATE_NIGHT | "None"
Source code in src/ASTRA/base_models/Frame.py
class Frame(Spectrum, Spectral_Modelling, Spectral_Normalization):
    """Base Class for the different instruments.

    Providing a shared interface to spectral data and header information.

    This class defines a set of Keywords, consistent for all ASTRA-supported Instruments, which can be accessed
    through the proper methods. The internal keywords are initialized to a default value, which the Frame will use
    if the instrument does not provide that metric/value. Furthermore, all RV-related metrics are returned as
    astropy.Quantity objects (or lists of such objects). For such cases, one can use
    :func:`~ASTRAutils.units.convert_data` to convert data to different units and/or to floats.


    The supported keywords and their default initialization values are:


    Internal KW name     |  Default initialization
    ------------ | -------------
    BERV | np.nan * kilometer_second
    previous_SBART_RV | np.nan * kilometer_second
    previous_SBART_RV_ERR | np.nan * kilometer_second
    DRS_CCF_MASK | ""
    DRS_FLUX_CORRECTION_TEMPLATE | ""
    DRS_RV | np.nan * kilometer_second
    DRS_RV_ERR | np.nan * kilometer_second
    drift | np.nan * kilometer_second
    drift_ERR | np.nan * kilometer_second
    relative_humidity | np.nan (used by telfit)
    ambient_temperature | np.nan [Kelvin] (used by telfit)
    airmass | np.nan
    orderwise_SNRs | []
    OBJECT | None
    MAX_BERV | 35 * kilometer_second
    BJD | None
    MJD | None
    DRS-VERSION | None
    MD5-CHECK | None
    ISO-DATE | None
    CONTRAST | 0
    CONTRAST_ERR | 0
    FWHM | 0 (stored as km/s)
    FWHM_ERR | 0 (stored as km/s)
    BIS SPAN | 0 (stored as km/s)
    BIS SPAN_ERR | 0 (stored as km/s)
    EXPTIME | 0
    RA | None
    DEC | None
    SPEC_TYPE | "None" (loaded from the CCF mask)
    DET_BINX | None
    DET_BINY | None
    seeing | None
    MOON PHASE | 0
    MOON DISTANCE | 0
    INS MODE | "None"
    INS NAME | "None"
    PROG ID | "None"
    DATE_NIGHT | "None"

    """

    _object_type = "Frame"
    _name = ""

    sub_instruments: dict[str, datetime.datetime] = {}
    # Dict of options and default values for them. Specific for each instrument

    _default_params = DefaultValues(
        bypass_QualCheck=UserParam(False, constraint=BooleanValue),
        open_without_BervCorr=UserParam(
            False,
            constraint=BooleanValue,
            description=(
                "Ensure that the Frame is not BERV corrected, independently "
                "of correction being applied or not in the official pipeline"
            ),
        ),
        apply_FluxCorr=UserParam(False, constraint=ValueFromIterable((False,))),
        use_air_wavelengths=UserParam(
            False,
            constraint=BooleanValue,
            description="Use air wavelengths, instead of the vacuum ones",
        ),
        apply_FluxBalance_Norm=UserParam(False, constraint=ValueFromIterable((False,))),
        reject_order_percentage=UserParam(0.25, constraint=ValueInInterval((0, 1), include_edges=True)),
        # If the SNR is smaller, discard the order:
        minimum_order_SNR=UserParam(
            5,
            constraint=Positive_Value_Constraint,
            description="SNR threshold under which the spectral order is rejected",
        ),
        MAX_ORDER_REJECTION=UserParam(
            50,
            constraint=ValueInInterval((0, 100)),
            description="Maximum percentage of orders that a Frame can reject before being considered invalid",
        ),
        bypass_ST_designation=UserParam(default_value=None, constraint=ValueFromIterable((None, "S2D", "S1D"))),
        IS_SA_CORRECTED=UserParam(
            False,
            constraint=BooleanValue,
            description=(
                "Indicates if the SA correction is already accounted in the BERV."
                " By default False, as the majority of instruments do not have it included in their BERV calculation"
            ),
        ),
        REJECT_NEGATIVE_FLUXES=UserParam(
            True,
            constraint=BooleanValue,
            description="Reject any flux value that falls below zero. Default: True",
        ),
        SIGMA_CLIP_FLUX_VALUES=UserParam(
            -1,
            constraint=NumericValue,
            description="Sigma clip the flux values. Disabled if the value is -1 (the default),"
            " as it is scarcely needed and we have outlier detection on the RV extraction",
        ),
        USE_APPROX_BERV_CORRECTION=UserParam(
            False,
            constraint=BooleanValue,
            description="Use the approximated BERV correction",
        ),
    )

    order_intervals: dict[DETECTOR_DEFINITION, list[int]] = {
        DETECTOR_DEFINITION.WHITE_LIGHT: [],
        DETECTOR_DEFINITION.RED_DET: [],
        DETECTOR_DEFINITION.BLUE_DET: [],
    }

    def __init__(
        self,
        inst_name: str,
        array_size: Dict[str, tuple],
        file_path: Path,
        frameID: int,
        KW_map: Dict[str, str],
        available_indicators: tuple,
        user_configs: Optional[Dict[str, Any]] = None,
        reject_subInstruments: Optional[Iterable[str]] = None,
        need_external_data_load: bool = False,
        init_log: bool = True,
        quiet_user_params: bool = True,
    ):
        """Init for all instruments.

        The Frame object is initialized with the following set of Keywords:

        Parameters
        ----------
        inst_name
            Name of the instrument
        array_size
            Size of the data that will be loaded from disk. Follow the format [Number order, Number pixels]
        file_path
            Path to the file that is going to be opened
        frameID
            Numerical value that represents the frame's ID inside the :class:`~ASTRAdata_objects.DataClass.DataClass`
        KW_map
            Dictionary where the keys are names of internal Keywords and the values represent the keyword name on the
            header of the .fits files
        available_indicators
            Names of available activity indicators for the instrument
        user_configs
            User configs information to be loaded in the parent class
        reject_subInstruments
            List of subInstruments to completely reject
        need_external_data_load
            True if the instrument must load data from a file that is not the one specified on the "file_path" argument
        init_log
            If True create a log entry with the filename
        quiet_user_params
            If True, there are no logs for the generation of the user parameters of each Frame

        """
        user_configs: dict = {} if user_configs is None else user_configs
        self.instrument_properties = {
            "name": inst_name,
            "array_sizes": array_size,
            "array_size": None,
            "wavelength_coverage": (),
            "resolution": None,
            "EarthLocation": None,
            "site_pressure": None,  # pressure in hPa
            "is_drift_corrected": None,  # True if the S2D files are already corrected from the drift
        }

        self.frameID = frameID
        self._status = Status()  # BY DEFAULT IT IS A VALID ONE!

        if not isinstance(file_path, (str, Path)):
            raise custom_exceptions.InvalidConfiguration("Invalid path!")

        if not isinstance(file_path, Path):
            file_path = Path(file_path)

        self.file_path: Path = file_path
        if init_log:
            logger.info(f"Creating frame from: {self.file_path}")
        self.inst_name = inst_name

        self.sub_instrument = None

        self.available_indicators = available_indicators

        self._KW_map = KW_map
        if "UseMolecfit" in user_configs:
            self.spectral_format = "S1D"
        elif "bypass_ST_designation" in user_configs:
            self.spectral_format = user_configs["bypass_ST_designation"]
        else:
            self.spectral_format = self.get_spectral_type()
        self.instrument_properties["array_size"] = self.instrument_properties["array_sizes"][self.spectral_format]
        self.array_size = self.instrument_properties["array_size"]
        super().__init__(user_configs=user_configs, quiet_user_params=quiet_user_params)

        self.use_approximated_BERV_correction = self._internal_configs["USE_APPROX_BERV_CORRECTION"]

        # Stores the information loaded from the header of the S2D files. This dict provides the default values in
        # case the instrument does not support them!
        # Orderwise SNRs OR values with units -> should not be passed inside the KW_map!!!
        self.observation_info = {
            "BERV": np.nan * kilometer_second,
            # The BERV value from the ESPRESSO pipeline is not fully correct, so we need to propagate
            # the BERV factor. As such, it will be a common property to all frames. Default value of None, so
            # that we can ensure that it is properly computed when used
            "BERV_FACTOR": None,
            "previous_SBART_RV": np.nan * kilometer_second,
            "previous_SBART_RV_ERR": np.nan * kilometer_second,
            "DRS_CCF_MASK": "",
            "DRS_FLUX_CORRECTION_TEMPLATE": "",
            "DRS_RV": np.nan * kilometer_second,
            "DRS_RV_ERR": np.nan * kilometer_second,
            "drift": np.nan * kilometer_second,
            "drift_ERR": np.nan * kilometer_second,
            "relative_humidity": np.nan,  # for telfit
            "ambient_temperature": np.nan,  # for telfit
            "airmass": np.nan,
            "orderwise_SNRs": [],
            "OBJECT": None,
            "MAX_BERV": 35 * kilometer_second,
            "BJD": None,
            "MJD": None,
            "DRS-VERSION": None,
            "MD5-CHECK": None,
            "ISO-DATE": None,
            "CONTRAST": 0,
            "CONTRAST_ERR": 0,
            "FWHM": 0,  # Store this as km/s
            "FWHM_ERR": 0,  # Store this as km/s
            "BIS SPAN": 0,  # Store this as km/s
            "BIS SPAN_ERR": 0,  # Store this as km/s
            "EXPTIME": 0,
            "RA": None,
            "DEC": None,
            "SPEC_TYPE": "None",  # This keyword is simply loading the CCF mask...
            "DET_BINX": None,
            "DET_BINY": None,
            "seeing": None,
            "MOON PHASE": 0,
            "MOON DISTANCE": 0,
            "INS MODE": "None",
            "INS NAME": "None",
            "PROG ID": "None",
            "DATE_NIGHT": "None",
        }

        # Used to allow to reject a wavelength region from one order and keep any overlap that might exist on others
        self._orderwise_wavelength_rejection: Optional[Dict[int, List]] = None

        self.load_header_info()
        # List of lists. Each entry will be a pair of Reason: list<[<start, end>]> wavelengths. When the S2D array is
        # opened, these elements will be used to mask spectral regions

        # TODO: understand why the typing error goes away when we use dict() instead of {}
        self.wavelengths_to_remove: Dict[Flag, List[List[int]]] = {}

        # Store here the wavelength limits for each order (if we want to impose them)!
        self.wavelengths_to_keep = None

        if reject_subInstruments is not None:
            for bad_subInst in reject_subInstruments:
                if self.is_SubInstrument(bad_subInst):
                    self.add_to_status(MISSING_DATA("Rejected entire subInstrument"))
                    logger.warning("Rejecting subInstruments")

        if need_external_data_load:
            self.add_to_status(LOADING_EXTERNAL_DATA)

        self._header: Optional[fits.Header] = None
        if "skysub" in self.file_path.stem:
            self.is_skysub = True

    def get_spectral_type(self) -> str:
        """Check the filename to see if we are using an S1D or S2D file.

        Raises:
            custom_exceptions.InternalError: If it is not possible to recognize the filename

        Returns:
            str: S1D or S2D

        """
        name_lowercase = self.file_path.stem.lower()
        if "s2d" in name_lowercase or "e2ds" in name_lowercase:
            return "S2D"
        if "s1d" in name_lowercase:
            return "S1D"
        raise custom_exceptions.InternalError(f"{self.name} can't recognize the file that it received!")

    def copy_into_S2D(self, new_S2D_size: Optional[Tuple[int, int]] = None) -> Frame:
        """Return a new object which contains the S1D that that has been converted into a S2D.

        Args:
            new_S2D_size (Optional[Tuple[int, int]], optional): Shape of the new S2D array, given as a tuple with
            two elements: (number of orders, pixels per order). If None, uses the standard S2D size of this
            instrument. Defaults to None.

        Raises:
            custom_exceptions.InvalidConfiguration: If it is already in S2D format

        Returns:
            Frame: new Frame

        """
        if self.is_S2D:
            raise custom_exceptions.InvalidConfiguration("Can't transform S2D file into S2D file")
        logger.warning("Creating a copy of a S1D Frame for transformation into S2D")

        og_shape = self.instrument_properties["array_sizes"]["S2D"] if new_S2D_size is None else new_S2D_size

        reconstructed_S2D = np.zeros(og_shape)
        reconstructed_wavelengths = np.zeros(og_shape)
        reconstructed_uncertainties = np.zeros(og_shape)

        order_number = 0
        order_size = reconstructed_wavelengths[0].size
        to_break = False
        wavelengths, flux, uncertainties, _ = self.get_data_from_full_spectrum()
        wavelengths = wavelengths[0]
        flux = flux[0]
        uncertainties = uncertainties[0]

        while not to_break:
            start_order = order_size * order_number
            end_order = start_order + order_size
            if end_order >= wavelengths.size:
                to_break = True
                end_order = wavelengths.size

            slice_size = end_order - start_order
            reconstructed_wavelengths[order_number] = np.pad(
                wavelengths[start_order:end_order],
                (0, order_size - slice_size),
                constant_values=0,
            )
            reconstructed_S2D[order_number] = np.pad(
                flux[start_order:end_order],
                (0, order_size - slice_size),
                constant_values=0,
            )
            reconstructed_uncertainties[order_number] = np.pad(
                uncertainties[start_order:end_order],
                (0, order_size - slice_size),
                constant_values=0,
            )
            order_number += 1

        # The "new" orders that don't have any information will have a flux of zero. Thus, they will be deemed to
        # be invalid during the mask creation process (that is re-launched after this routine is done)

        # Ensure that we don't lose information due to the SNR cut
        user_configs = self._internal_configs._user_configs
        user_configs["minimum_order_SNR"] = 0

        inst_properties = self.instrument_properties["array_sizes"]
        if new_S2D_size is not None:
            inst_properties["S2D"] = new_S2D_size

        new_frame = Frame(
            inst_name=self.inst_name,
            array_size=inst_properties,
            file_path=self.file_path,
            frameID=self.frameID,
            KW_map=self._KW_map,
            available_indicators=self.available_indicators,
            user_configs=self._internal_configs._user_configs,
        )
        new_frame.wavelengths = reconstructed_wavelengths
        new_frame.spectra = reconstructed_S2D
        new_frame.uncertainties = reconstructed_uncertainties
        for key in ["observation_info", "instrument_properties"]:
            setattr(new_frame, key, getattr(self, key))

        new_frame._spectrum_has_data_on_memory = True  # to avoid new data loads!
        new_frame._never_close = True  # ensure that we don't lose the transformation
        new_frame.spectral_format = "S2D"
        new_frame.instrument_properties["array_size"] = new_S2D_size
        new_frame.array_size = new_S2D_size
        new_frame.sub_instrument = self.sub_instrument
        new_frame.is_blaze_corrected = self.is_blaze_corrected
        new_frame.observation_info["orderwise_SNRs"] = [1 for _ in range(new_S2D_size[0])]
        new_frame.regenerate_order_status()
        return new_frame

    def import_KW_from_outside(self, KW: str, value: Any, optional: bool) -> None:
        """Allow to manually override header parameters (in memory) from the outside.

        This can be used if an instrument has data stored in multiple files. This allows a post-setup
        update of header values (for the keywords stored in observation_info)

        Args:
            KW (str): keyword name, as defined by the Frame interface
            value (Any): New value
            optional (bool): if it is optional, it can be a non-finite value

        Raises:
            FrameError: If we attempt to load a optional=False keyword that has a non-finite value

        """
        if KW not in self.observation_info:
            logger.critical(
                "Keyword <{}> is not supported by the Frames. Couldn't load it from the outside",
                KW,
            )

        if not np.isfinite(value):
            if not optional:
                logger.critical(
                    "Loaded mandatory keyword <{}> with a non-finite value for frame {}",
                    KW,
                    self.fname,
                )
                raise FrameError
            logger.critical(
                "Loaded keyword <{}> has a non-finite value for frame {}",
                KW,
                self.fname,
            )
        self.observation_info[KW] = value

    def reject_wavelength_region_from_order(self, order: int, region: list[tuple[int, int]]) -> None:
        """Flag a wavelength region from specific order to be marked as invalid during the creation of the stellar mask.

        This will not account for order overlaps.

        Args:
            order (int): Spectral order.
            region (list[tuple[int, int]]): List of (start, end) wavelength pairs to reject.

        Raises:
            custom_exceptions.InvalidConfiguration: If the region is not a list of lists.

        """
        if not isinstance(region, (Iterable,)):
            raise custom_exceptions.InvalidConfiguration("The rejection region must be a list of lists")

        if self._orderwise_wavelength_rejection is None:
            self._orderwise_wavelength_rejection = {}
        self._orderwise_wavelength_rejection[order] = region

    def mark_wavelength_region(self, reason: Flag, wavelength_blocks: list[tuple[int, int]]) -> None:
        """Add wavelength regions to be removed whenever the S2D file is opened.

        When rejecting wavelengths through this function, we only have to specify wavelength intervals, which
        accounts for possible order overlap. When loading the Frame, we search through all orders for any
        occurrence of these wavelength blocks.

        Parameters
        ----------
        reason : Flag
            Flag for the removal type
        wavelength_blocks : list[tuple[int, int]]
            List with lists of wavelength limits. [[lambda_0, lambda_1], [lambda_2, lambda_3]] to reject.

        """
        self.wavelengths_to_remove[reason] = wavelength_blocks

    def select_wavelength_region(self, order: int, wavelength_blocks: list[tuple[int, int]]) -> None:
        """Reject all wavelengths that are not part of the provided intervals.

        Args:
            order (int): Spectral order
            wavelength_blocks (list[tuple[int, int]]): List of tuples, each containing the start and end
            wavelengths of a "good" interval.

        """
        if self.wavelengths_to_keep is None:
            self.wavelengths_to_keep = {}
        self.wavelengths_to_keep[order] = wavelength_blocks

    def finalize_data_load(self, bad_flag: Optional[Flag] = None) -> None:
        """Run for all Instruments, even those that do not need an external data load.

        Checks if the non-fatal Flag "LOADING_EXTERNAL_DATA" exists in the Status.
        If so, adds the fatal Flag "MISSING_EXTERNAL_DATA". Otherwise, does nothing.

        """
        if self._status.has_flag(LOADING_EXTERNAL_DATA):
            logger.critical(f"Frame {self.name} did not load the external data that it needed!")

            self._status.delete_flag(LOADING_EXTERNAL_DATA)
            if bad_flag is None:
                self.add_to_status(MISSING_EXTERNAL_DATA)
            else:
                self.add_to_status(bad_flag)

    def finalized_external_data_load(self) -> None:
        """Mark frame after everything is loaded into memory.

        The frames that need external data will have a Flag of "LOADING_EXTERNAL_DATA" that will translate into a
        rejection of the Frame (if it is not removed).

        This call removes that flag from the Status and signals that this Frame managed to load everything that
        it needed.
        """
        if not self.is_valid:
            logger.warning("Finalizing external data loading for Frame that was already rejected.")
        else:
            self._status.delete_flag(LOADING_EXTERNAL_DATA)

    def add_to_status(self, new_flag: Flag) -> None:
        """Add a new Flag to the Status of this Frame."""
        logger.debug("Updating Frame ({}) status to {}", self.fname, new_flag)

        super().add_to_status(new_flag=new_flag)

        if not self.is_valid:
            self.close_arrays()

    def _data_access_checks(self) -> None:
        super()._data_access_checks()
        if not self.is_open:
            self.load_data()

    @property
    def status(self) -> Status:
        """Return the Status of the entire Frame."""
        return self._status

    @property
    def is_SA_corrected(self) -> bool:
        """Check if the frame was corrected from secular acceleration."""
        return self._internal_configs["IS_SA_CORRECTED"]

    ###################################
    #          Cleaning data          #
    ###################################

    def build_mask(self, bypass_QualCheck: bool = False, assess_bad_orders: bool = True) -> None:
        """Build a pixel-wise mask for rejection purposes.

        Args:
            bypass_QualCheck (bool, optional): If True, bypass using the QUAL_DATA from the DRS. Defaults to False.
            assess_bad_orders (bool, optional): If True, reject entire spectral orders via assess_bad_orders().
                Set to False to run more pixel-rejection methods before evaluating bad orders.
                Defaults to True.

        """
        self.spectral_mask = Mask(initial_mask=np.zeros(self.instrument_properties["array_size"], dtype=np.uint16))
        if not bypass_QualCheck:
            zero_indexes = np.where(self.qual_data != 0)
            self.spectral_mask.add_indexes_to_mask(zero_indexes, QUAL_DATA)

        self.spectral_mask.add_indexes_to_mask(np.where(~np.isfinite(self.spectra)), MISSING_DATA)
        self.spectral_mask.add_indexes_to_mask(np.where(self.spectra == 0), MISSING_DATA)

        if self._internal_configs["REJECT_NEGATIVE_FLUXES"]:
            self.spectral_mask.add_indexes_to_mask(np.where(self.spectra < 0), MISSING_DATA)

        self.spectral_mask.add_indexes_to_mask(np.where(self.uncertainties == 0), MISSING_DATA)
        self.spectral_mask.add_indexes_to_mask(np.where(~np.isfinite(self.uncertainties)), MISSING_DATA)

        order_map = {i: (np.min(self.wavelengths[i]), np.max(self.wavelengths[i])) for i in range(self.N_orders)}
        removal_reasons = [i.name for i in self.wavelengths_to_remove.keys()]
        N_point_removed = []
        time_took = []

        logger.debug(f"Cleaning wavelength regions from {removal_reasons}")

        for removal_reason, wavelengths in self.wavelengths_to_remove.items():
            start_time = time.time()
            nrem = len(wavelengths)

            N_point_removed.append(nrem)
            for wave_pair in wavelengths:
                for order in range(self.N_orders):
                    if check_if_overlap(wave_pair, order_map[order]):
                        indexes = np.where(
                            np.logical_and(
                                self.wavelengths[order] >= wave_pair[0],
                                self.wavelengths[order] <= wave_pair[1],
                            ),
                        )
                        self.spectral_mask.add_indexes_to_mask_order(order, indexes, removal_reason)
            time_took.append(time.time() - start_time)
        logger.debug(
            "Removed {} regions ({})",
            sum(N_point_removed),
            " + ".join(map(str, N_point_removed)),
        )

        if self._internal_configs["SIGMA_CLIP_FLUX_VALUES"] > 0:
            logger.info("Sigma-clipping on flux is activated. Running rejection procedure")
            median_level = np.median(self.spectra, axis=1)
            threshold = (
                self._internal_configs["SIGMA_CLIP_FLUX_VALUES"] * self.uncertainties + median_level[:, np.newaxis]
            )
            self.spectral_mask.add_indexes_to_mask(np.where(self.spectra > threshold), MISSING_DATA)

        if self._orderwise_wavelength_rejection is not None:
            logger.info("Rejecting spectral chunks from individual orders")
            for order, region in self._orderwise_wavelength_rejection.items():
                for subregion in region:
                    indexes = np.where(
                        np.logical_and(
                            self.wavelengths[order] >= subregion[0],
                            self.wavelengths[order] <= subregion[1],
                        ),
                    )
                    self.spectral_mask.add_indexes_to_mask_order(order, indexes, NON_COMMON_WAVELENGTH)

        logger.debug("Ensuring that we have increasing wavelengths")

        diffs = np.where(np.diff(self.wavelengths, axis=1) < 0)
        if diffs[0].size > 0:
            logger.warning("Found non-increasing wavelengths on {}", self.name)
            self.spectral_mask.add_indexes_to_mask(diffs, QUAL_DATA("Non-increasing wavelengths"))
        logger.debug("Took {} seconds ({})", sum(time_took), " + ".join(map(str, time_took)))

        if assess_bad_orders:
            self.assess_bad_orders()

        if self.wavelengths_to_keep is not None:
            logger.info("Provided desired wavelength region. Rejecting regions outside it")
            for order in range(self.N_orders):
                good_regions = self.wavelengths_to_keep[order]
                if len(good_regions) == 0:  # TODO: ensure that the order is also rejected
                    continue

                inds = np.zeros(self.wavelengths[order].size, dtype=bool)
                for region in good_regions:
                    wavelengths_to_keep = np.where(
                        np.logical_and(
                            self.wavelengths[order] >= region[0],
                            self.wavelengths[order] <= region[1],
                        ),
                    )
                    inds[wavelengths_to_keep] = True
                self.spectral_mask.add_indexes_to_mask_order(order, np.where(~inds), NON_COMMON_WAVELENGTH)

    def assess_bad_orders(self) -> None:
        """Evaluate the orders and Frames that can be fully rejected.

        Goals:
        1) Check if any order rejects more than *reject_order_percentage* % of the pixels. If so, rejects it.
        2) Apply the SNR cut of *minimum_order_SNR*.
        3) If a Frame rejects more than *MAX_ORDER_REJECTION* % of all orders, it is rejected from the analysis.

        """
        # True in the points to mask
        logger.debug("Rejecting spectral orders")

        if self.spectral_mask is not None:
            entire_mask = self.spectral_mask.get_custom_mask()

            for order, value in enumerate(entire_mask):
                # See if the total amount of rejected points is larger than
                # (1 - reject_order_percentage) of the entire order
                perc = self._internal_configs["reject_order_percentage"]
                if np.sum(value) > (1 - perc) * self.pixels_per_order:
                    self._OrderStatus.add_flag_to_order(order, HIGH_CONTAMINATION("Rejection threshold met in order"))

        if len(self.bad_orders) > 0:
            logger.debug(
                "Frame {} rejected {} orders due for having less than {} valid pixels: {}",
                self.frameID,
                len(self.bad_orders),
                self._internal_configs["reject_order_percentage"],
                ranges(list(self.bad_orders)),
            )

        if self.is_S2D:  # we don't have the SNR for the S1D file!
            bad_SNR = []
            SNRS = self.get_KW_value("orderwise_SNRs")
            for order in range(self.N_orders):
                if SNRS[order] < self._internal_configs["minimum_order_SNR"]:
                    self._OrderStatus.add_flag_to_order(order, LOW_SNR("Minimum SNR not met in order"))
                    bad_SNR.append(order)

            if len(bad_SNR) > 0:
                logger.info(
                    "Frame {} rejected {} orders for having SNR smaller than {}: {}",
                    self.frameID,
                    len(bad_SNR),
                    self._internal_configs["minimum_order_SNR"],
                    ranges(bad_SNR),
                )

        if len(self.bad_orders) >= self._internal_configs["MAX_ORDER_REJECTION"] * self.N_orders / 100:
            logger.warning(
                "Frame {} is rejecting more than {} % of the spectral orders",
                self,
                self._internal_configs["MAX_ORDER_REJECTION"],
            )
            self.add_to_status(
                NO_VALID_ORDERS(
                    f" Rejected more than {self._internal_configs['MAX_ORDER_REJECTION']} % of spectral orders",
                ),
            )

    ####################################
    #      Sanity Checks               #
    ####################################
    def check_header_QC(self, header: fits.header.Header) -> None:
        """Check if the header keywords are in accordance with their default value.

        Each instrument should do this check on its own

        This function will check for two things:
        1. Fatal keywords - will mark the Frame as invalid
        2. Warning Keywords - the frame is still valid, but it has a warning issued in the logs

        If any of those conditions is met, make sure that the flags meet the following naming conditions
        (so that we can filter by them later on):

        For fatal flags
        ```
        msg = f"QC flag {flag} has taken the bad value of {bad_value}"
        self.add_to_status(FATAL_KW(msg))
        ```

        For warnings:
        ```
        msg = f"QC flag {flag} meets the bad value"
        self._status.store_warning(KW_WARNING(msg))
        ```
        """

    def find_instrument_type(self) -> None:
        """Compare the date of observation with pre-defined sub-Instruments to see where it fits."""
        obs_date = self.get_KW_value("ISO-DATE")
        obs_date = "-".join(obs_date.split("T")).split(":")[0]
        obs_date = datetime.datetime.strptime(obs_date, r"%Y-%m-%d-%H")

        for key, threshold in self.__class__.sub_instruments.items():
            # If it is not higher than the threshold, then it belongs in this "interval"
            if not obs_date > threshold:
                self.sub_instrument = key
                break
        else:
            raise custom_exceptions.InternalError("no sub-instrument found for observation")

    #####################################
    #      Handle data management      #
    ####################################
    def get_S1D_name(self) -> str:
        """Build the S1D name that should be associated with this Frame.

        If it is already an S1D, returns the actual name.
        Otherwise, removes "BLAZE_" from the filename and replaces "S2D" with "S1D".

        """
        # TODO: this will not work for non-ESPRESSO files

        if self.is_S1D:
            return self.fname
        name = self.fname
        return name.replace("BLAZE_", "").replace("S2D", "S1D")

    def load_data(self) -> None:
        """Abstraction to load all data of this Frame.

        If the Frame is already open, it does nothing.
        Calls the S1D or S2D version of the data load, depending on the file type.
        Can remove BERV correction at run time, if properly configured to do so.

        Raises:
            custom_exceptions.InternalError: If it is neither S2D nor S1D
            FrameError: If the frame is no longer valid after loading

        """
        if self.is_open:
            return

        if self.is_S1D:
            self.load_S1D_data()
        elif self.is_S2D:
            self.load_S2D_data()
        else:
            raise custom_exceptions.InternalError("something went wrong on this frame")

        if not self.is_valid:
            raise FrameError("Frame is no longer valid")

        BERV_value = self.get_KW_value("BERV")

        if not self._internal_configs["open_without_BervCorr"]:
            self.apply_BERV_correction(BERV_value)
        else:
            logger.warning(f"Opening {self.name} without the BERV correction")
            self.remove_BERV_correction(BERV_value)

    def load_S1D_data(self) -> None:
        """To be overriden by the children classes."""
        logger.debug("Opening the S1D arrays from {}", self.fname)
        if not self.is_valid:
            raise FrameError
        self._spectrum_has_data_on_memory = True

    def load_S2D_data(self) -> None:
        """To be overriden by the children classes."""
        logger.debug("Opening the S2D arrays from {}", self.fname)
        if not self.is_valid:
            raise FrameError
        self._spectrum_has_data_on_memory = True

    def load_instrument_specific_KWs(self, header: Mapping[str, Any]) -> None:
        """Load instrument-specific KW values that can't be loaded in a general fashion.

        To be overridden by the different instruments

        Args:
            header (Mapping[str, Any]): header unit of this observation

        """

    def load_header_info(self) -> None:
        """Open the header of the fits file and load the necessary keywords.

        Does the following operations:
        1) Load header assuming fits file
        2) Parse through the _KW_map to load header keywords
        3) Call self.load_instrument_specific_KWs
        4) Call check_header_QC(hdu)
        5) Call find_instrument_type()
        6) Call assess_bad_orders()

        """
        try:
            hdu = fits.getheader(self.file_path)
        except FileNotFoundError:
            msg = f"File <{self.file_path}> does not exist"
            self.add_to_status(MISSING_FILE(msg))
            logger.critical(msg)
            return

        for internal_KW, S2D_KW in self._KW_map.items():
            self.observation_info[internal_KW] = hdu[S2D_KW]

        self.load_instrument_specific_KWs(hdu)
        self.check_header_QC(hdu)
        self.find_instrument_type()
        self.assess_bad_orders()

    ####################################
    #       Access data
    ####################################

    def get_KW_value(self, KW: str) -> Any:
        """Get a given KW value that is defined in the common framework."""
        return self.observation_info[KW]

    def get_header_value(self, kw: str) -> Any:
        """Directly retrieves a KW from the header.

        After this is called, the frame will keep the header stored in memory until the object is deleted

        Args:
            kw (str): Keyword name, present in the fits header

        Returns:
            Any: Header value

        """
        if self._header is None:
            self._header = fits.getheader(self.file_path)
        return self._header[kw]

    ####################################
    #       properties of the Frames
    ####################################

    @property
    def is_S1D(self) -> bool:
        """Check if Frame is of S1D format."""
        return self.spectral_format == "S1D"

    @property
    def is_S2D(self) -> bool:
        """Check if Frame is of S2D format."""
        return self.spectral_format == "S2D"

    @property
    def has_warnings(self) -> bool:
        """Check if Frame has any warnings."""
        return self._status.has_warnings

    def is_Instrument(self, Instrument: str) -> bool:
        """Check if Frame is from a given instrument."""
        return self.inst_name == Instrument

    def is_SubInstrument(self, sub_instrument: str) -> bool:
        """Check if the current instrument is from the given time_block (e.g ESPRESSO18/ESPRESSO19).

        Parameters
        ----------
        sub_instrument : str
            Name of the time block that is going to be checked

        Returns
        -------
        [bool]
            Results from the comparison

        """
        return self.sub_instrument == sub_instrument

    @property
    def previous_RV_measurements(self) -> tuple[RV_measurement, RV_measurement]:
        """Get previous DRS RV and uncertainty."""
        return self.get_KW_value("DRS_RV"), self.get_KW_value("DRS_RV_ERR")

    @property
    def bare_fname(self) -> str:
        """Returns the file name without the _S2D (and similar) parts.

        The children classes must overload this property. Otherwise, it returns the full filename.
        """
        return self.fname

    @property
    def fname(self) -> str:
        """Get filename."""
        return self.file_path.name

    @property
    def min_pixel_in_order(self) -> int:
        """Minimum number of pixels in order to be a valid one."""
        return self._internal_configs["reject_order_percentage"] * self.pixels_per_order

    @property
    def spectrum_information(self) -> dict[str, Any]:
        """Get general instrument and spectra information."""
        return {
            "subInstrument": self.sub_instrument,
            "filename": self.bare_fname,
            "is_S2D": self.is_S2D,
            "is_S1D": self.is_S1D,
            **super().spectrum_information,
        }

    def __repr__(self) -> str:  # noqa: D105
        return self.__str__()

    def __str__(self) -> str:  # noqa: D105
        return (
            f"Frame of {self.inst_name} : {self.sub_instrument}"
            f" data ({self.get_KW_value('ISO-DATE')}; ID = {self.frameID})"
        )

    def plot_spectra(
        self,
        which_orders: None | DETECTOR_DEFINITION | list[int] = None,
        axis=None,
    ):
        """Plot the spectra.

        Args:
            which_orders (None | DETECTOR_DEFINITION | list[int], optional): Either a pre-configured
                detector definition, a list of orders, or None (plots all orders). Defaults to None.
            axis (matplotlib.axes.Axes, optional): If None, create a new figure. Otherwise, use this one.
                Defaults to None.

        """
        fig = None
        if axis is None:
            fig, axis = plt.subplots()
        wf, ff, ef, mf = self.get_data_from_full_spectrum()

        if which_orders is None:
            which_orders = DETECTOR_DEFINITION.WHITE_LIGHT

        if isinstance(which_orders, (list, tuple)):
            orders_to_plot = which_orders
        else:
            orders_to_plot = self.order_intervals[which_orders]

        for sl in orders_to_plot:
            w, f, e, m = wf[sl], ff[sl], ef[sl], mf[sl]
            axis.errorbar(w[~m], f[~m], e[~m], ls="", marker="x")

        return fig, axis

    def trigger_data_storage(self, *args, **kwargs):
        super().trigger_data_storage(*args, **kwargs)
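
Putting the pieces together, a minimal lifecycle sketch (hypothetical: `SomeInstrumentFrame` stands in for a concrete instrument subclass, which supplies `KW_map`, array sizes, and the remaining constructor arguments documented in `__init__` above):

```python
frame = SomeInstrumentFrame(file_path="observation_S2D.fits", frameID=0)

if frame.is_valid:
    frame.load_data()    # opens the S1D/S2D arrays and handles the BERV correction
    frame.build_mask()   # pixel-wise rejection, followed by assess_bad_orders()
    print(frame.get_KW_value("ISO-DATE"), frame.previous_RV_measurements)
```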

bare_fname property

Returns the file name without the _S2D (and similar) parts.

The children classes must overload this property. Otherwise, it returns the full filename.

fname property

Get filename.

has_warnings property

Check if Frame has any warnings.

is_S1D property

Check if Frame is of S1D format.

is_S2D property

Check if Frame is of S2D format.

is_SA_corrected property

Check if the frame was corrected from secular acceleration.

min_pixel_in_order property

Minimum number of pixels in order to be a valid one.

previous_RV_measurements property

Get previous DRS RV and uncertainty.

spectrum_information property

Get general instrument and spectra information.

status property

Return the Status of the entire Frame.

add_to_status(new_flag)

Add a new Flag to the Status of this Frame.

Source code in src/ASTRA/base_models/Frame.py
def add_to_status(self, new_flag: Flag) -> None:
    """Add a new Flag to the Status of this Frame."""
    logger.debug("Updating Frame ({}) status to {}", self.fname, new_flag)

    super().add_to_status(new_flag=new_flag)

    if not self.is_valid:
        self.close_arrays()

assess_bad_orders()

Evaluate the orders and Frames that can be fully rejected.

Goals:

1) Check if any order rejects more than reject_order_percentage % of the pixels. If so, rejects it.
2) Apply the SNR cut of minimum_order_SNR.
3) If a Frame rejects more than MAX_ORDER_REJECTION % of all orders, it is rejected from the analysis.

Source code in src/ASTRA/base_models/Frame.py
def assess_bad_orders(self) -> None:
    """Evaluate the orders and Frames that can be fully rejected.

    Goals:
    1) Check if any order rejects more than *reject_order_percentage* % of the pixels. If so, rejects it.
    2) Apply the SNR cut of *minimum_order_SNR*.
    3) If a Frame rejects more than *MAX_ORDER_REJECTION* % of all orders, it is rejected from the analysis.

    """
    # True in the points to mask
    logger.debug("Rejecting spectral orders")

    if self.spectral_mask is not None:
        entire_mask = self.spectral_mask.get_custom_mask()

        for order, value in enumerate(entire_mask):
            # See if the total amount of rejected points is larger than
            # (1 - reject_order_percentage) of the entire order
            perc = self._internal_configs["reject_order_percentage"]
            if np.sum(value) > (1 - perc) * self.pixels_per_order:
                self._OrderStatus.add_flag_to_order(order, HIGH_CONTAMINATION("Rejection threshold met in order"))

    if len(self.bad_orders) > 0:
        logger.debug(
            "Frame {} rejected {} orders due for having less than {} valid pixels: {}",
            self.frameID,
            len(self.bad_orders),
            self._internal_configs["reject_order_percentage"],
            ranges(list(self.bad_orders)),
        )

    if self.is_S2D:  # we don't have the SNR for the S1D file!
        bad_SNR = []
        SNRS = self.get_KW_value("orderwise_SNRs")
        for order in range(self.N_orders):
            if SNRS[order] < self._internal_configs["minimum_order_SNR"]:
                self._OrderStatus.add_flag_to_order(order, LOW_SNR("Minimum SNR not met in order"))
                bad_SNR.append(order)

        if len(bad_SNR) > 0:
            logger.info(
                "Frame {} rejected {} orders for having SNR smaller than {}: {}",
                self.frameID,
                len(bad_SNR),
                self._internal_configs["minimum_order_SNR"],
                ranges(bad_SNR),
            )

    if len(self.bad_orders) >= self._internal_configs["MAX_ORDER_REJECTION"] * self.N_orders / 100:
        logger.warning(
            "Frame {} is rejecting more than {} % of the spectral orders",
            self,
            self._internal_configs["MAX_ORDER_REJECTION"],
        )
        self.add_to_status(
            NO_VALID_ORDERS(
                f" Rejected more than {self._internal_configs['MAX_ORDER_REJECTION']} % of spectral orders",
            ),
        )
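
To make the rejection thresholds concrete, a small worked example (all numbers are illustrative assumptions, not real instrument values):

```python
# Example configuration
reject_order_percentage = 0.25   # default UserParam
MAX_ORDER_REJECTION = 50         # percent, default UserParam
pixels_per_order = 9000          # assumed detector geometry
N_orders = 170                   # assumed number of spectral orders

# 1) an order is flagged with HIGH_CONTAMINATION once its masked pixels exceed
#    (1 - 0.25) * 9000 = 6750, i.e. fewer than 25% of its pixels remain valid
order_limit = (1 - reject_order_percentage) * pixels_per_order

# 3) the whole Frame gains NO_VALID_ORDERS once bad orders reach
#    50 * 170 / 100 = 85 orders
frame_limit = MAX_ORDER_REJECTION * N_orders / 100
print(order_limit, frame_limit)  # 6750.0 85.0
```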

build_mask(bypass_QualCheck=False, assess_bad_orders=True)

Build a pixel-wise mask for rejection purposes.

Parameters:

Name | Type | Description | Default
bypass_QualCheck | bool | If True, bypass using the QUAL_DATA from the DRS. | False
assess_bad_orders | bool | If True, reject entire spectral orders via assess_bad_orders(). Set to False to run more pixel-rejection methods before evaluating bad orders. | True
Source code in src/ASTRA/base_models/Frame.py
def build_mask(self, bypass_QualCheck: bool = False, assess_bad_orders: bool = True) -> None:
    """Build a pixel-wise mask for rejection purposes.

    Args:
        bypass_QualCheck (bool, optional): If True, bypass using the QUAL_DATA from the DRS. Defaults to False.
        assess_bad_orders (bool, optional): If True, reject entire spectral orders via assess_bad_orders().
            Set to False to run more pixel-rejection methods before evaluating bad orders.
            Defaults to True.

    """
    self.spectral_mask = Mask(initial_mask=np.zeros(self.instrument_properties["array_size"], dtype=np.uint16))
    if not bypass_QualCheck:
        zero_indexes = np.where(self.qual_data != 0)
        self.spectral_mask.add_indexes_to_mask(zero_indexes, QUAL_DATA)

    self.spectral_mask.add_indexes_to_mask(np.where(~np.isfinite(self.spectra)), MISSING_DATA)
    self.spectral_mask.add_indexes_to_mask(np.where(self.spectra == 0), MISSING_DATA)

    if self._internal_configs["REJECT_NEGATIVE_FLUXES"]:
        self.spectral_mask.add_indexes_to_mask(np.where(self.spectra < 0), MISSING_DATA)

    self.spectral_mask.add_indexes_to_mask(np.where(self.uncertainties == 0), MISSING_DATA)
    self.spectral_mask.add_indexes_to_mask(np.where(~np.isfinite(self.uncertainties)), MISSING_DATA)

    order_map = {i: (np.min(self.wavelengths[i]), np.max(self.wavelengths[i])) for i in range(self.N_orders)}
    removal_reasons = [i.name for i in self.wavelengths_to_remove.keys()]
    N_point_removed = []
    time_took = []

    logger.debug(f"Cleaning wavelength regions from {removal_reasons}")

    for removal_reason, wavelengths in self.wavelengths_to_remove.items():
        start_time = time.time()
        nrem = len(wavelengths)

        N_point_removed.append(nrem)
        for wave_pair in wavelengths:
            for order in range(self.N_orders):
                if check_if_overlap(wave_pair, order_map[order]):
                    indexes = np.where(
                        np.logical_and(
                            self.wavelengths[order] >= wave_pair[0],
                            self.wavelengths[order] <= wave_pair[1],
                        ),
                    )
                    self.spectral_mask.add_indexes_to_mask_order(order, indexes, removal_reason)
        time_took.append(time.time() - start_time)
    logger.debug(
        "Removed {} regions ({})",
        sum(N_point_removed),
        " + ".join(map(str, N_point_removed)),
    )

    if self._internal_configs["SIGMA_CLIP_FLUX_VALUES"] > 0:
        logger.info("Sigma-clipping on flux is activated. Running rejection procedure")
        median_level = np.median(self.spectra, axis=1)
        threshold = (
            self._internal_configs["SIGMA_CLIP_FLUX_VALUES"] * self.uncertainties + median_level[:, np.newaxis]
        )
        self.spectral_mask.add_indexes_to_mask(np.where(self.spectra > threshold), MISSING_DATA)

    if self._orderwise_wavelength_rejection is not None:
        logger.info("Rejecting spectral chunks from individual orders")
        for order, region in self._orderwise_wavelength_rejection.items():
            for subregion in region:
                indexes = np.where(
                    np.logical_and(
                        self.wavelengths[order] >= subregion[0],
                        self.wavelengths[order] <= subregion[1],
                    ),
                )
                self.spectral_mask.add_indexes_to_mask_order(order, indexes, NON_COMMON_WAVELENGTH)

    logger.debug("Ensuring that we have increasing wavelengths")

    diffs = np.where(np.diff(self.wavelengths, axis=1) < 0)
    if diffs[0].size > 0:
        logger.warning("Found non-increasing wavelengths on {}", self.name)
        self.spectral_mask.add_indexes_to_mask(diffs, QUAL_DATA("Non-increasing wavelengths"))
    logger.debug("Took {} seconds ({})", sum(time_took), " + ".join(map(str, time_took)))

    if assess_bad_orders:
        self.assess_bad_orders()

    if self.wavelengths_to_keep is not None:
        logger.info("Provided desired wavelength region. Rejecting regions outside it")
        for order in range(self.N_orders):
            good_regions = self.wavelengths_to_keep[order]
            if len(good_regions) == 0:  # TODO: ensure that the order is also rejected
                continue

            inds = np.zeros(self.wavelengths[order].size, dtype=bool)
            for region in good_regions:
                wavelengths_to_keep = np.where(
                    np.logical_and(
                        self.wavelengths[order] >= region[0],
                        self.wavelengths[order] <= region[1],
                    ),
                )
                inds[wavelengths_to_keep] = True
            self.spectral_mask.add_indexes_to_mask_order(order, np.where(~inds), NON_COMMON_WAVELENGTH)
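
A hedged usage sketch (the wavelength windows and the reuse of the MISSING_DATA flag are illustrative; any Flag instance accepted by mark_wavelength_region works):

```python
# Reject two contaminated wavelength windows across all overlapping orders,
# then build the pixel-wise mask and assess bad orders in one go.
frame.mark_wavelength_region(
    reason=MISSING_DATA("contaminated region"),
    wavelength_blocks=[(5880.0, 5905.0), (6270.0, 6330.0)],
)
frame.build_mask(bypass_QualCheck=False, assess_bad_orders=True)
```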

check_header_QC(header)

Check if the header keywords are in accordance with their default value.

Each instrument should do this check on its own

This function will check for two things:

1. Fatal keywords - will mark the Frame as invalid
2. Warning keywords - the frame is still valid, but a warning is issued in the logs

If any of those conditions is met, make sure that the flags meet the following naming conditions (so that we can filter by them later on):

For fatal flags

msg = f"QC flag {flag} has taken the bad value of {bad_value}"
self.add_to_status(FATAL_KW(msg))

For warnings:

msg = f"QC flag {flag} meets the bad value"
self._status.store_warning(KW_WARNING(msg))

Source code in src/ASTRA/base_models/Frame.py
def check_header_QC(self, header: fits.header.Header) -> None:
    """Check if the header keywords are in accordance with their default value.

    Each instrument should do this check on its own

    This function will check for two things:
    1. Fatal keywords - will mark the Frame as invalid
    2. Warning Keywords - the frame is still valid, but it has a warning issued in the logs

    If any of those conditions is met, make sure that the flags meet the following naming conditions
    (so that we can filter by them later on):

    For fatal flags
    ```
    msg = f"QC flag {flag} has taken the bad value of {bad_value}"
    self.add_to_status(FATAL_KW(msg))
    ```

    For warnings:
    ```
    msg = f"QC flag {flag} meets the bad value"
    self._status.store_warning(KW_WARNING(msg))
    ```
    """

copy_into_S2D(new_S2D_size=None)

Return a new object which contains the S1D data converted into an S2D format.

Parameters:

    new_S2D_size (Optional[Tuple[int, int]], optional): Size of the new S2D, a tuple with two
        elements: (number of orders, pixels per order). If None, uses the standard S2D size of
        this instrument. Defaults to None.

Raises:

    InvalidConfiguration: If it is already in S2D format

Returns:

    Frame: new Frame

Source code in src/ASTRA/base_models/Frame.py
def copy_into_S2D(self, new_S2D_size: Optional[Tuple[int, int]] = None) -> Frame:
    """Return a new object which contains the S1D that that has been converted into a S2D.

    Args:
        new_S2D_size (Optional[Tuple[int, int]], optional): Size of the new S2D, a tuple with two
        elements: (number of orders, pixels per order). If None, uses the standard S2D size of
        this instrument. Defaults to None.

    Raises:
        custom_exceptions.InvalidConfiguration: If it is already in S2D format

    Returns:
        Frame: new Frame

    """
    if self.is_S2D:
        raise custom_exceptions.InvalidConfiguration("Can't transform S2D file into S2D file")
    logger.warning("Creating a copy of a S1D Frame for transformation into S2D")

    og_shape = self.instrument_properties["array_sizes"]["S2D"] if new_S2D_size is None else new_S2D_size

    reconstructed_S2D = np.zeros(og_shape)
    reconstructed_wavelengths = np.zeros(og_shape)
    reconstructed_uncertainties = np.zeros(og_shape)

    order_number = 0
    order_size = reconstructed_wavelengths[0].size
    to_break = False
    wavelengths, flux, uncertainties, _ = self.get_data_from_full_spectrum()
    wavelengths = wavelengths[0]
    flux = flux[0]
    uncertainties = uncertainties[0]

    while not to_break:
        start_order = order_size * order_number
        end_order = start_order + order_size
        if end_order >= wavelengths.size:
            to_break = True
            end_order = wavelengths.size

        slice_size = end_order - start_order
        reconstructed_wavelengths[order_number] = np.pad(
            wavelengths[start_order:end_order],
            (0, order_size - slice_size),
            constant_values=0,
        )
        reconstructed_S2D[order_number] = np.pad(
            flux[start_order:end_order],
            (0, order_size - slice_size),
            constant_values=0,
        )
        reconstructed_uncertainties[order_number] = np.pad(
            uncertainties[start_order:end_order],
            (0, order_size - slice_size),
            constant_values=0,
        )
        order_number += 1

    # The "new" orders that don't have any information will have a flux of zero. Thus, they will be deemed to
    # be invalid during the mask creation process (that is re-launched after this routine is done)

    # Ensure that we don't lose information due to the SNR cut
    user_configs = self._internal_configs._user_configs
    user_configs["minimum_order_SNR"] = 0

    inst_properties = self.instrument_properties["array_sizes"]
    if new_S2D_size is not None:
        inst_properties["S2D"] = new_S2D_size

    new_frame = Frame(
        inst_name=self.inst_name,
        array_size=inst_properties,
        file_path=self.file_path,
        frameID=self.frameID,
        KW_map=self._KW_map,
        available_indicators=self.available_indicators,
        user_configs=self._internal_configs._user_configs,
    )
    new_frame.wavelengths = reconstructed_wavelengths
    new_frame.spectra = reconstructed_S2D
    new_frame.uncertainties = reconstructed_uncertainties
    for key in ["observation_info", "instrument_properties"]:
        setattr(new_frame, key, getattr(self, key))

    new_frame._spectrum_has_data_on_memory = True  # to avoid new data loads!
    new_frame._never_close = True  # ensure that we don't lose the transformation
    new_frame.spectral_format = "S2D"
    # use og_shape so these assignments also hold when new_S2D_size is None
    new_frame.instrument_properties["array_size"] = og_shape
    new_frame.array_size = og_shape
    new_frame.sub_instrument = self.sub_instrument
    new_frame.is_blaze_corrected = self.is_blaze_corrected
    new_frame.observation_info["orderwise_SNRs"] = [1 for _ in range(og_shape[0])]
    new_frame.regenerate_order_status()
    return new_frame
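Usage sketch (the file path is hypothetical, and an S1D frame is assumed):

frame = ESPRESSO(file_path="r.ESPRE.2021-01-01T00:00:00.000_S1D_A.fits")
s2d_like = frame.copy_into_S2D()                        # default (170, 9111) grid
custom = frame.copy_into_S2D(new_S2D_size=(100, 5000))  # custom reshaping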

finalize_data_load(bad_flag=None)

Run for all Instruments, even those that do not need an external data load.

Checks if the non-fatal Flag "LOADING_EXTERNAL_DATA" exists in the Status. If so, adds the fatal Flag "MISSING_EXTERNAL_DATA" (or the provided bad_flag); otherwise, does nothing.

Source code in src/ASTRA/base_models/Frame.py
def finalize_data_load(self, bad_flag: Optional[Flag] = None) -> None:
    """Run for all Instruments, even those that do not need an external data load.

    Checks if the non-fatal Flag "LOADING_EXTERNAL_DATA" exists in the Status.
    If so, adds the fatal Flag "MISSING_EXTERNAL_DATA" (or the provided bad_flag); otherwise, does nothing.

    """
    if self._status.has_flag(LOADING_EXTERNAL_DATA):
        logger.critical(f"Frame {self.name} did not load the external data that it needed!")

        self._status.delete_flag(LOADING_EXTERNAL_DATA)
        if bad_flag is None:
            self.add_to_status(MISSING_EXTERNAL_DATA)
        else:
            self.add_to_status(bad_flag)

finalized_external_data_load()

Mark frame after everything is loaded into memory.

The frames that need external data will have a Flag of "LOADING_EXTERNAL_DATA" that will translate into a rejection of the Frame (if it is not removed).

This call will remove that flag from Status and signals that this Frame managed to load everything that it needed.

Source code in src/ASTRA/base_models/Frame.py
def finalized_external_data_load(self) -> None:
    """Mark frame after everything is loaded into memory.

    The frames that need external data will have a Flag of "LOADING_EXTERNAL_DATA" that will translate into a
    rejection of the Frame (if it is not removed).

    This call will remove that flag from Status and signals that this Frame managed to load everything that
    it needed
    """
    if not self.is_valid:
        logger.warning("Finalizing external data loading for Frame that was already rejected.")
    else:
        self._status.delete_flag(LOADING_EXTERNAL_DATA)
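A sketch of the external-data lifecycle described above (frame is a hypothetical instance that starts with the non-fatal LOADING_EXTERNAL_DATA flag):

# 1) the instrument-specific code attaches the external data to the frame
# 2) on success, clear the non-fatal flag:
frame.finalized_external_data_load()
# 3) at the end of the loading stage; adds MISSING_EXTERNAL_DATA only if step 2 never ran:
frame.finalize_data_load()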

find_instrument_type()

Compare the date of observation with pre-defined sub-Instruments to see where it fits.

Source code in src/ASTRA/base_models/Frame.py
def find_instrument_type(self) -> None:
    """Compare the date of observation with pre-defined sub-Instruments to see where it fits."""
    obs_date = self.get_KW_value("ISO-DATE")
    obs_date = "-".join(obs_date.split("T")).split(":")[0]
    obs_date = datetime.datetime.strptime(obs_date, r"%Y-%m-%d-%H")

    for key, threshold in self.__class__.sub_instruments.items():
        # If it is not higher than the threshold, then it belongs in this "interval"
        if not obs_date > threshold:
            self.sub_instrument = key
            break
    else:
        raise custom_exceptions.InternalError("no sub-instrument found for observation")
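A standalone sketch of the threshold logic above: the first sub-Instrument whose date threshold the observation does not exceed is selected (dates mirror the ESPRESSO definitions further below):

import datetime

sub_instruments = {
    "ESPRESSO18": datetime.datetime(2019, 6, 27),
    "ESPRESSO19": datetime.datetime.max,
}
obs_date = datetime.datetime(2020, 3, 1, 4)

for key, threshold in sub_instruments.items():
    if not obs_date > threshold:
        print(key)  # -> ESPRESSO19
        break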

get_KW_value(KW)

Get a given KW value that is defined in the common framework.

Source code in src/ASTRA/base_models/Frame.py
def get_KW_value(self, KW: str) -> Any:
    """Get a given KW value that is defined in the common framework."""
    return self.observation_info[KW]
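Usage sketch (frame is a hypothetical loaded Frame; per the keyword table, RV-related values come back as astropy Quantities):

berv = frame.get_KW_value("BERV")        # e.g. an astropy Quantity in km/s
airmass = frame.get_KW_value("airmass")  # plain float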

get_S1D_name()

Build the S1D name that should be associated with this Frame.

If it is already an S1D, returns the actual name. If it is not, removes "BLAZE_" from the filename and replaces "S2D" with "S1D"

Source code in src/ASTRA/base_models/Frame.py
def get_S1D_name(self) -> str:
    """Build the S1D name that should be associated with this Frame.

    If it is already an S1D, returns the actual name.
    If it is not, removes "BLAZE_" from the filename and replaces "S2D" with "S1D"

    """
    # TODO: this will not work for non-ESPRESSO files

    if self.is_S1D:
        return self.fname
    name = self.fname
    return name.replace("BLAZE_", "").replace("S2D", "S1D")

get_header_value(kw)

Directly retrieves a KW from the header.

After this is called, the frame will keep the header stored in memory until the object is deleted

Parameters:

    kw (str): Keyword name, present in the fits header

Returns:

    Any: Header value

Source code in src/ASTRA/base_models/Frame.py
def get_header_value(self, kw: str) -> Any:
    """Directly retrieves a KW from the header.

    After this is called, the frame will keep the header stored in memory until the object is deleted

    Args:
        kw (str): Keyword name, present in the fits header

    Returns:
        Any: Header value

    """
    if self._header is None:
        self._header = fits.getheader(self.file_path)
    return self._header[kw]

get_spectral_type()

Check the filename to see if we are using an S1D or S2D file.

Raises:

    InternalError: If it is not possible to recognize the filename

Returns:

    str: "S1D" or "S2D"

Source code in src/ASTRA/base_models/Frame.py
def get_spectral_type(self) -> str:
    """Check the filename to see if we are using an S1D or S2D file.

    Raises:
        custom_exceptions.InternalError: If it is not possible to recognize the filename

    Returns:
        str: S1D or S2D

    """
    name_lowercase = self.file_path.stem.lower()
    if "s2d" in name_lowercase or "e2ds" in name_lowercase:
        return "S2D"
    if "s1d" in name_lowercase:
        return "S1D"
    raise custom_exceptions.InternalError(f"{self.name} can't recognize the file that it received!")

import_KW_from_outside(KW, value, optional)

Allow manual overriding of header parameters (in memory) from the outside.

This can be used if an instrument has data stored in multiple files. This allows a post-setup update of header values (for the keywords stored in observation_info)

Parameters:

    KW (str): Keyword name, as defined by the Frame interface
    value (Any): New value
    optional (bool): If it is optional, it can take a non-finite value

Raises:

    FrameError: If we attempt to load an optional=False keyword that has a non-finite value

Source code in src/ASTRA/base_models/Frame.py
def import_KW_from_outside(self, KW: str, value: Any, optional: bool) -> None:
    """Allow to manually override header parameters (in memory) from the outside.

    This can be used if an instrument has data stored in multiple files. This allows a post-setup
    update of header values (for the keywords stored in observation_info)

    Args:
        KW (str): keyword name, as defined by the Frame interface
        value (Any): New value
        optional (bool): if it is optional, it can be a non-finite value

    Raises:
        FrameError: If we attempt to load an optional=False keyword that has a non-finite value

    """
    if KW not in self.observation_info:
        logger.critical(
            "Keyword <{}> is not supported by the Frames. Couldn't load it from the outside",
            KW,
        )

    if not np.isfinite(value):
        if not optional:
            logger.critical(
                "Loaded mandatory keyword <{}> with a non-finite value for frame {}",
                KW,
                self.fname,
            )
            raise FrameError
        logger.critical(
            "Loaded keyword <{}> has a non-finite value for frame {}",
            KW,
            self.fname,
        )
    self.observation_info[KW] = value
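Usage sketch (values are illustrative):

# mandatory keyword: a non-finite value would raise FrameError
frame.import_KW_from_outside("airmass", 1.2, optional=False)
# optional keyword: a non-finite value only logs a message
frame.import_KW_from_outside("seeing", float("nan"), optional=True)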

is_Instrument(Instrument)

Check if Frame is from a given instrument.

Source code in src/ASTRA/base_models/Frame.py
def is_Instrument(self, Instrument: str) -> bool:
    """Check if Frame is from a given instrument."""
    return self.inst_name == Instrument

is_SubInstrument(sub_instrument)

Check if the current instrument is from the given time_block (e.g. ESPRESSO18/ESPRESSO19).

Parameters

    sub_instrument : str
        Name of the time block that is going to be checked

Returns

    bool
        Result of the comparison

Source code in src/ASTRA/base_models/Frame.py
def is_SubInstrument(self, sub_instrument: str) -> bool:
    """Check if the current instrument is from the given time_block (e.g ESPRESSO18/ESPRESSO19).

    Parameters
    ----------
    sub_instrument : str
        Name of the time block that is going to be checked

    Returns
    -------
    [bool]
        Results from the comparison

    """
    return self.sub_instrument == sub_instrument

load_S1D_data()

To be overridden by the children classes.

Source code in src/ASTRA/base_models/Frame.py
def load_S1D_data(self) -> None:
    """To be overriden by the children classes."""
    logger.debug("Opening the S1D arrays from {}", self.fname)
    if not self.is_valid:
        raise FrameError
    self._spectrum_has_data_on_memory = True

load_S2D_data()

To be overridden by the children classes.

Source code in src/ASTRA/base_models/Frame.py
def load_S2D_data(self) -> None:
    """To be overriden by the children classes."""
    logger.debug("Opening the S2D arrays from {}", self.fname)
    if not self.is_valid:
        raise FrameError
    self._spectrum_has_data_on_memory = True

load_data()

Abstraction to load all data of this Frame.

If the Frame is already open, it does nothing. Calls the S1D or S2D version of the data load, depending on the file type. Can remove the BERV correction at run time, if properly configured to do so.

Raises:

    InternalError: If it is neither S2D nor S1D
    FrameError: If the frame is no longer valid after loading

Source code in src/ASTRA/base_models/Frame.py
def load_data(self) -> None:
    """Abstraction to load all data of this Frame.

    If the Frame is already open, it does nothing.
    Calls the S1D or S2D version of the data load, depending on file type
    Can remove BERV correction at run time, if properly configured to do so.

    Raises:
        custom_exceptions.InternalError: If it is neither S2D nor S1D
        FrameError: If the frame is no longer valid after loading

    """
    if self.is_open:
        return

    if self.is_S1D:
        self.load_S1D_data()
    elif self.is_S2D:
        self.load_S2D_data()
    else:
        raise custom_exceptions.InternalError("something went wrong on this frame")

    if not self.is_valid:
        raise FrameError("Frame is no longer valid")

    BERV_value = self.get_KW_value("BERV")

    if not self._internal_configs["open_without_BervCorr"]:
        self.apply_BERV_correction(BERV_value)
    else:
        logger.warning(f"Opening {self.name} without the BERV correction")
        self.remove_BERV_correction(BERV_value)

load_header_info()

Open the header of the fits file and load the necessary keywords.

Does the following operations:

1) Load header assuming fits file
2) Parse through the _KW_map to load header keywords
3) Call self.load_instrument_specific_KWs
4) Call check_header_QC(hdu)
5) Call find_instrument_type()
6) Call assess_bad_orders()

Source code in src/ASTRA/base_models/Frame.py
def load_header_info(self) -> None:
    """Open the header of the fits file and load the necessary keywords.

    Does the following operations:
    1) Load header assuming fits file
    2) Parse through the _KW_map to load header keywords
    3) Call self.load_instrument_specific_KWs
    4) Call check_header_QC(hdu)
    5) Call find_instrument_type()
    6) Call assess_bad_orders()

    """
    try:
        hdu = fits.getheader(self.file_path)
    except FileNotFoundError:
        msg = f"File <{self.file_path}> does not exist"
        self.add_to_status(MISSING_FILE(msg))
        logger.critical(msg)
        return

    for internal_KW, S2D_KW in self._KW_map.items():
        self.observation_info[internal_KW] = hdu[S2D_KW]

    self.load_instrument_specific_KWs(hdu)
    self.check_header_QC(hdu)
    self.find_instrument_type()
    self.assess_bad_orders()

load_instrument_specific_KWs(header)

Load instrument-specific KW values that can't be loaded in a general fashion.

To be overridden by the different instruments

Parameters:

    header (Mapping[str, Any]): header unit of this observation
Source code in src/ASTRA/base_models/Frame.py
def load_instrument_specific_KWs(self, header: Mapping[str, Any]) -> None:
    """Load instrument-specific KW values that can't be loaded in a general fashion.

    To be overridden by the different instruments

    Args:
        header (Mapping[str, Any]): header unit of this observation

    """

mark_wavelength_region(reason, wavelength_blocks)

Add wavelength regions to be removed whenever the S2D file is opened.

When rejecting wavelengths through this function, we only have to specify wavelength intervals, which allows accounting for possible order overlap. When loading the Frame, we search through all orders to find any occurrence of these wavelength blocks.

Parameters

reason : Flag
    Flag for the removal type
wavelength_blocks : list[tuple[int, int]]
    List of wavelength limits to reject: [[lambda_0, lambda_1], [lambda_2, lambda_3]]

Source code in src/ASTRA/base_models/Frame.py
def mark_wavelength_region(self, reason: Flag, wavelength_blocks: list[tuple[int, int]]) -> None:
    """Add wavelength regions to be removed whenever the S2D file is opened.

    When rejecting wavelengths through this function, we only have to specify wavelength intervals, which
    allows accounting for possible order overlap. When loading the Frame, we search through all orders to
    find any occurrence of these wavelength blocks.

    Parameters
    ----------
    reason : Flag
        Flag for the removal type
    wavelength_blocks : list[tuple[int, int]]
        List of wavelength limits to reject: [[lambda_0, lambda_1], [lambda_2, lambda_3]]

    """
    self.wavelengths_to_remove[reason] = wavelength_blocks
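Usage sketch: reject two wavelength windows across whichever orders they fall in (the Flag name and the limits are illustrative):

frame.mark_wavelength_region(
    reason=TELLURIC,  # assumed to be a Flag instance
    wavelength_blocks=[(6270, 6330), (6860, 6970)],
)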

plot_spectra(which_orders=None, axis=None)

Plot the spectra.

Parameters:

    which_orders (None | DETECTOR_DEFINITION | list[int], optional): Either a pre-configured
        detector definition, a list of orders, or None (plots all orders). Defaults to None.
    axis (optional): if None, create a new figure. Otherwise, use this one. Defaults to None.
Source code in src/ASTRA/base_models/Frame.py
def plot_spectra(
    self,
    which_orders: None | DETECTOR_DEFINITION | list[int] = None,
    axis=None,
):
    """Plot the spectra.

    Args:
        which_orders (None | DETECTOR_DEFINITION | list[int], optional): Either a pre-configured
        detector definition, a list of orders, or None (plots all orders). Defaults to None.
        axis (optional): if None, create a new figure. Otherwise, use this one. Defaults to None.

    """
    fig = None
    if axis is None:
        fig, axis = plt.subplots()
    wf, ff, ef, mf = self.get_data_from_full_spectrum()

    if which_orders is None:
        which_orders = DETECTOR_DEFINITION.WHITE_LIGHT

    if isinstance(which_orders, (list, tuple)):
        orders_to_plot = which_orders
    else:
        orders_to_plot = self.order_intervals[which_orders]

    for sl in orders_to_plot:
        w, f, e, m = wf[sl], ff[sl], ef[sl], mf[sl]
        axis.errorbar(w[~m], f[~m], e[~m], ls="", marker="x")

    return fig, axis
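Usage sketch (frame is a hypothetical loaded Frame; the order numbers are illustrative):

import matplotlib.pyplot as plt

fig, axis = frame.plot_spectra(which_orders=[88, 89, 90])
plt.show()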

reject_wavelength_region_from_order(order, region)

Flag a wavelength region from a specific order to be marked as invalid during the creation of the stellar mask.

This will not account for order overlaps.

Parameters:

    order (int): Spectral order
    region (list[tuple[int, int]]): List of wavelength intervals to reject from this order

Raises:

    InvalidConfiguration: If the region is not a list of lists
Source code in src/ASTRA/base_models/Frame.py
def reject_wavelength_region_from_order(self, order: int, region: list[tuple[int, int]]) -> None:
    """Flag a wavelength region from specific order to be marked as invalid during the creation of the stellar mask.

    This will not account for order overlaps.

    Args:
        order (int): Spectral order
        region (list[tuple[int, int]]): List of wavelength intervals to reject from this order

    Raises:
        custom_exceptions.InvalidConfiguration: If the region is not a list of lists

    """
    if not isinstance(region, (Iterable,)):
        raise custom_exceptions.InvalidConfiguration("The rejection region must be a list of lists")

    if self._orderwise_wavelength_rejection is None:
        self._orderwise_wavelength_rejection = {}
    self._orderwise_wavelength_rejection[order] = region

select_wavelength_region(order, wavelength_blocks)

Reject all wavelengths that are not part of the provided intervals.

Parameters:

    order (int): Spectral order
    wavelength_blocks (list[tuple[int, int]]): List of tuples, each containing the start and end
        wavelength of each "good" interval
Source code in src/ASTRA/base_models/Frame.py
def select_wavelength_region(self, order: int, wavelength_blocks: list[tuple[int, int]]) -> None:
    """Reject all wavelengths that are not part of the provided intervals.

    Args:
        order (int): Spectral order
        wavelength_blocks (list[tuple[int, int]]): List of tuples, each containing the start and end
        wavelength of each "good" interval

    """
    if self.wavelengths_to_keep is None:
        self.wavelengths_to_keep = {}
    self.wavelengths_to_keep[order] = wavelength_blocks
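Usage sketch: keep only two "good" windows of order 42 and reject everything else in that order (the limits are illustrative):

frame.select_wavelength_region(
    order=42,
    wavelength_blocks=[(5000.0, 5020.0), (5040.0, 5055.0)],
)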

Currently available interfaces to spectrographs

ESPRESSO

Bases: ESO_PIPELINE

Interface to handle ESPRESSO observations (S2D and S1D).

With ESPRESSO data we are considering 3 sub-Instruments:

  • ESPRESSO18 - Before 2019-06-27
  • ESPRESSO19 - Before 2020-12-18
  • ESPRESSO21 - Until the ends of time (hopefully)

User parameters:

Currently there are no ESPRESSO-specific parameters

.. note:: Also check the User parameters of the parent classes for further customization options of SBART

Source code in src/ASTRA/Instruments/ESPRESSO.py
class ESPRESSO(ESO_PIPELINE):
    """Interface to handle ESPRESSO observations (S2D and S1D).

    With ESPRESSO data we are considering 3 sub-Instruments:

    * ESPRESSO18 - Before  2019-06-27
    * ESPRESSO19 - Before  2020-12-18
    * ESPRESSO21 - Until the ends of time (hopefully)


    **User parameters:**

    ================================ ================ ================ ================ ================
    Parameter name                      Mandatory      Default Value    Valid Values    Comment
    ================================ ================ ================ ================ ================
    ================================ ================ ================ ================ ================

    .. note::
        Also check the **User parameters** of the parent classes for further customization options of SBART

    """

    _default_params = ESO_PIPELINE._default_params

    sub_instruments = {
        "ESPRESSO18": datetime.datetime.strptime("2019-06-27", r"%Y-%m-%d"),
        "ESPRESSO19": datetime.datetime.max,
        # "ESPRESSO19": datetime.datetime.strptime("2020-12-18", r"%Y-%m-%d"),
    }
    _name = "ESPRESSO"

    order_intervals: dict[DETECTOR_DEFINITION, list[int]] = {
        DETECTOR_DEFINITION.WHITE_LIGHT: list(range(170)),
        DETECTOR_DEFINITION.RED_DET: list(range(90, 170)),
        DETECTOR_DEFINITION.BLUE_DET: list(range(0, 90)),
    }

    def __init__(
        self,
        file_path,
        user_configs: Optional[Dict[str, Any]] = None,
        reject_subInstruments: Optional[Iterable[str]] = None,
        frameID: Optional[int] = None,
        quiet_user_params: bool = True,
    ):
        """ESPRESSO interface.

        Parameters
        ----------
        file_path
            Path to the S2D (or S1D) file.
        user_configs
            Dictionary whose keys are the configurable options of ESPRESSO (check above)
        reject_subInstruments
            Iterable of subInstruments to fully reject
        frameID
            ID for this observation. Only used for organization purposes by :class:`~SBART.data_objects.DataClass`

        """
        # Wavelength coverage

        coverage = (350, 900)

        super().__init__(
            inst_name="ESPRESSO",
            array_size={"S2D": (170, 9111), "S1D": (1, 443262)},
            file_path=file_path,
            frameID=frameID,
            KW_identifier="ESO",
            user_configs=user_configs,
            reject_subInstruments=reject_subInstruments,
            quiet_user_params=quiet_user_params,
        )

        self.instrument_properties["wavelength_coverage"] = coverage
        self.instrument_properties["resolution"] = 140_000
        self.instrument_properties["EarthLocation"] = EarthLocation.of_site("Cerro Paranal")
        self.instrument_properties["is_drift_corrected"] = True

        # https://www.eso.org/sci/facilities/paranal/astroclimate/site.html
        self.instrument_properties["site_pressure"] = 750

    def load_telemetry_info(self, header):
        # Find the UT number and load the airmass
        for i in range(1, 5):
            try:
                self.observation_info["airmass"] = header[f"HIERARCH ESO TEL{i} AIRM START"]
                self.UT_number = i
                break
            except KeyError as e:
                if i == 4:
                    msg = "\tCannot find ESO TELx AIRM START key"
                    raise KeyError(msg) from e

        # Environmental KWs for telfit (also needs airmass, previously loaded)
        ambi_KWs = {
            "relative_humidity": "AMBI RHUM",
            "ambient_temperature": "AMBI TEMP",
            "seeing": "AMBI FWHM START",
        }

        for name, endKW in ambi_KWs.items():
            self.observation_info[name] = float(header[f"HIERARCH ESO TEL{self.UT_number} {endKW}"])
            if "temperature" in name:  # store temperature in KELVIN for TELFIT
                self.observation_info[name] = convert_temperature(
                    self.observation_info[name],
                    old_scale="Celsius",
                    new_scale="Kelvin",
                )

        self.observation_info["DET_BINX"] = header["HIERARCH ESO DET BINX"]
        self.observation_info["DET_BINY"] = header["HIERARCH ESO DET BINY"]

    def check_header_QC_ESO_DRS(self, header):
        nonfatal_QC_flags = {
            "HIERARCH ESO INS{} ADC{} RA": 0,  # related with ADC2 problem
            "HIERARCH ESO INS{} ADC{} dec": 0,  # related with ADC2 problem
            "HIERARCH ESO INS{} ADC{} SENS1": 0,  # related with ADC2 problem
            "HIERARCH ESO INS{} ADC{} TEMP": 0,  # related with ADC2 problem
        }
        found_ADC_issue = False
        for flag, bad_value in nonfatal_QC_flags.items():
            found_UT = False
            for UT_KW in ["", "2", "3", "4"]:
                try:
                    for ADC in [1, 2]:
                        ADC_KW = flag.format(UT_KW, ADC)
                        if header[ADC_KW] == bad_value:
                            msg = f"QC flag {ADC_KW} has a value of {bad_value}"
                            logger.warning(msg)
                            self._status.store_warning(KW_WARNING(msg))
                            found_ADC_issue = True
                        found_UT = True
                except KeyError:
                    # this UT/ADC keyword is absent from the header
                    pass
            if not found_UT:
                logger.critical(f"Did not find the entry for the following UT related metric: {flag}")

        if found_ADC_issue:
            self._status.store_warning(KW_WARNING("ADC2 issues found"))

        super().check_header_QC_ESO_DRS(header)

    def build_mask(self, bypass_QualCheck: bool = False) -> None:
        super().build_mask(bypass_QualCheck=bypass_QualCheck, assess_bad_orders=False)

        if self.spectral_format == "S2D":
            # the first two orders of the RED CCD have a large amount of noise in the beginning, so we remove a
            # portion from the start of those two orders. What is going on: we want to find the indexes
            # from orders 90 and 91 that are below 5230 \AA
            inds = np.where(self.wavelengths[90:92, :] <= 5230)
            # numpy where returns the indexes assuming the zero to be 90 and the one to be 91. Remember that we
            # sliced the array to only remove from those two orders
            inds_1 = np.where(inds[0], 91, 90)
            # rebuild the 'numpy where' output, to pass to the mask as the proper output
            inds = (inds_1, inds[1])
            self.spectral_mask.add_indexes_to_mask(inds, ERROR_THRESHOLD)

        self.assess_bad_orders()

    def trigger_data_storage(self, *args, **kwargs):
        super().trigger_data_storage(*args, **kwargs)

HARPS

Bases: ESO_PIPELINE

Interface to handle HARPS data; S1D not supported.

This class also defines 2 sub-Instruments:

  • HARPSpre - Before 2015-05-29
  • HARPSpost - Until the ends of time (hopefully)

The steps to load the S2D data are described in the HARPS DRS manual (https://www.eso.org/sci/facilities/lasilla/instruments/harps/doc/DRS.pdf). The summary is:

- Construct the wavelength solution & correct from BERV
- Load instrumental drift
- Construct flux noises:

    - Table 10 of the user manual (https://www.eso.org/sci/facilities/lasilla/instruments/harps/doc/manual/HARPS-UserManual2.4.pdf) gives max RON of 7.07 for red detector
    - Noise = sqrt(obj + sky + n*dark*expTime + nBinY*ron^2)

User parameters:

Currently there are no HARPS-specific parameters

Note: Check the User parameters of the parent classes for further customization options of SBART

Source code in src/ASTRA/Instruments/HARPS.py
class HARPS(ESO_PIPELINE):
    """Interface to handle HARPS data; S1D **not** supported.

    This class also defines 2 sub-Instruments:

    * HARPSpre - Before  2015-05-29
    * HARPSpost - Until the ends of time (hopefully)

    The steps to load the S2D data are described in the HARPS `DRS manual <https://www.eso.org/sci/facilities/lasilla/instruments/harps/doc/DRS.pdf>`_. The summary is:

        - Construct the wavelength solution & correct from BERV
        - Load instrumental drift
        - Construct flux noises:

            - Table 10 of `the user manual <https://www.eso.org/sci/facilities/lasilla/instruments/harps/doc/manual/HARPS-UserManual2.4.pdf>`_ gives max RON of 7.07 for red detector
            - Noise = sqrt(obj + sky + n*dark*expTime + nBinY*ron^2)

    **User parameters:**

     Currently there are no HARPS-specific parameters

    *Note:* Check the **User parameters** of the parent classes for further customization options of SBART

    """

    sub_instruments = {
        "HARPS03": datetime.datetime.strptime("2015-05-29", r"%Y-%m-%d"),
        "HARPS15": datetime.datetime.max,
    }

    _name = "HARPS"
    _default_params = ESO_PIPELINE._default_params

    order_intervals: dict[DETECTOR_DEFINITION, list[int]] = {
        DETECTOR_DEFINITION.WHITE_LIGHT: list(range(0, 71)),
        DETECTOR_DEFINITION.RED_DET: list(range(47, 71)),
        DETECTOR_DEFINITION.BLUE_DET: list(range(0, 47)),
    }

    def __init__(
        self,
        file_path,
        user_configs: Optional[Dict[str, Any]] = None,
        reject_subInstruments=None,
        frameID=None,
        quiet_user_params: bool = True,
    ):
        """HARPS constructor.

        Parameters
        ----------
        file_path
            Path to the S2D (or S1D) file.
        user_configs
            Dictionary whose keys are the configurable options of ESPRESSO (check above)
        reject_subInstruments
            Iterable of subInstruments to fully reject
        frameID
            ID for this observation. Only used for organization purposes by :class:`~SBART.data_objects.DataClass`

        """
        logger.info(f"Creating frame from: {file_path}")

        coverage = [350, 700]
        # Note: 46 blue orders and 26 red orders. From Table 2.2 of:
        # https://www.eso.org/sci/facilities/lasilla/instruments/harps/doc/manual/HARPS-UserManual2.4.pdf
        if user_configs["use_old_pipeline"]:
            mat_size = (72, 4096)
            KW_map = {
                "OBJECT": "OBJECT",
                "BJD": "HIERARCH ESO DRS BJD",
                "MJD": "MJD-OBS",
                "ISO-DATE": "DATE-OBS",
                "DRS-VERSION": "HIERARCH ESO DRS VERSION",
                "RA": "RA",
                "DEC": "DEC",
                "MD5-CHECK": "DATASUM",
                "SPEC_TYPE": None,
                "DRS_CCF_MASK": None,
                "DRS_FLUX_CORRECTION_TEMPLATE": None,
            }
        else:
            mat_size = (71, 4096)
            KW_map = {}

        if user_configs["use_old_pipeline"]:
            file_path, self.ccf_path, self.BIS_file, search_status = self.find_files(file_path)

        super().__init__(
            inst_name="HARPS",
            array_size={"S2D": mat_size},
            file_path=file_path,
            KW_identifier="ESO",
            frameID=frameID,
            user_configs=user_configs,
            reject_subInstruments=reject_subInstruments,
            quiet_user_params=quiet_user_params,
            override_KW_map=KW_map,
            override_indicators=("CONTRAST", "FWHM"),
        )

        if user_configs["use_old_pipeline"] and not search_status.is_good_flag:
            self.add_to_status(search_status)

        self.instrument_properties["wavelength_coverage"] = coverage

        if user_configs["use_old_pipeline"]:
            self.instrument_properties["is_drift_corrected"] = False
            self.is_BERV_corrected = False

        self.instrument_properties["resolution"] = 115_000
        self.instrument_properties["EarthLocation"] = EarthLocation.of_site("La Silla Observatory")
        # ? same as for Paranal?
        # https://www.eso.org/sci/facilities/paranal/astroclimate/site.html
        self.instrument_properties["site_pressure"] = 750
        self.is_blaze_corrected = False

    def load_telemetry_info(self, header):
        """Loads (at least) the following keywords:

        - relative humidity
        - ambient temperature, in Celsius
        - airmass
        - Detector

        Parameters
        ----------
        header

        Returns
        -------

        """
        ambi_KWs = {
            "relative_humidity": "RHUM",
            "ambient_temperature": "TEMP",
        }

        for name, endKW in ambi_KWs.items():
            self.observation_info[name] = header[f"HIERARCH {self.KW_identifier} TEL AMBI {endKW}"]
            if "temperature" in name:  # store temperature in KELVIN for TELFIT
                self.observation_info[name] = convert_temperature(
                    self.observation_info[name],
                    old_scale="Celsius",
                    new_scale="Kelvin",
                )

        if self.observation_info["relative_humidity"] == 255:
            logger.warning(f"{self.name} has an invalid value in the humidity sensor...")
            self.observation_info["relative_humidity"] = np.nan

        self.observation_info["airmass"] = header["HIERARCH ESO TEL AIRM START"]

    def find_files(self, file_name):
        """Find the CCF and S2D files and BIS files, which should be stored inside the same folder"""
        logger.debug("Searching for the ccf and e2ds files")

        search_status = MISSING_DATA("Missing the ccf file")
        ccf_path = None
        bis_path = None

        if os.path.isdir(file_name):
            logger.debug("Received a folder, searching inside for necessary files")
            # search for e2ds file
            folder_name = file_name

            e2ds_files = glob.glob(os.path.join(folder_name, "*e2ds_A.fits"), recursive=True)
            ccf_files = glob.glob(os.path.join(folder_name, "*ccf*A.fits"), recursive=True)
            bis_files = glob.glob(os.path.join(folder_name, "*bis*A.fits"), recursive=True)

            for name, elems in [
                ("e2ds_A", e2ds_files),
                ("ccf", ccf_files),
                ("bis", bis_files),
            ]:
                if len(elems) > 1:
                    msg = f"HARPS data only received folder name and it has more than 1 {name} file in it"
                    raise custom_exceptions.InvalidConfiguration(msg)

                if len(elems) < 1:
                    msg = f"HARPS data only received folder name and it has no {name} file in it"
                    if name != "bis":
                        # The BIS file is not critical for the run
                        raise custom_exceptions.InvalidConfiguration(msg)
                    logger.critical(msg)

            e2ds_path = e2ds_files[0]
            ccf_path = ccf_files[0]
            # the BIS file may be absent (non-critical for the run)
            bis_path = bis_files[0] if bis_files else None
            search_status = SUCCESS("Found all input files")
        else:
            logger.debug("Received path of E2DS file; searching for CCF with matching name")
            folder_name = os.path.dirname(file_name)
            e2ds_path = file_name
            file_start, *_ = os.path.basename(file_name).split("_")

            found_CCF = False
            found_BIS = False
            ccf_files = glob.glob(os.path.join(folder_name, "*ccf*A.fits"), recursive=True)
            bis_files = glob.glob(os.path.join(folder_name, "*bis*A.fits"), recursive=True)
            for file in ccf_files:
                if file_start in file:
                    ccf_path = file
                    found_CCF = True

            for file in bis_files:
                if file_start in file:
                    bis_path = file
                    found_BIS = True
            if found_CCF:
                logger.info(f"Found CCF file: {ccf_path}")
                search_status = SUCCESS("Found CCF file")
            else:
                logger.critical("Was not able to find CCF file. Marking frame as invalid")
                ccf_path = ""

            if not found_BIS:
                bis_path = None

        return e2ds_path, ccf_path, bis_path, search_status

    def build_HARPS_wavelengths(self, hdr):
        """Compute the wavelength solution to this given spectra (EQ 4.1 of DRS manual)
        Convert from air wavelenbgths to vacuum
        """
        # degree of the polynomial
        d = hdr["HIERARCH ESO DRS CAL TH DEG LL"]
        # number of orders
        omax = hdr.get("HIERARCH ESO DRS CAL LOC NBO", self.array_size[0])
        xmax = self.array_size[1]

        # matrix X:
        #
        # axis 0: the entry corresponding to each coefficient
        # axis 1: each pixel number

        x = np.empty((d + 1, xmax), "int64")
        x[0].fill(1)  # x[0,*] = x^0 = 1,1,1,1,1,...
        x[1] = np.arange(xmax)

        for i in range(1, d):
            x[i + 1] = x[i] * x[1]

        # matrix A:
        #
        # axis 0: the different orders
        # axis 1: all coefficients for the given order

        A = np.reshape(
            [hdr["HIERARCH ESO DRS CAL TH COEFF LL" + str(i)] for i in range(omax * (d + 1))],
            (omax, d + 1),
        )  # slow 30 ms

        # the wavelengths for each order are a simple dot product between the coefficients and pixel-wise data (X)
        wavelengths = np.dot(A, x)

        vacuum_wavelengths = airtovac(wavelengths)
        return vacuum_wavelengths

    def _load_old_DRS_KWs(self, header):
        if not self._internal_configs["use_old_pipeline"]:
            raise custom_exceptions.InvalidConfiguration("Can't load data from old pipeline without the config")

        self.observation_info["MAX_BERV"] = header["HIERARCH ESO DRS BERVMX"] * kilometer_second
        self.observation_info["BERV"] = header["HIERARCH ESO DRS BERV"] * kilometer_second

        # Environmental KWs for telfit (also needs airmass, previously loaded)
        ambi_KWs = {
            "relative_humidity": "AMBI RHUM",
            "ambient_temperature": "AMBI TEMP",
        }

        for name, endKW in ambi_KWs.items():
            self.observation_info[name] = header[f"HIERARCH ESO TEL {endKW}"]
            if "temperature" in name:  # store temperature in KELVIN for TELFIT
                self.observation_info[name] = convert_temperature(
                    self.observation_info[name],
                    old_scale="Celsius",
                    new_scale="Kelvin",
                )

        for order in range(self.N_orders):
            self.observation_info["orderwise_SNRs"].append(header[f"HIERARCH ESO DRS SPE EXT SN{order}"])

        self.observation_info["airmass"] = header["HIERARCH ESO TEL AIRM START"]

        bad_drift = False
        try:
            flag = "HIERARCH ESO DRS DRIFT QC"
            if header[flag].strip() != "PASSED":
                bad_drift = True
                msg = f"QC flag {flag} meets the bad value"
                logger.warning(msg)
                self._status.store_warning(KW_WARNING(msg))
            else:
                # self.logger.info("DRIFT QC has passed")
                drift = header["HIERARCH ESO DRS DRIFT RV USED"] * meter_second
                drift_err = header["HIERARCH ESO DRS DRIFT NOISE"] * meter_second
        except Exception:
            bad_drift = True
            logger.warning("DRIFT KW does not exist")

        if bad_drift:
            logger.warning("Due to previous drift-related problems, setting it to zero [m/s]")
            drift = 0 * meter_second
            drift_err = 0 * meter_second

        self.observation_info["drift"] = drift
        self.observation_info["drift_ERR"] = drift_err
        self.load_ccf_data()

    def load_ccf_data(self) -> None:
        """Load the necessarfy CCF data from the file!"""
        logger.debug("Loading data from the ccf file")
        header = fits.getheader(self.ccf_path)

        self.observation_info["DRS_RV"] = header["HIERARCH ESO DRS CCF RV"] * kilometer_second
        self.observation_info["SPEC_TYPE"] = header["HIERARCH ESO DRS CCF MASK"]

        RV_err = np.sqrt(
            header["HIERARCH ESO DRS CAL TH ERROR"] ** 2
            +
            # hdulist[0].header['HIERARCH ESO DRS DRIFT NOISE']**2   +
            (1000 * header["HIERARCH ESO DRS CCF NOISE"]) ** 2,
        )
        self.observation_info["DRS_RV_ERR"] = RV_err * meter_second

        for key in self.available_indicators:
            full_key = "HIERARCH ESO DRS CCF " + key
            self.observation_info[key] = header[full_key]

        # We are missing error in CONTRAST!
        self.observation_info["FWHM_ERR"] = convert_data(
            2.35 * RV_err * meter_second,
            new_units=kilometer_second,
            as_value=True,
        )
        self.observation_info["BIS SPAN_ERR"] = convert_data(
            np.sqrt(2) * RV_err * meter_second,
            new_units=kilometer_second,
            as_value=True,
        )

        if self.BIS_file is not None:
            head = fits.getheader(self.BIS_file)
            self.observation_info["BIS SPAN"] = head["HIERARCH ESO DRS BIS SPAN"]

    def load_S1D_data(self) -> Mask:
        raise NotImplementedError

    def check_header_QC_old_DRS(self, header):
        logger.info("Currently missing QC checks for the old DRS")

    def load_old_DRS_S2D(self):
        """Loads the spectra

        Returns
        -------

        """
        super().load_S2D_data()

        with fits.open(self.file_path) as hdulist:
            # Compute the wavelength solution + BERV correction
            wave_from_file = self.build_HARPS_wavelengths(hdulist[0].header)

            sci_data = hdulist[0].data  # spectra from all orders

            # photon noise + an estimate of the maximum value of the other terms.
            # From the ETC calculator, the readout noise should be the largest contribution;
            # assuming that it is ~7e- (as in the manual), it should have a maximum contribution
            # of 200
            flux_errors = np.sqrt(250 + np.abs(sci_data, dtype=float))

            # Validate for overflows and missing data
            quality_data = np.zeros(sci_data.shape)
            quality_data[np.where(np.isnan(sci_data))] = NAN_DATA.code
            quality_data[np.where(sci_data > 300000)] = SATURATION.code
            quality_data[np.where(sci_data < -3 * flux_errors)] = ERROR_THRESHOLD.code

        self.spectra = sci_data.astype(np.float64)
        self.wavelengths = wave_from_file
        self.qual_data = quality_data
        self.uncertainties = flux_errors.astype(np.float64)

        self.build_mask(bypass_QualCheck=False)
        return 1

    def close_arrays(self):
        super().close_arrays()
        if self._internal_configs["use_old_pipeline"]:
            self.is_BERV_corrected = False

build_HARPS_wavelengths(hdr)

Compute the wavelength solution for this spectrum (Eq. 4.1 of the DRS manual) and convert from air wavelengths to vacuum

Source code in src/ASTRA/Instruments/HARPS.py
def build_HARPS_wavelengths(self, hdr):
    """Compute the wavelength solution to this given spectra (EQ 4.1 of DRS manual)
    Convert from air wavelenbgths to vacuum
    """
    # degree of the polynomial
    d = hdr["HIERARCH ESO DRS CAL TH DEG LL"]
    # number of orders
    omax = hdr.get("HIERARCH ESO DRS CAL LOC NBO", self.array_size[0])
    xmax = self.array_size[1]

    # matrix X:
    #
    # axis 0: the entry corresponding to each coefficient
    # axis 1: each pixel number

    x = np.empty((d + 1, xmax), "int64")
    x[0].fill(1)  # x[0,*] = x^0 = 1,1,1,1,1,...
    x[1] = np.arange(xmax)

    for i in range(1, d):
        x[i + 1] = x[i] * x[1]

    # matrix A:
    #
    # axis 0: the different orders
    # axis 1: all coefficients for the given order

    A = np.reshape(
        [hdr["HIERARCH ESO DRS CAL TH COEFF LL" + str(i)] for i in range(omax * (d + 1))],
        (omax, d + 1),
    )  # slow 30 ms

    # the wavelengths for each order are a simple dot product between the coefficients and pixel-wise data (X)
    wavelengths = np.dot(A, x)

    vacuum_wavelengths = airtovac(wavelengths)
    return vacuum_wavelengths
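A standalone numerical sketch of the dot product evaluated above: each order's wavelengths are a polynomial in pixel number, lambda(x) = sum_i A[order, i] * x^i (the coefficients below are made up):

import numpy as np

d, xmax = 2, 6                          # polynomial degree, pixels per order
A = np.array([[4000.0, 0.05, 1.0e-6]])  # coefficients for a single order
x = np.vander(np.arange(xmax), d + 1, increasing=True).T  # shape (d + 1, xmax)
print(A @ x)  # air wavelengths for each pixel of that order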

find_files(file_name)

Find the e2ds, CCF, and BIS files, which should be stored inside the same folder

Source code in src/ASTRA/Instruments/HARPS.py
def find_files(self, file_name):
    """Find the CCF and S2D files and BIS files, which should be stored inside the same folder"""
    logger.debug("Searching for the ccf and e2ds files")

    search_status = MISSING_DATA("Missing the ccf file")
    ccf_path = None
    bis_path = None

    if os.path.isdir(file_name):
        logger.debug("Received a folder, searching inside for necessary files")
        # search for e2ds file
        folder_name = file_name

        e2ds_files = glob.glob(os.path.join(folder_name, "*e2ds_A.fits"), recursive=True)
        ccf_files = glob.glob(os.path.join(folder_name, "*ccf*A.fits"), recursive=True)
        bis_files = glob.glob(os.path.join(folder_name, "*bis*A.fits"), recursive=True)

        for name, elems in [
            ("e2ds_A", e2ds_files),
            ("ccf", ccf_files),
            ("bis", bis_files),
        ]:
            if len(elems) > 1:
                msg = f"HARPS data only received folder name and it has more than 1 {name} file in it"
                raise custom_exceptions.InvalidConfiguration(msg)

            if len(elems) < 1:
                msg = f"HARPS data only received folder name and it has no {name} file in it"
                if name != "bis":
                    # The BIS file is not critical for the run
                    raise custom_exceptions.InvalidConfiguration(msg)
                logger.critical(msg)

        e2ds_path = e2ds_files[0]
        ccf_path = ccf_files[0]
        # the BIS file may be absent (non-critical for the run)
        bis_path = bis_files[0] if bis_files else None
        search_status = SUCCESS("Found all input files")
    else:
        logger.debug("Received path of E2DS file; searching for CCF with matching name")
        folder_name = os.path.dirname(file_name)
        e2ds_path = file_name
        file_start, *_ = os.path.basename(file_name).split("_")

        found_CCF = False
        found_BIS = False
        ccf_files = glob.glob(os.path.join(folder_name, "*ccf*A.fits"), recursive=True)
        bis_files = glob.glob(os.path.join(folder_name, "*bis*A.fits"), recursive=True)
        for file in ccf_files:
            if file_start in file:
                ccf_path = file
                found_CCF = True

        for file in bis_files:
            if file_start in file:
                bis_path = file
                found_BIS = True
        if found_CCF:
            logger.info(f"Found CCF file: {ccf_path}")
            search_status = SUCCESS("Found CCF file")
        else:
            logger.critical("Was not able to find CCF file. Marking frame as invalid")
            ccf_path = ""

        if not found_BIS:
            bis_path = None

    return e2ds_path, ccf_path, bis_path, search_status

load_ccf_data()

Load the necessary CCF data from the file!

Source code in src/ASTRA/Instruments/HARPS.py
def load_ccf_data(self) -> None:
    """Load the necessarfy CCF data from the file!"""
    logger.debug("Loading data from the ccf file")
    header = fits.getheader(self.ccf_path)

    self.observation_info["DRS_RV"] = header["HIERARCH ESO DRS CCF RV"] * kilometer_second
    self.observation_info["SPEC_TYPE"] = header["HIERARCH ESO DRS CCF MASK"]

    RV_err = np.sqrt(
        header["HIERARCH ESO DRS CAL TH ERROR"] ** 2
        +
        # hdulist[0].header['HIERARCH ESO DRS DRIFT NOISE']**2   +
        (1000 * header["HIERARCH ESO DRS CCF NOISE"]) ** 2,
    )
    self.observation_info["DRS_RV_ERR"] = RV_err * meter_second

    for key in self.available_indicators:
        full_key = "HIERARCH ESO DRS CCF " + key
        self.observation_info[key] = header[full_key]

    # We are missing error in CONTRAST!
    self.observation_info["FWHM_ERR"] = convert_data(
        2.35 * RV_err * meter_second,
        new_units=kilometer_second,
        as_value=True,
    )
    self.observation_info["BIS SPAN_ERR"] = convert_data(
        np.sqrt(2) * RV_err * meter_second,
        new_units=kilometer_second,
        as_value=True,
    )

    if self.BIS_file is not None:
        head = fits.getheader(self.BIS_file)
        self.observation_info["BIS SPAN"] = head["HIERARCH ESO DRS BIS SPAN"]

load_old_DRS_S2D()

Loads the spectra

Returns
Source code in src/ASTRA/Instruments/HARPS.py
def load_old_DRS_S2D(self):
    """Loads the spectra

    Returns
    -------

    """
    super().load_S2D_data()

    with fits.open(self.file_path) as hdulist:
        # Compute the wavelength solution + BERV correction
        wave_from_file = self.build_HARPS_wavelengths(hdulist[0].header)

        sci_data = hdulist[0].data  # spectra from all orders

        # photon noise + an estimate of the maximum value of the other terms.
        # From the ETC calculator, the readout noise should be the largest contribution;
        # assuming that it is ~7e- (as in the manual), it should have a maximum contribution
        # of 200
        flux_errors = np.sqrt(250 + np.abs(sci_data, dtype=float))

        # Validate for overflows and missing data
        quality_data = np.zeros(sci_data.shape)
        quality_data[np.where(np.isnan(sci_data))] = NAN_DATA.code
        quality_data[np.where(sci_data > 300000)] = SATURATION.code
        quality_data[np.where(sci_data < -3 * flux_errors)] = ERROR_THRESHOLD.code

    self.spectra = sci_data.astype(np.float64)
    self.wavelengths = wave_from_file
    self.qual_data = quality_data
    self.uncertainties = flux_errors.astype(np.float64)

    self.build_mask(bypass_QualCheck=False)
    return 1

load_telemetry_info(header)

Loads (at least) the following keywords:

  • relative humidity
  • ambient temperature, in Celsius
  • airmass
  • Detector
Parameters

header

Returns
Source code in src/ASTRA/Instruments/HARPS.py
def load_telemetry_info(self, header):
    """Loads (at least) the following keywords:

    - relative humidity
    - ambient temperature, in Celsius
    - airmass
    - Detector

    Parameters
    ----------
    header

    Returns
    -------

    """
    ambi_KWs = {
        "relative_humidity": "RHUM",
        "ambient_temperature": "TEMP",
    }

    for name, endKW in ambi_KWs.items():
        self.observation_info[name] = header[f"HIERARCH {self.KW_identifier} TEL AMBI {endKW}"]
        if "temperature" in name:  # store temperature in KELVIN for TELFIT
            self.observation_info[name] = convert_temperature(
                self.observation_info[name],
                old_scale="Celsius",
                new_scale="Kelvin",
            )

    if self.observation_info["relative_humidity"] == 255:
        logger.warning(f"{self.name} has an invalid value in the humidity sensor...")
        self.observation_info["relative_humidity"] = np.nan

    self.observation_info["airmass"] = header["HIERARCH ESO TEL AIRM START"]