Source code for qsospec.io.products

"""Output products and diagnostic plots for the global qsospec workflow."""

from __future__ import annotations

import json
from dataclasses import dataclass
from functools import wraps
from pathlib import Path
import re
from typing import Dict, Mapping, Optional, Tuple, Union
import unicodedata

import numpy as np
import pandas as pd

from ..global_result import WorkflowResult
from .. import complex_recipes, lines


[docs] @dataclass(frozen=True) class GlobalQAPlotConfig: """Rendering options for the global continuum and emission-line QA plot.""" figure_width: float = 10.5 figure_height: float = 8.0 max_zoom_panels: int = 4 show_smoothed_data: bool = True smooth_original_spectrum_for_display: bool = False smoothing_window_pixels: int = 7 show_residual_panel: bool = True show_fit_regions: bool = True unmodelled_windows: Tuple[Tuple[float, float, str], ...] = ( (1170.0, 1275.0, "Lyα"), ) show_host_context_in_overview: bool = True object_name: Optional[str] = None object_label: Optional[str] = None show_coordinates: bool = True output_format: str = "png" write_other_diagnostics: bool = False def __post_init__(self) -> None: if self.figure_width <= 0 or self.figure_height <= 0: raise ValueError("QA figure dimensions must be positive.") if self.max_zoom_panels < 1: raise ValueError("max_zoom_panels must be at least one.") if self.smoothing_window_pixels < 1 or self.smoothing_window_pixels % 2 == 0: raise ValueError("smoothing_window_pixels must be a positive odd integer.") for window in self.unmodelled_windows: if len(window) != 3 or not float(window[0]) < float(window[1]): raise ValueError( "Each unmodelled window must be (lower, upper, label)." ) if self.output_format not in ("png", "pdf", "both"): raise ValueError("output_format must be 'png', 'pdf', or 'both'.")
_SCIENCE_PLOT_STYLE = { "font.family": "serif", "font.serif": [ "Times New Roman", "Times", "STIX Two Text", "DejaVu Serif", ], "font.size": 12.0, "axes.titlesize": 12.0, "axes.labelsize": 14.0, "xtick.labelsize": 12.0, "ytick.labelsize": 12.0, "legend.fontsize": 11.0, "figure.titlesize": 16.0, "mathtext.fontset": "stix", } def _science_plot_style(function): """Render one plot without mutating process-global Matplotlib settings.""" @wraps(function) def wrapped(*args, **kwargs): import matplotlib.pyplot as plt with plt.rc_context(_SCIENCE_PLOT_STYLE): return function(*args, **kwargs) return wrapped def _normalized_file_label(value: object) -> str: text = unicodedata.normalize("NFKD", str(value)).encode( "ascii", "ignore" ).decode("ascii") text = re.sub(r"[^a-zA-Z0-9]+", "_", text.strip().lower()) return text.strip("_") or "object" def _qa_object_name( result: WorkflowResult, config: GlobalQAPlotConfig, ) -> str: value = config.object_name if value in (None, ""): value = ( result.metadata.get("object_name") or result.metadata.get("object_id") or result.metadata.get("targetid") or result.spectrum.metadata.source or "object" ) return _normalized_file_label(value) def _plot_formats(config: GlobalQAPlotConfig) -> Tuple[str, ...]: if config.output_format == "both": return ("png", "pdf") return (config.output_format,) def _plot_paths( output_dir: Path, stem: str, config: GlobalQAPlotConfig, ) -> Dict[str, Path]: return { file_format: output_dir / f"{stem}.{file_format}" for file_format in _plot_formats(config) } def _save_figure(fig, paths: Mapping[str, Path]) -> Dict[str, str]: saved = {} for file_format, path in paths.items(): save_kwargs = {"dpi": 160} if file_format == "png" else {} fig.savefig(path, **save_kwargs) saved[file_format] = str(path) return saved def _coerce_plot_paths( paths: Union[Path, Mapping[str, Path]], ) -> Dict[str, Path]: if isinstance(paths, Path): file_format = paths.suffix.lower().lstrip(".") or "png" return {file_format: paths} return dict(paths) def _json_default(value): if isinstance(value, np.generic): return value.item() if isinstance(value, np.ndarray): return value.tolist() if hasattr(value, "to_dict"): return value.to_dict() return str(value) def _percentile_limits(values, percentiles: Tuple[float, float] = (1.0, 99.0), pad: float = 0.08): arrays = [np.ravel(np.asarray(value, dtype=float)) for value in values if value is not None] if not arrays: return None data = np.concatenate(arrays) data = data[np.isfinite(data)] if data.size == 0: return None lo, hi = np.percentile(data, percentiles) width = hi - lo margin = width * pad if width > 0 else max(abs(lo) * pad, 1.0) return float(lo - margin), float(hi + margin) def _flux_display_scale(spectrum) -> float: """Return the display-only scale into DESI-style 1e-17 cgs units.""" if getattr(spectrum, "flux_unit", None) != "cgs": return 1.0 scale = getattr(spectrum, "flux_scale", None) if scale is None or not np.isfinite(scale) or scale <= 0: return 1.0 return float(scale) / 1.0e-17 @_science_plot_style def _plot_global( result: WorkflowResult, paths: Union[Path, Mapping[str, Path]], config: GlobalQAPlotConfig, window: Optional[Tuple[float, float]] = None, ) -> Dict[str, str]: import matplotlib.pyplot as plt paths = _coerce_plot_paths(paths) spectrum = result.spectrum continuum = result.continuum wave = spectrum.wave_rest display_scale = _flux_display_scale(spectrum) valid = spectrum.valid_mask if window is not None: valid &= (wave >= window[0]) & (wave <= window[1]) fig, ax = plt.subplots( figsize=(config.figure_width, 3.8), constrained_layout=True, ) ax.plot( wave[valid], display_scale * spectrum.flux[valid], color="0.45", lw=0.65, label="host-subtracted data", ) ax.plot( wave[valid], display_scale * continuum.model[valid], color="black", lw=1.8, label="full continuum", ) iron_label_used = False balmer_label_used = False for name, component in continuum.component_models.items(): color, linestyle = _CONTINUUM_STYLES.get(name, ("0.5", "-")) if name in ("uv_iron", "optical_iron"): label = "iron" if not iron_label_used else "_nolegend_" iron_label_used = True elif name in ("balmer_bound_free", "balmer_high_order_series"): label = ( "Balmer pseudo-continuum" if not balmer_label_used else "_nolegend_" ) balmer_label_used = True else: label = name.replace("_", " ") ax.plot( wave[valid], display_scale * component[valid], lw=0.75, ls=linestyle, color=color, label=label, ) used = continuum.clip_mask & valid if np.any(used): ax.scatter( wave[used], display_scale * spectrum.flux[used], s=5, color="k", alpha=0.25, label="fit pixels", ) upper = _rounded_model_upper_limit( display_scale * continuum.model[valid] ) if upper is not None: ax.set_ylim(0.0, upper) if window is not None: ax.set_xlim(*window) ax.set_xlabel(r"Rest wavelength [$\mathrm{\AA}$]", fontsize=13) ax.set_ylabel( _flux_density_axis_label(spectrum), fontsize=13, ) _configure_qa_axis(ax) ax.legend(fontsize=9, ncol=3, framealpha=0.72) saved = _save_figure(fig, paths) plt.close(fig) return saved _COMPLEX_WINDOWS = { recipe.id: recipe.fit_window for recipe in complex_recipes.list_complexes() if recipe.fit_window[1] > recipe.fit_window[0] + 1.0 } _COMPLEX_WINDOWS.update({"hbeta": (4600.0, 5120.0), "halpha": (6400.0, 6800.0)}) _TCC_COLORS = { "data": "#a8a19d", # 鼠毛 "data_smooth": "#686b68", # 石涅 "total_model": "#28292b", # 元青 "continuum": "#ee781f", # 金红 "host": "#785034", # 驼褐 "powerlaw": "#b03766", # 魏红 "feii": "#7b5aa3", # 青莲 "balmer_cont": "#db9c4b", # 库金 "broad_total": "#014a8f", # 空青 "broad_component": "#30aecf", # 法蓝 "narrow": "#007d62", # 孔雀绿 "outflow": "#d80835", # 朱砂 "line_marker": "#6f9bc6", # 挼蓝 "unmodelled_span": "#e4ecf0", # 卵白 "masked_span": "#ddd4d3", # 葭灰 } _SPECIES_COLORS = { "MgII": _TCC_COLORS["broad_component"], "Hb": _TCC_COLORS["broad_component"], "HeII": _TCC_COLORS["outflow"], "OIII": _TCC_COLORS["narrow"], "Ha": _TCC_COLORS["outflow"], "NII": _TCC_COLORS["narrow"], "SII": _TCC_COLORS["host"], } _IRON_STYLE = (_TCC_COLORS["feii"], "-") _BALMER_STYLE = (_TCC_COLORS["balmer_cont"], "-.") _CONTINUUM_STYLES = { "power_law": (_TCC_COLORS["powerlaw"], "--"), "uv_iron": _IRON_STYLE, "optical_iron": _IRON_STYLE, "balmer_bound_free": _BALMER_STYLE, "balmer_high_order_series": _BALMER_STYLE, } _COMBINED_BROAD_STYLE = { "color": _TCC_COLORS["broad_total"], "linestyle": "-", "linewidth": 1.6, } _BROAD_COMPONENT_STYLE = { "color": _TCC_COLORS["broad_component"], "linestyle": "-", "linewidth": 0.9, } _NARROW_STYLE = { "color": _TCC_COLORS["narrow"], "linestyle": "-", "linewidth": 1.15, } _WING_STYLE = { "color": _TCC_COLORS["outflow"], "linestyle": "-", "linewidth": 0.9, } _HOST_STYLE = { "color": _TCC_COLORS["host"], "linestyle": "-", "linewidth": 0.9, } _MAJOR_EMISSION_LINES = ( (1215.67, r"Ly$\alpha$"), (1549.06, r"C IV"), (1908.73, r"C III]"), (2798.75, r"Mg II"), (4862.68, r"H$\beta$"), (5008.24, r"[O III] 5008"), (6564.61, r"H$\alpha$"), ) _ZOOM_EMISSION_LINES = { "mgii": ((2798.75, r"Mg II"),), "hbeta": ( (4862.68, r"H$\beta$"), (4960.30, r"[O III] 4960"), (5008.24, r"[O III] 5008"), ), "halpha": ( (6549.85, r"[N II] 6550"), (6564.61, r"H$\alpha$"), (6585.28, r"[N II] 6585"), (6718.29, r"[S II] 6718"), (6732.67, r"[S II] 6733"), ), } for _recipe in complex_recipes.list_complexes(): if _recipe.qa_labels: _ZOOM_EMISSION_LINES[_recipe.id] = tuple( ( lines.get(line_id).vacuum_wavelength, lines.get(line_id).label, ) for line_id in _recipe.qa_labels ) _ZOOM_PRIORITY = ( "lya_nv", "hbeta_oiii", "hbeta", "mgii", "halpha_nii_sii", "halpha", ) _LINE_MARKER_STYLE = { "color": _TCC_COLORS["line_marker"], "linestyle": ":", "linewidth": 0.8, "alpha": 0.65, } def _species_from_component(name: str) -> str: for species in ("MgII", "HeII", "OIII", "NII", "SII", "Hb", "Ha"): if name.startswith(species): return species return "Hb" def _line_groups(name: str, fit) -> Tuple[Tuple[str, np.ndarray, str, str], ...]: broad_names = [key for key in fit.component_models if "broad" in key and "wing" not in key] groups = [] if broad_names: broad_sum = sum( (fit.component_models[key] for key in broad_names), np.zeros_like(fit.model), ) try: label = f"{complex_recipes.get(name).label} broad" except ValueError: label = f"{name} broad" species = _species_from_component(broad_names[0]) groups.append((label, broad_sum, species, "broad")) for component_name, component in fit.component_models.items(): if component_name in broad_names: continue kind = "wing" if "wing" in component_name else "narrow" groups.append( ( component_name.replace("_", " "), component, _species_from_component(component_name), kind, ) ) return tuple(groups) def _broad_component_names(fit) -> Tuple[str, ...]: return tuple( name for name in fit.component_models if "broad" in name and "wing" not in name ) def _combined_broad_profile(fit) -> np.ndarray: return sum( (fit.component_models[name] for name in _broad_component_names(fit)), np.zeros_like(fit.model), ) def _select_zoom_complexes( line_complexes: Mapping[str, object], max_zoom_panels: int, ) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: available = [ name for name, fit in line_complexes.items() if name in _COMPLEX_WINDOWS and bool(getattr(fit, "success", False)) and not ( name == "oii_nev_neiii_hgamma" and not bool( getattr(fit, "metadata", {}).get( "qa_all_lines_covered", False ) ) ) ] priority_order = [ name for name in _ZOOM_PRIORITY if name in available ] + sorted( (name for name in available if name not in _ZOOM_PRIORITY), key=lambda name: _COMPLEX_WINDOWS[name][0], ) selected = set(priority_order[:max_zoom_panels]) displayed = tuple( sorted(selected, key=lambda name: _COMPLEX_WINDOWS[name][0]) ) omitted = tuple( sorted( (name for name in available if name not in selected), key=lambda name: _COMPLEX_WINDOWS[name][0], ) ) return displayed, omitted def _masked_running_median( values: np.ndarray, valid: np.ndarray, window_pixels: int, ) -> np.ndarray: series = pd.Series( np.where(np.asarray(valid, dtype=bool), np.asarray(values, dtype=float), np.nan) ) smoothed = series.rolling( window=window_pixels, center=True, min_periods=1, ).median().to_numpy(copy=True) smoothed[~np.asarray(valid, dtype=bool)] = np.nan return smoothed def _format_reduced_chi2(value: float) -> str: return f"{value:.2f}" if np.isfinite(value) else "n/a" def _qa_overview_title( result: WorkflowResult, plot_config: Optional[GlobalQAPlotConfig] = None, ) -> str: config = plot_config or GlobalQAPlotConfig() object_name = ( config.object_name if config.object_name not in (None, "") else result.metadata.get("object_id") ) parts = [] if object_name not in (None, ""): object_label = config.object_label or "Object" parts.append(f"{object_label} {object_name}".strip()) redshift = result.metadata.get("redshift") if redshift is not None and np.isfinite(redshift): parts.append(f"z = {float(redshift):.4f}") title = " ".join(parts) if config.show_coordinates: coordinates = [] ra = result.metadata.get("ra") dec = result.metadata.get("dec") if ra is not None and np.isfinite(ra): coordinates.append(f"RA = {float(ra):.5f}") if dec is not None and np.isfinite(dec): coordinates.append(f"Dec = {float(dec):+.5f}") extinction = result.metadata.get("galactic_extinction", {}) ebv = ( extinction.get("applied_ebv") if isinstance(extinction, Mapping) else None ) if ebv is not None and np.isfinite(ebv): coordinates.append( rf"$E(B-V) = {float(ebv):.4f}$" ) if coordinates: title = ( title + "\n" if title else "" ) + " ".join(coordinates) return title def _input_spectrum_label( result: WorkflowResult, *, smoothed: bool = False, ) -> str: extinction = result.metadata.get( "galactic_extinction", result.spectrum.metadata.galactic_extinction, ) status = ( extinction.get("status") if isinstance(extinction, Mapping) else None ) corrected = bool( result.spectrum.metadata.galactic_extinction_corrected or status in ( "applied", "declared_corrected", "caller_preprocessed", ) ) parts = ["Input spectrum"] if corrected: parts.append("MW extinction corrected") if smoothed: parts.append("smoothed for display") return "\n".join(parts) def _configure_qa_axis(axis) -> None: axis.minorticks_on() axis.tick_params( which="both", direction="in", top=True, right=True, labelsize=11, ) axis.tick_params(which="major", length=4.0) axis.tick_params(which="minor", length=2.2) def _annotate_emission_lines(axis, lines, *, y_fraction: float) -> Tuple[str, ...]: labels = [] x_min, x_max = axis.get_xlim() for line_wave, line_label in lines: if not x_min <= line_wave <= x_max: continue label_y_fraction = ( min(y_fraction, 0.68) if line_label.startswith("[") else y_fraction ) axis.axvline( line_wave, ymin=0.88, ymax=1.0, zorder=0.5, **_LINE_MARKER_STYLE, ) axis.text( line_wave, label_y_fraction, line_label, transform=axis.get_xaxis_transform(), rotation=90, ha="center", va="bottom", fontsize=8.5, color=_TCC_COLORS["line_marker"], alpha=0.82, ) labels.append(line_label) return tuple(labels) def _flux_density_axis_label(spectrum_or_unit) -> str: flux_unit = getattr(spectrum_or_unit, "flux_unit", None) if flux_unit == "cgs": return ( r"$F_\lambda\ " r"[10^{-17}\,\mathrm{erg}\,\mathrm{s}^{-1}\," r"\mathrm{cm}^{-2}\,\mathrm{\AA}^{-1}]$" ) if flux_unit == "relative": return r"$F_\lambda$ [relative units]" normalized = str(spectrum_or_unit).lower().replace("angstrom", "aa") if ( "1e-17" in normalized and "erg" in normalized and "cm" in normalized and "aa" in normalized ): return ( r"$F_\lambda\ " r"[10^{-17}\,\mathrm{erg}\,\mathrm{s}^{-1}\," r"\mathrm{cm}^{-2}\,\mathrm{\AA}^{-1}]$" ) if "relative" in normalized: return r"$F_\lambda$ [relative units]" return rf"$F_\lambda$ [{spectrum_or_unit}]" def _rounded_model_upper_limit(model_values: np.ndarray) -> Optional[float]: """Return a compact rounded upper limit with headroom above the fitted model.""" values = np.asarray(model_values, dtype=float) values = values[np.isfinite(values)] if values.size == 0: return None peak = float(np.max(values)) if peak <= 0: return None target = 1.2 * peak magnitude = 10.0 ** np.floor(np.log10(target)) normalized = target / magnitude for nice_value in (1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 7.5, 10.0): if normalized <= nice_value: return float(nice_value * magnitude) return float(10.0 * magnitude) def _full_line_model(result: WorkflowResult) -> np.ndarray: return sum( ( complex_result.model for complex_result in result.line_complexes.values() if complex_result.success ), np.zeros_like(result.continuum.model), ) def _has_host_context(result: WorkflowResult) -> bool: if result.total_spectrum is None or result.host_model_on_quasar_grid is None: return False host = np.asarray(result.host_model_on_quasar_grid, dtype=float) return host.shape == result.spectrum.flux.shape and np.any(np.isfinite(host)) def _host_fraction_annotation(result: WorkflowResult) -> str: samples = result.metadata.get("continuum_samples", {}) valid_wave = result.spectrum.wave_rest[result.spectrum.valid_mask] entries = [] for wavelength in (3000, 5100): if ( valid_wave.size == 0 or wavelength < float(np.min(valid_wave)) or wavelength > float(np.max(valid_wave)) ): continue fraction = samples.get(f"fracHost_{wavelength}") if fraction is not None and np.isfinite(fraction): entries.append( rf"$f_{{\rm host}}({wavelength}\,\mathrm{{\AA}})=" f"{100.0 * float(fraction):.1f}\\%$" ) return "\n".join(entries) def _final_fit_masks( result: WorkflowResult, config: GlobalQAPlotConfig, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Return final fitted, unmodelled, and pPXF-masked pixels.""" wave = result.spectrum.wave_rest valid = result.spectrum.valid_mask fitted = np.asarray(result.continuum.clip_mask, dtype=bool) & valid successful_line_fit = np.zeros_like(fitted) unmodelled = np.zeros_like(fitted) lya_fit_successful = bool( "lya_nv" in result.line_complexes and result.line_complexes["lya_nv"].success ) for name, fit in result.line_complexes.items(): if fit.success: line_mask = np.asarray(fit.fit_mask, dtype=bool) & valid fitted |= line_mask successful_line_fit |= line_mask elif name in _COMPLEX_WINDOWS: lo, hi = _COMPLEX_WINDOWS[name] unmodelled |= valid & (wave >= lo) & (wave <= hi) for lo, hi, _ in config.unmodelled_windows: if lya_fit_successful and lo <= 1215.67 <= hi: continue unmodelled |= valid & (wave >= float(lo)) & (wave <= float(hi)) unmodelled &= ~fitted ppxf_masked = np.zeros_like(fitted) if ( result.host_fit_mask is not None and result.host_emission_mask is not None ): host_fit_mask = np.asarray(result.host_fit_mask, dtype=bool) host_emission_mask = np.asarray(result.host_emission_mask, dtype=bool) if host_fit_mask.shape == fitted.shape and host_emission_mask.shape == fitted.shape: ppxf_masked = ( valid & host_fit_mask & host_emission_mask & ~fitted ) return fitted, unmodelled, ppxf_masked def _mask_intervals( wave: np.ndarray, mask: np.ndarray, ) -> Tuple[Tuple[float, float], ...]: indices = np.flatnonzero(np.asarray(mask, dtype=bool)) if indices.size == 0: return () breaks = np.flatnonzero(np.diff(indices) > 1) + 1 groups = np.split(indices, breaks) return tuple( (float(wave[group[0]]), float(wave[group[-1]])) for group in groups if group.size ) def _shade_mask_regions( axis, wave: np.ndarray, unmodelled: np.ndarray, ppxf_masked: np.ndarray, *, labels: bool, ) -> None: for index, (lo, hi) in enumerate(_mask_intervals(wave, ppxf_masked)): axis.axvspan( lo, hi, facecolor=_TCC_COLORS["masked_span"], alpha=0.42, linewidth=0.0, zorder=-10, label="masked in pPXF host fit" if labels and index == 0 else "_nolegend_", ) for index, (lo, hi) in enumerate(_mask_intervals(wave, unmodelled)): axis.axvspan( lo, hi, facecolor=_TCC_COLORS["unmodelled_span"], alpha=0.45, hatch="////", edgecolor=_TCC_COLORS["masked_span"], linewidth=0.0, zorder=-9, label="not fitted / not modelled" if labels and index == 0 else "_nolegend_", ) def _deduplicated_legend_items(axis): items = {} for handle, label in zip(*axis.get_legend_handles_labels()): if label and label != "_nolegend_" and label not in items: items[label] = handle return list(items.values()), list(items) @_science_plot_style def _plot_host_context( result: WorkflowResult, paths: Union[Path, Mapping[str, Path]], *, config: GlobalQAPlotConfig, ) -> Dict[str, str]: """Plot the original spectrum, host decomposition, and final AGN model.""" import matplotlib.pyplot as plt paths = _coerce_plot_paths(paths) if not _has_host_context(result): raise ValueError("A total spectrum and finite host model are required.") spectrum = result.spectrum total_spectrum = result.total_spectrum wave = spectrum.wave_rest display_scale = _flux_display_scale(spectrum) host = np.asarray(result.host_model_on_quasar_grid, dtype=float) line_model = _full_line_model(result) agn_model = result.continuum.model + line_model reconstructed_total = host + agn_model valid_fit = spectrum.valid_mask & np.isfinite(host) valid_total = ( total_spectrum.valid_mask & np.isfinite(host) & np.isfinite(reconstructed_total) ) smoothing_effective = bool( config.smooth_original_spectrum_for_display and wave.size > 4000 ) displayed_total_flux_native = ( _masked_running_median( total_spectrum.flux, valid_total, config.smoothing_window_pixels, ) if smoothing_effective else total_spectrum.flux ) displayed_total_flux = display_scale * displayed_total_flux_native reconstructed_total_display = display_scale * reconstructed_total host_display = display_scale * host fit_flux_display = display_scale * spectrum.flux agn_model_display = display_scale * agn_model original_label = _input_spectrum_label( result, smoothed=smoothing_effective, ) valid_wave = wave[valid_total | valid_fit] fig, axes = plt.subplots( 2, 1, figsize=(config.figure_width, 5.2), sharex=True, constrained_layout=True, gridspec_kw={"height_ratios": (1.0, 1.0)}, ) top_axis, bottom_axis = axes top_axis.plot( wave[valid_total], displayed_total_flux[valid_total], color="0.48", lw=0.65, label=original_label, ) top_axis.plot( wave[valid_total], reconstructed_total_display[valid_total], color="black", lw=1.7, label="host + final AGN model", ) top_axis.plot( wave[valid_total], host_display[valid_total], **_HOST_STYLE, label="host galaxy", ) top_upper = _rounded_model_upper_limit( reconstructed_total_display[valid_total] ) if top_upper is not None: top_axis.set_ylim(0.0, top_upper) top_axis.set_ylabel( _flux_density_axis_label(spectrum), fontsize=13, ) source_title = _qa_overview_title(result, config) top_axis.set_title( "Host decomposition and final-model context" + (f" — {source_title}" if source_title else ""), fontsize=12, ) fraction_text = _host_fraction_annotation(result) if fraction_text: top_axis.text( 0.01, 0.96, fraction_text, transform=top_axis.transAxes, ha="left", va="top", fontsize=9, bbox={ "boxstyle": "round,pad=0.25", "facecolor": "white", "edgecolor": "0.75", "alpha": 0.72, }, ) top_axis.legend( fontsize=9, ncol=3, loc="best", framealpha=0.72, borderpad=0.35, ) bottom_axis.plot( wave[valid_fit], fit_flux_display[valid_fit], color="0.48", lw=0.65, label="host-subtracted spectrum", ) bottom_axis.plot( wave[valid_fit], agn_model_display[valid_fit], color="black", lw=1.7, label="final AGN + emission-line model", ) bottom_upper = _rounded_model_upper_limit( agn_model_display[valid_fit] ) if bottom_upper is not None: bottom_axis.set_ylim(0.0, bottom_upper) bottom_axis.set_ylabel( _flux_density_axis_label(spectrum), fontsize=13, ) bottom_axis.set_xlabel( r"Rest wavelength [$\mathrm{\AA}$]", fontsize=13, ) bottom_axis.legend( fontsize=9, ncol=2, loc="best", framealpha=0.72, borderpad=0.35, ) if valid_wave.size: x_limits = (float(valid_wave.min()), float(valid_wave.max())) top_axis.set_xlim(*x_limits) result.metadata["host_context_xlim"] = list(x_limits) for axis in axes: _configure_qa_axis(axis) result.metadata["host_context_figure_size_inches"] = [ float(config.figure_width), 5.2, ] result.metadata["host_context_ymin"] = 0.0 result.metadata["host_context_model_upper_limits"] = { "original_plus_host": top_upper, "host_subtracted": bottom_upper, } result.metadata["host_context_fraction_annotation"] = fraction_text result.metadata["host_context_original_spectrum_smoothed_for_display"] = ( smoothing_effective ) result.metadata["host_context_smoothing_requested"] = bool( config.smooth_original_spectrum_for_display ) result.metadata["host_context_smoothing_effective"] = smoothing_effective result.metadata["host_context_flux_display_scale"] = display_scale saved = _save_figure(fig, paths) plt.close(fig) return saved @_science_plot_style def _plot_qa( result: WorkflowResult, paths: Optional[Union[Path, Mapping[str, Path]]] = None, plot_config: Optional[GlobalQAPlotConfig] = None, *, return_figure: bool = False, ): import matplotlib.pyplot as plt resolved_paths = None if paths is None else _coerce_plot_paths(paths) config = plot_config or GlobalQAPlotConfig() available, omitted = _select_zoom_complexes( result.line_complexes, config.max_zoom_panels, ) result.metadata["qa_panel_count"] = ( 1 + int(config.show_residual_panel) + len(available) ) result.metadata["qa_percentiles"] = [1.0, 99.8] result.metadata["qa_layout"] = ( "overview_residual_complexes" if config.show_residual_panel else "overview_complexes" ) result.metadata["qa_figure_size_inches"] = [ float(config.figure_width), float(config.figure_height), ] result.metadata["qa_max_zoom_panels"] = int(config.max_zoom_panels) result.metadata["qa_displayed_complexes"] = list(available) result.metadata["qa_omitted_complexes"] = list(omitted) result.metadata["qa_smoothed_data_requested"] = bool( config.show_smoothed_data ) result.metadata["qa_original_spectrum_smoothed_for_display"] = bool( config.smooth_original_spectrum_for_display ) result.metadata["qa_show_fit_regions"] = bool(config.show_fit_regions) result.metadata["qa_show_residual_panel"] = bool( config.show_residual_panel ) result.metadata["qa_unmodelled_windows"] = [ [float(lo), float(hi), str(label)] for lo, hi, label in config.unmodelled_windows ] result.metadata["qa_plot_style"] = "qsospec_science_serif" result.metadata["qa_plot_style_rc"] = dict(_SCIENCE_PLOT_STYLE) result.metadata["qa_smoothing_window_pixels"] = int( config.smoothing_window_pixels ) result.metadata["qa_tick_direction"] = "in" result.metadata["qa_minor_ticks"] = True result.metadata["qa_zoom_model_upper_limits"] = {} result.metadata["qa_zoom_ymin"] = {} result.metadata["qa_zoom_titles"] = {} result.metadata["qa_zoom_line_labels"] = {} ncols = max(len(available), 1) fig = plt.figure( figsize=(config.figure_width, config.figure_height), constrained_layout=True, ) if config.show_residual_panel: grid = fig.add_gridspec( 3, ncols, height_ratios=(1.0, 0.28, 0.78), ) overview_axis = fig.add_subplot(grid[0, :]) residual_axis = fig.add_subplot( grid[1, :], sharex=overview_axis, ) zoom_row = 2 else: grid = fig.add_gridspec(2, ncols, height_ratios=(1.0, 0.78)) overview_axis = fig.add_subplot(grid[0, :]) residual_axis = None zoom_row = 1 zoom_axes = [ fig.add_subplot(grid[zoom_row, index]) for index in range(ncols) ] spectrum = result.spectrum wave = spectrum.wave_rest display_scale = _flux_display_scale(spectrum) valid = spectrum.valid_mask line_model = _full_line_model(result) full_model = result.continuum.model + line_model fitted_mask, unmodelled_mask, ppxf_masked = _final_fit_masks( result, config ) if not config.show_fit_regions: unmodelled_mask[:] = False ppxf_masked[:] = False result.metadata["qa_n_fitted_pixels"] = int(np.count_nonzero(fitted_mask)) result.metadata["qa_n_unmodelled_pixels"] = int( np.count_nonzero(unmodelled_mask) ) result.metadata["qa_n_ppxf_masked_pixels"] = int( np.count_nonzero(ppxf_masked) ) result.metadata["qa_host_mask_provenance"] = result.metadata.get( "host_mask_provenance", "exact" if result.host_fit_mask is not None and result.host_emission_mask is not None else "unavailable", ) host_overview = bool( config.show_host_context_in_overview and _has_host_context(result) ) result.metadata["qa_host_context_overview_requested"] = bool( config.show_host_context_in_overview ) result.metadata["qa_host_context_overview_used"] = host_overview host_model = ( np.asarray(result.host_model_on_quasar_grid, dtype=float) if host_overview else np.zeros_like(full_model) ) overview_data_native = ( np.asarray(result.total_spectrum.flux, dtype=float) if host_overview else spectrum.flux ) overview_full_model_native = full_model + host_model overview_data = display_scale * overview_data_native overview_full_model = display_scale * overview_full_model_native full_model_display = display_scale * full_model fit_data_display = display_scale * spectrum.flux fit_error_display = display_scale * spectrum.err host_model_display = display_scale * host_model overview_valid = valid.copy() if host_overview: overview_valid &= ( result.total_spectrum.valid_mask & np.isfinite(host_model) & np.isfinite(overview_data_native) ) smoothing_requested = bool( config.show_smoothed_data or config.smooth_original_spectrum_for_display ) smoothing_effective = bool(smoothing_requested and wave.size > 4000) smoothed_fit_data_native = ( _masked_running_median( spectrum.flux, valid, config.smoothing_window_pixels, ) if smoothing_effective else None ) smoothed_overview_data_native = ( _masked_running_median( overview_data_native, overview_valid, config.smoothing_window_pixels, ) if smoothing_effective else None ) smoothed_fit_data = ( display_scale * smoothed_fit_data_native if smoothed_fit_data_native is not None else None ) smoothed_overview_data = ( display_scale * smoothed_overview_data_native if smoothed_overview_data_native is not None else None ) replace_original_with_smoothed = bool( config.smooth_original_spectrum_for_display and smoothing_effective ) show_smoothed_trace = bool( config.show_smoothed_data and smoothing_effective ) result.metadata["qa_original_spectrum_smoothed_used"] = ( replace_original_with_smoothed ) result.metadata["qa_smoothed_data"] = show_smoothed_trace result.metadata["qa_smoothing_requested"] = smoothing_requested result.metadata["qa_smoothing_effective"] = smoothing_effective result.metadata["qa_smoothing_suppressed_short_spectrum"] = bool( smoothing_requested and wave.size <= 4000 ) result.metadata["qa_flux_display_scale"] = display_scale result.metadata["qa_flux_display_unit"] = ( "1e-17 cgs" if spectrum.flux_unit == "cgs" else "relative" ) result.metadata["qa_wavelength_frame"] = "rest" result.metadata["qa_flux_density_frame"] = "rest" result.metadata["qa_input_spectrum_label"] = _input_spectrum_label( result ) def plot_observed( ax, panel_mask, *, data_values, smoothed_values, labels, ): if not replace_original_with_smoothed: ax.plot( wave[panel_mask], data_values[panel_mask], color=_TCC_COLORS["data"], lw=0.8, alpha=0.8, zorder=1, label=( _input_spectrum_label(result) if labels else "_nolegend_" ), ) if smoothed_values is not None and ( show_smoothed_trace or replace_original_with_smoothed ): ax.plot( wave[panel_mask], smoothed_values[panel_mask], color=_TCC_COLORS["data_smooth"], lw=0.9, alpha=0.9, zorder=2, label=( _input_spectrum_label(result, smoothed=True) if labels else "_nolegend_" ), ) def plot_continuum_components(ax, panel_mask, *, labels): iron_label_used = False balmer_label_used = False for component_name, component in result.continuum.component_models.items(): if not np.any(np.abs(component[panel_mask]) > 0): continue color, linestyle = _CONTINUUM_STYLES.get( component_name, ("0.5", ":") ) if component_name in ("uv_iron", "optical_iron"): label = ( "Fe II" if labels and not iron_label_used else "_nolegend_" ) iron_label_used = True elif component_name in ( "balmer_bound_free", "balmer_high_order_series", ): label = ( "Balmer pseudo-continuum" if labels and not balmer_label_used else "_nolegend_" ) balmer_label_used = True else: label = ( component_name.replace("_", " ") if labels else "_nolegend_" ) ax.plot( wave[panel_mask], display_scale * component[panel_mask], color=color, ls=linestyle, lw=0.8, alpha=0.9, label=label, zorder=3, ) plot_observed( overview_axis, overview_valid, data_values=overview_data, smoothed_values=smoothed_overview_data, labels=True, ) overview_model_plot = np.where( overview_valid, overview_full_model, np.nan, ) overview_axis.plot( wave, overview_model_plot, color=_TCC_COLORS["total_model"], lw=1.8, label="total model", zorder=6, ) if host_overview: overview_axis.plot( wave[overview_valid], host_model_display[overview_valid], label="host galaxy", zorder=3, **_HOST_STYLE, ) plot_continuum_components( overview_axis, overview_valid, labels=True, ) overview_title = _qa_overview_title(result, config) result.metadata["qa_overview_title"] = overview_title overview_axis.set_title(overview_title, fontsize=12) overview_axis.set_ylabel( _flux_density_axis_label(spectrum), fontsize=13, ) _configure_qa_axis(overview_axis) broad_label_used = False narrow_label_used = False wing_label_used = False for complex_name, fit in result.line_complexes.items(): if not fit.success: continue fit = result.line_complexes[complex_name] component_mask = valid & np.asarray(fit.fit_mask, dtype=bool) combined = _combined_broad_profile(fit) overview_axis.plot( wave[component_mask], display_scale * combined[component_mask], label="broad-line model" if not broad_label_used else "_nolegend_", zorder=5, **_COMBINED_BROAD_STYLE, ) broad_label_used |= bool(np.any(combined[component_mask] != 0)) for label, component, species, kind in _line_groups(complex_name, fit): if kind == "broad": continue style = _WING_STYLE if kind == "wing" else _NARROW_STYLE if kind == "wing": legend_label = "outflow wing" if not wing_label_used else "_nolegend_" wing_label_used = True else: legend_label = "narrow-line model" if not narrow_label_used else "_nolegend_" narrow_label_used = True overview_axis.plot( wave[component_mask], display_scale * component[component_mask], label=legend_label, zorder=5, **style, ) if config.show_fit_regions: _shade_mask_regions( overview_axis, wave, unmodelled_mask, ppxf_masked, labels=True, ) valid_wave = wave[overview_valid] if valid_wave.size: overview_axis.set_xlim(float(valid_wave.min()), float(valid_wave.max())) result.metadata["qa_overview_xlim"] = [ float(valid_wave.min()), float(valid_wave.max()), ] result.metadata["qa_major_emission_line_labels"] = list( _annotate_emission_lines( overview_axis, _MAJOR_EMISSION_LINES, y_fraction=0.82, ) ) valid_overview_wave = wave[overview_valid] lya_in_coverage = bool( valid_overview_wave.size and float(np.min(valid_overview_wave)) <= 1215.67 and float(np.max(valid_overview_wave)) >= 1215.67 ) lya_model_fitted = any( ( "lya" in str(name).lower() or "lyalpha" in str(name).lower() ) and fit.success for name, fit in result.line_complexes.items() ) if lya_in_coverage and not lya_model_fitted: data_limits = _percentile_limits( [overview_data[overview_valid]], percentiles=(1.0, 99.8), ) overview_upper = data_limits[1] if data_limits is not None else None result.metadata["qa_overview_upper_policy"] = "data_only_percentile" result.metadata["qa_overview_upper_percentile"] = 99.8 else: overview_upper = _rounded_model_upper_limit( overview_full_model[overview_valid] ) result.metadata["qa_overview_upper_policy"] = "rounded_model" result.metadata["qa_overview_upper_percentile"] = None result.metadata["qa_lya_in_valid_coverage"] = lya_in_coverage result.metadata["qa_lya_model_fitted"] = lya_model_fitted if overview_upper is not None: overview_axis.set_ylim( 0.0, max(overview_upper, 0.0), ) result.metadata["qa_overview_ymin"] = 0.0 result.metadata["qa_overview_model_upper_limit"] = overview_upper clipped = overview_valid & np.isfinite(overview_data) & ( overview_data > overview_upper ) if np.any(clipped): overview_axis.scatter( wave[clipped], np.full(np.count_nonzero(clipped), 0.985 * overview_upper), marker="^", s=12, facecolor=_TCC_COLORS["data_smooth"], edgecolor="none", alpha=0.7, zorder=8, ) result.metadata["qa_overview_clipped_peak_count"] = int( np.count_nonzero(clipped) ) host_state = ( "decomposed with pPXF" if result.host_decomp_enabled or result.metadata.get("host_decomp_enabled", False) else "not decomposed" ) overview_annotation_lines = [ rf"$\chi^2_\nu(\mathrm{{cont., fitted\ pixels}})=" f"{_format_reduced_chi2(result.continuum.reduced_chi2)}$", f"Host: {host_state}", ] if host_overview: overview_annotation_lines.append( "Zoom panels: host-subtracted spectrum" ) lya_status = result.metadata.get("lya_coverage_status") if lya_status in ("red_side_only", "edge_truncated"): overview_annotation_lines.append( "Lyα: " + ( "limited red-side fit" if lya_status == "red_side_only" else "edge-truncated; not fitted" ) ) host_fraction_annotation = _host_fraction_annotation(result) if host_fraction_annotation: overview_annotation_lines.extend(host_fraction_annotation.splitlines()) overview_annotation = "\n".join(overview_annotation_lines) overview_axis.text( 0.01, 0.97, overview_annotation, transform=overview_axis.transAxes, ha="left", va="top", fontsize=9, bbox={ "boxstyle": "round,pad=0.25", "facecolor": "white", "edgecolor": "0.75", "alpha": 0.72, }, ) result.metadata["qa_overview_annotation"] = { "continuum_reduced_chi2": float(result.continuum.reduced_chi2), "host_state": host_state, "zoom_spectrum": ( "host-subtracted" if host_overview else "input" ), "host_fractions": host_fraction_annotation, "ra": result.metadata.get("ra") if config.show_coordinates else None, "dec": result.metadata.get("dec") if config.show_coordinates else None, } handles, labels = _deduplicated_legend_items(overview_axis) if handles: fig.legend( handles, labels, fontsize=10, ncol=min(5, len(handles)), loc="upper center", bbox_to_anchor=(0.5, 1.0), framealpha=0.82, borderpad=0.35, handlelength=2.4, ) layout_engine = fig.get_layout_engine() if layout_engine is not None: layout_engine.set(rect=(0.0, 0.0, 1.0, 0.91)) if residual_axis is not None: residual_mask = ( fitted_mask & overview_valid & np.isfinite(overview_data) & np.isfinite(overview_full_model) & np.isfinite(spectrum.err) & (spectrum.err > 0) ) normalized_residual = np.full_like(overview_data, np.nan, dtype=float) normalized_residual[residual_mask] = ( overview_data[residual_mask] - overview_full_model[residual_mask] ) / fit_error_display[residual_mask] residual_axis.plot( wave, normalized_residual, color=_TCC_COLORS["total_model"], lw=0.55, ) for value, linestyle, alpha in ( (0.0, "-", 0.7), (3.0, "--", 0.45), (-3.0, "--", 0.45), ): residual_axis.axhline( value, color=_TCC_COLORS["data_smooth"], lw=0.7, ls=linestyle, alpha=alpha, zorder=0, ) if config.show_fit_regions: _shade_mask_regions( residual_axis, wave, unmodelled_mask, ppxf_masked, labels=False, ) residual_axis.set_ylim(-5.5, 5.5) residual_axis.set_ylabel( r"$\Delta/\sigma$", fontsize=11, ) residual_axis.tick_params(axis="x", labelbottom=False) _configure_qa_axis(residual_axis) result.metadata["qa_residual_definition"] = "(data-model)/sigma" result.metadata["qa_residual_reference_lines"] = [0.0, -3.0, 3.0] result.metadata["qa_n_residual_pixels"] = int( np.count_nonzero(residual_mask) ) for zoom_index, (axis, complex_name) in enumerate(zip(zoom_axes, available)): lo, hi = _COMPLEX_WINDOWS[complex_name] panel_mask = valid & (wave >= lo) & (wave <= hi) try: title = complex_recipes.get(complex_name).label except ValueError: title = complex_name fit = result.line_complexes[complex_name] title = ( f"{title} | " rf"$\chi^2_\nu={_format_reduced_chi2(fit.reduced_chi2)}$" ) if complex_name == "lya_nv": coverage_status = fit.metadata.get("lya_coverage_status") if coverage_status == "red_side_only": title = ( r"Ly$\alpha$ / N V" + "\n" + rf"$\chi^2_\nu={_format_reduced_chi2(fit.reduced_chi2)}$" + "; limited" ) elif not fit.metadata.get("lya_fit_reliable", False): title = ( r"Ly$\alpha$ / N V" + "\n" + rf"$\chi^2_\nu={_format_reduced_chi2(fit.reduced_chi2)}$" + "; unreliable" ) result.metadata.setdefault("qa_zoom_titles", {})[complex_name] = title plot_observed( axis, panel_mask, data_values=fit_data_display, smoothed_values=smoothed_fit_data, labels=False, ) axis.plot( wave, np.where(panel_mask & valid, full_model_display, np.nan), color=_TCC_COLORS["total_model"], lw=1.8, label="_nolegend_", zorder=6, ) plot_continuum_components(axis, panel_mask, labels=False) axis.set_title(title, fontsize=12) _configure_qa_axis(axis) combined = _combined_broad_profile(fit) axis.plot( wave[panel_mask], display_scale * combined[panel_mask], label="broad-line model", **_COMBINED_BROAD_STYLE, ) broad_component_label_used = False broad_names = set(_broad_component_names(fit)) for component_name, component in fit.component_models.items(): if component_name in broad_names: if complex_name == "lya_nv": component_label = ( "Lyα component" if component_name.lower().startswith("lya") else "N V component" ) else: component_label = ( "broad components" if not broad_component_label_used else "_nolegend_" ) axis.plot( wave[panel_mask], display_scale * component[panel_mask], label=component_label, **_BROAD_COMPONENT_STYLE, ) broad_component_label_used = True continue kind = "wing" if "wing" in component_name else "narrow" style = _WING_STYLE if kind == "wing" else _NARROW_STYLE axis.plot( wave[panel_mask], display_scale * component[panel_mask], label=( "outflow wing" if kind == "wing" else "narrow lines" ), **style, ) if complex_name == "lya_nv": excluded = ( np.asarray(fit.excluded_mask, dtype=bool) if fit.excluded_mask is not None else np.zeros_like(panel_mask) ) excluded &= panel_mask if np.any(excluded): axis.scatter( wave[excluded], fit_data_display[excluded], marker="x", s=18, linewidths=0.8, color=_TCC_COLORS["outflow"], label="masked absorption", zorder=8, ) limits = axis.get_ylim() zoom_upper = _rounded_model_upper_limit( full_model_display[panel_mask] ) axis.set_ylim(0.0, max(zoom_upper if zoom_upper is not None else limits[1], 0.0)) result.metadata.setdefault("qa_zoom_model_upper_limits", {})[ complex_name ] = zoom_upper result.metadata.setdefault("qa_zoom_ymin", {})[complex_name] = 0.0 axis.set_xlim(lo, hi) axis.axhline(0.0, color="0.55", lw=0.65, zorder=0) result.metadata.setdefault("qa_zoom_line_labels", {})[complex_name] = list( _annotate_emission_lines( axis, _ZOOM_EMISSION_LINES.get(complex_name, ()), y_fraction=0.82, ) ) handles, labels = _deduplicated_legend_items(axis) allowed = { "broad components", "narrow lines", "outflow wing", "Lyα component", "N V component", "masked absorption", } local_items = [ (handle, label) for handle, label in zip(handles, labels) if label in allowed ] if local_items: axis.legend( [item[0] for item in local_items], [item[1] for item in local_items], fontsize=8, loc="best", framealpha=0.72, borderpad=0.35, ) if zoom_index == 0: axis.set_ylabel( _flux_density_axis_label(spectrum), fontsize=13, ) if not available: zoom_axes[0].set_visible(False) fig.supxlabel(r"Rest wavelength [$\mathrm{\AA}$]", fontsize=13) result.metadata["qa_shared_axis_labels"] = False result.metadata["qa_y_label_policy"] = "overview_and_leftmost_zoom" if return_figure: return fig if resolved_paths is None: raise ValueError("QA output paths are required when saving a figure.") saved = _save_figure(fig, resolved_paths) plt.close(fig) return saved def plot_qa_figure( result: WorkflowResult, plot_config: Optional[GlobalQAPlotConfig] = None, ): """Return an open Matplotlib QA figure without writing a file.""" return _plot_qa( result, None, plot_config or GlobalQAPlotConfig(), return_figure=True, ) @_science_plot_style def _plot_hbeta( result: WorkflowResult, paths: Union[Path, Mapping[str, Path]], config: GlobalQAPlotConfig, ) -> Dict[str, str]: import matplotlib.pyplot as plt paths = _coerce_plot_paths(paths) fit = result.hbeta if fit is None: raise ValueError("Hβ diagnostic requested when Hβ was not fitted.") view = (fit.wave_rest >= 4600.0) & (fit.wave_rest <= 5120.0) display_scale = _flux_display_scale(result.spectrum) fig, ax = plt.subplots( figsize=(config.figure_width, 3.8), constrained_layout=True, ) ax.plot( fit.wave_rest[view], display_scale * fit.flux_continuum_subtracted[view], color="0.45", lw=0.65, label="continuum-subtracted data", ) ax.plot( fit.wave_rest[view], display_scale * fit.model[view], color="black", lw=1.8, label="full line model", ) broad_label_used = False narrow_label_used = False wing_label_used = False for name, component in fit.component_models.items(): if "broad" in name and "wing" not in name: style = _BROAD_COMPONENT_STYLE label = "broad components" if not broad_label_used else "_nolegend_" broad_label_used = True elif "wing" in name: style = _WING_STYLE label = "outflow wing" if not wing_label_used else "_nolegend_" wing_label_used = True else: style = _NARROW_STYLE label = "narrow line" if not narrow_label_used else "_nolegend_" narrow_label_used = True ax.plot( fit.wave_rest[view], display_scale * component[view], label=label, **style, ) upper = _rounded_model_upper_limit(display_scale * fit.model[view]) if upper is not None: ax.set_ylim(0.0, upper) ax.set_xlim(4600.0, 5120.0) ax.set_xlabel(r"Rest wavelength [$\mathrm{\AA}$]", fontsize=13) ax.set_ylabel( _flux_density_axis_label(result.spectrum), fontsize=13, ) ax.set_title( "Hβ / [O III] diagnostic" + ( f" — {_qa_overview_title(result, config)}" if _qa_overview_title(result, config) else "" ), fontsize=12, ) _configure_qa_axis(ax) ax.legend(fontsize=9, ncol=3, framealpha=0.72) saved = _save_figure(fig, paths) plt.close(fig) return saved def _measurement_row(fit) -> Dict[str, object]: row = {} row.update(fit.param_values) row.update({f"{key}_err": value for key, value in fit.param_errors.items()}) row.update(fit.metrics) row.update({f"{key}_err": value for key, value in fit.metric_errors.items()}) row.update( { "success": fit.success, "selected_model": fit.selected_model, "reduced_chi2": fit.reduced_chi2, "bic": fit.bic, } ) return row def _write_complex_products(out: Path, name: str, fit, files: Dict[str, str]) -> None: filenames = { "mgii": ("mgii_measurements.csv", "mgii_model.csv"), "hbeta": ("hbeta_oiii_measurements.csv", "hbeta_oiii_model.csv"), "hbeta_oiii": ("hbeta_oiii_measurements.csv", "hbeta_oiii_model.csv"), "halpha": ("halpha_nii_sii_measurements.csv", "halpha_nii_sii_model.csv"), "halpha_nii_sii": ("halpha_nii_sii_measurements.csv", "halpha_nii_sii_model.csv"), } measurement_name, model_name = filenames.get( name, (f"{name}_measurements.csv", f"{name}_model.csv") ) measurement_path = out / measurement_name pd.DataFrame([_measurement_row(fit)]).to_csv(measurement_path, index=False) files[f"{name}_measurements_csv"] = str(measurement_path) legacy_name = { "hbeta_oiii": "hbeta", "halpha_nii_sii": "halpha", }.get(name) if legacy_name is not None: files[f"{legacy_name}_measurements_csv"] = str(measurement_path) if name in _COMPLEX_WINDOWS: lo, hi = _COMPLEX_WINDOWS[name] elif np.any(fit.fit_mask): lo = float(np.nanmin(fit.wave_rest[fit.fit_mask])) hi = float(np.nanmax(fit.wave_rest[fit.fit_mask])) else: lo, hi = float(fit.wave_rest.min()), float(fit.wave_rest.max()) view = (fit.wave_rest >= lo) & (fit.wave_rest <= hi) model_grid = { "wave_rest": fit.wave_rest[view], "continuum_subtracted": fit.flux_continuum_subtracted[view], "err": fit.err[view], "model": fit.model[view], "fit_used": fit.fit_mask[view].astype(int), } for component_name, component in fit.component_models.items(): model_grid[component_name] = component[view] model_path = out / model_name pd.DataFrame(model_grid).to_csv(model_path, index=False) files[f"{name}_model_csv"] = str(model_path) if legacy_name is not None: files[f"{legacy_name}_model_csv"] = str(model_path)
[docs] def write_global_line_products( result: WorkflowResult, output_dir: str, qa_plot_config: Optional[GlobalQAPlotConfig] = None, ) -> Dict[str, str]: """Write standard global-continuum and multi-complex products.""" out = Path(output_dir) out.mkdir(parents=True, exist_ok=True) files: Dict[str, str] = {} plot_config = qa_plot_config or GlobalQAPlotConfig() file_object_name = _qa_object_name(result, plot_config) result.metadata["qa_file_object_name"] = file_object_name result.metadata["qa_output_format"] = plot_config.output_format result.metadata["write_other_diagnostics"] = bool( plot_config.write_other_diagnostics ) summary_path = out / "qsospec_global_lines_summary.json" compatibility_summary_path = out / "qsospec_global_hbeta_summary.json" summary_payload = json.dumps( result.summary(), indent=2, sort_keys=True, default=_json_default ) summary_path.write_text( summary_payload, encoding="utf-8", ) compatibility_summary_path.write_text( summary_payload, encoding="utf-8", ) files["summary_json"] = str(summary_path) files["compatibility_summary_json"] = str(compatibility_summary_path) continuum_row = {} continuum_row.update(result.continuum.param_values) continuum_row.update({f"{key}_err": value for key, value in result.continuum.param_errors.items()}) continuum_row.update(result.metadata.get("continuum_samples", {})) for key in ( "balmer_pseudocontinuum_implied_hbeta_flux_input", "balmer_pseudocontinuum_implied_hbeta_flux_cgs", "balmer_pseudocontinuum_fwhm_kms", "balmer_pseudocontinuum_velocity_kms", "balmer_pseudocontinuum_edge_flux_density_input", "balmer_pseudocontinuum_template_provenance", "balmer_pseudocontinuum_n_min", "balmer_pseudocontinuum_n_max", "balmer_pseudocontinuum_fwhm_source", "balmer_pseudocontinuum_fwhm_synced_to_hbeta", "balmer_pseudocontinuum_fwhm_warning_codes", "balmer_pseudocontinuum_fwhm_snr", "hbeta_sync_requested", "hbeta_sync_attempted", "hbeta_sync_converged", "hbeta_sync_iterations", ): if key in result.continuum.metadata: continuum_row[key] = result.continuum.metadata[key] continuum_row.update( { "success": result.continuum.success, "reduced_chi2": result.continuum.reduced_chi2, "balmer_template": result.continuum.metadata.get("balmer_template"), } ) continuum_path = out / "global_continuum_measurements.csv" pd.DataFrame([continuum_row]).to_csv(continuum_path, index=False) files["continuum_measurements_csv"] = str(continuum_path) for complex_name, fit in result.line_complexes.items(): _write_complex_products(out, complex_name, fit, files) spectrum = result.spectrum grid = { "wave_obs": spectrum.wave_obs, "wave_rest": spectrum.wave_rest, "flux_fit_input": spectrum.flux, "err": spectrum.err, "global_continuum": result.continuum.model, "continuum_subtracted": spectrum.flux - result.continuum.model, "fit_used_continuum": result.continuum.clip_mask.astype(int), "full_model": result.continuum.model + _full_line_model(result), } if result.total_spectrum is not None: grid["flux_total_before_host"] = result.total_spectrum.flux if result.host_model_on_quasar_grid is not None: grid["ppxf_host_model"] = result.host_model_on_quasar_grid for name, component in result.continuum.component_models.items(): grid[f"continuum_{name}"] = component for complex_name, fit in result.line_complexes.items(): grid[f"{complex_name}_model"] = fit.model grid[f"fit_used_{complex_name}"] = fit.fit_mask.astype(int) for component_name, component in fit.component_models.items(): grid[f"line_{complex_name}_{component_name}"] = component grid_path = out / "qsospec_global_lines_full_grid.csv" pd.DataFrame(grid).to_csv(grid_path, index=False) files["full_grid_csv"] = str(grid_path) compatibility_grid_path = out / "qsospec_global_hbeta_full_grid.csv" pd.DataFrame(grid).to_csv(compatibility_grid_path, index=False) files["compatibility_full_grid_csv"] = str(compatibility_grid_path) main_qa_paths = _plot_paths( out, f"main_qa_{file_object_name}", plot_config, ) main_qa_files = _plot_qa( result, main_qa_paths, plot_config, ) for file_format, path in main_qa_files.items(): files[f"main_qa_{file_format}"] = path primary_main_qa = main_qa_files.get( "png", next(iter(main_qa_files.values())), ) files["main_qa"] = primary_main_qa files["global_plot"] = primary_main_qa files["qa_plot"] = primary_main_qa if plot_config.write_other_diagnostics and _has_host_context(result): host_context_files = _plot_host_context( result, _plot_paths( out, "diagnostic_global_host_context", plot_config, ), config=plot_config, ) for file_format, path in host_context_files.items(): files[f"host_context_plot_{file_format}"] = path files["host_context_plot"] = host_context_files.get( "png", next(iter(host_context_files.values())), ) result.metadata["host_context_plot_created"] = True else: result.metadata["host_context_plot_created"] = False if plot_config.write_other_diagnostics: balmer_edge_files = _plot_global( result, _plot_paths(out, "diagnostic_balmer_edge", plot_config), plot_config, window=(3300.0, 4300.0), ) for file_format, path in balmer_edge_files.items(): files[f"balmer_edge_plot_{file_format}"] = path files["balmer_edge_plot"] = balmer_edge_files.get( "png", next(iter(balmer_edge_files.values())), ) if result.hbeta is not None: hbeta_files = _plot_hbeta( result, _plot_paths( out, "diagnostic_hbeta_oiii", plot_config, ), plot_config, ) for file_format, path in hbeta_files.items(): files[f"hbeta_plot_{file_format}"] = path files["hbeta_plot"] = hbeta_files.get( "png", next(iter(hbeta_files.values())), ) result.output_files.update(files) summary_payload = json.dumps( result.summary(), indent=2, sort_keys=True, default=_json_default ) summary_path.write_text(summary_payload, encoding="utf-8") compatibility_summary_path.write_text(summary_payload, encoding="utf-8") return files
[docs] def write_global_hbeta_products( result: WorkflowResult, output_dir: str, qa_plot_config: Optional[GlobalQAPlotConfig] = None, ) -> Dict[str, str]: """Compatibility wrapper retaining H-beta summary/full-grid paths.""" files = write_global_line_products(result, output_dir, qa_plot_config) files["generic_summary_json"] = files["summary_json"] files["generic_full_grid_csv"] = files["full_grid_csv"] files["summary_json"] = files["compatibility_summary_json"] files["full_grid_csv"] = files["compatibility_full_grid_csv"] result.output_files.update(files) summary_payload = json.dumps( result.summary(), indent=2, sort_keys=True, default=_json_default ) Path(files["summary_json"]).write_text(summary_payload, encoding="utf-8") Path(files["generic_summary_json"]).write_text(summary_payload, encoding="utf-8") return files