Source code for qsospec.solvers.variable_projection

"""Bounded variable projection for separable nonlinear least squares."""

from __future__ import annotations

from dataclasses import dataclass
from types import SimpleNamespace
from typing import Callable, Optional, Sequence, Tuple

import numpy as np
from scipy.optimize import least_squares, lsq_linear


class VariableProjectionError(RuntimeError):
    """Raised when the bounded variable-projection solver cannot continue."""


DesignEvaluator = Callable[[np.ndarray, bool], Tuple[np.ndarray, Optional[Sequence[np.ndarray]]]]


@dataclass
class VariableProjectionState:
    """One cached nonlinear evaluation and its bounded linear solution."""

    nonlinear: np.ndarray
    design: np.ndarray
    derivatives: Optional[Tuple[np.ndarray, ...]]
    linear: np.ndarray
    linear_active_mask: np.ndarray
    residual: np.ndarray


@dataclass
class VariableProjectionResult:
    """Reduced optimizer result plus the final bounded linear solution."""

    nonlinear: np.ndarray
    linear: np.ndarray
    residual: np.ndarray
    reduced_jacobian: np.ndarray
    design: np.ndarray
    design_derivatives: Tuple[np.ndarray, ...]
    linear_active_mask: np.ndarray
    nonlinear_active_mask: np.ndarray
    success: bool
    status: int
    message: str
    nfev: int
    njev: int
    linear_solve_count: int


class _VariableProjectionProblem:
    def __init__(
        self,
        flux: np.ndarray,
        err: np.ndarray,
        linear_bounds: Tuple[np.ndarray, np.ndarray],
        evaluator: DesignEvaluator,
    ):
        self.flux = np.asarray(flux, dtype=float)
        self.err = np.asarray(err, dtype=float)
        self.weighted_flux = self.flux / self.err
        self.linear_lower = np.asarray(linear_bounds[0], dtype=float)
        self.linear_upper = np.asarray(linear_bounds[1], dtype=float)
        self.evaluator = evaluator
        self.linear_solve_count = 0
        self._state: Optional[VariableProjectionState] = None

    def state(self, nonlinear: np.ndarray, need_derivatives: bool) -> VariableProjectionState:
        nonlinear = np.asarray(nonlinear, dtype=float)
        if (
            self._state is not None
            and np.array_equal(nonlinear, self._state.nonlinear)
        ):
            if not need_derivatives or self._state.derivatives is not None:
                return self._state
            design, derivatives = self.evaluator(nonlinear, True)
            design = np.asarray(design, dtype=float)
            derivatives = tuple(np.asarray(item, dtype=float) for item in derivatives or ())
            if (
                design.shape != self._state.design.shape
                or not np.array_equal(design, self._state.design)
                or len(derivatives) != nonlinear.size
                or any(
                    item.shape != design.shape or not np.all(np.isfinite(item))
                    for item in derivatives
                )
            ):
                raise VariableProjectionError(
                    "Variable-projection derivative evaluation is inconsistent with its cached design."
                )
            self._state.derivatives = derivatives
            return self._state

        design, derivatives = self.evaluator(nonlinear, need_derivatives)
        design = np.asarray(design, dtype=float)
        if design.ndim != 2 or design.shape[0] != self.flux.size:
            raise VariableProjectionError("Variable-projection design matrix has an invalid shape.")
        if not np.all(np.isfinite(design)):
            raise VariableProjectionError("Variable-projection design matrix contains non-finite values.")
        if need_derivatives:
            if derivatives is None or len(derivatives) != nonlinear.size:
                raise VariableProjectionError("Variable-projection derivative count is invalid.")
            derivatives = tuple(np.asarray(item, dtype=float) for item in derivatives)
            if any(item.shape != design.shape or not np.all(np.isfinite(item)) for item in derivatives):
                raise VariableProjectionError("Variable-projection derivatives are invalid.")
        else:
            derivatives = None

        weighted_design = design / self.err[:, None]
        linear_result = lsq_linear(
            weighted_design,
            self.weighted_flux,
            bounds=(self.linear_lower, self.linear_upper),
            method="bvls",
        )
        self.linear_solve_count += 1
        if not linear_result.success or not np.all(np.isfinite(linear_result.x)):
            raise VariableProjectionError(
                f"Bounded linear solve failed: {getattr(linear_result, 'message', 'unknown error')}"
            )
        linear = np.asarray(linear_result.x, dtype=float)
        residual = self.weighted_flux - weighted_design @ linear
        if not np.all(np.isfinite(residual)):
            raise VariableProjectionError("Variable-projection residual contains non-finite values.")
        self._state = VariableProjectionState(
            nonlinear=nonlinear.copy(),
            design=design,
            derivatives=derivatives,
            linear=linear,
            linear_active_mask=np.asarray(linear_result.active_mask, dtype=int),
            residual=residual,
        )
        return self._state

    def residual(self, nonlinear: np.ndarray) -> np.ndarray:
        return self.state(nonlinear, need_derivatives=False).residual

    def jacobian(self, nonlinear: np.ndarray) -> np.ndarray:
        state = self.state(nonlinear, need_derivatives=True)
        weighted_design = state.design / self.err[:, None]
        free = state.linear_active_mask == 0
        free_design = weighted_design[:, free]
        free_information_inverse = (
            np.linalg.pinv(free_design.T @ free_design) if np.any(free) else None
        )
        jacobian = np.empty((self.flux.size, nonlinear.size), dtype=float)
        for index, derivative in enumerate(state.derivatives or ()):
            weighted_derivative = derivative / self.err[:, None]
            direct = weighted_derivative @ state.linear
            if np.any(free):
                rhs = weighted_derivative[:, free].T @ state.residual - free_design.T @ direct
                coefficient_derivative = free_information_inverse @ rhs
                jacobian[:, index] = -direct - free_design @ coefficient_derivative
            else:
                jacobian[:, index] = -direct
        if not np.all(np.isfinite(jacobian)):
            raise VariableProjectionError("Variable-projection Jacobian contains non-finite values.")
        return jacobian


[docs] def solve_variable_projection( flux: np.ndarray, err: np.ndarray, nonlinear_initial: np.ndarray, nonlinear_bounds: Tuple[np.ndarray, np.ndarray], linear_bounds: Tuple[np.ndarray, np.ndarray], evaluator: DesignEvaluator, *, jacobian_method: str = "semi_analytic", max_nfev: Optional[int] = None, ) -> VariableProjectionResult: """Solve a bounded separable nonlinear least-squares problem.""" if jacobian_method not in ("semi_analytic", "2-point"): raise ValueError("jacobian_method must be 'semi_analytic' or '2-point'.") nonlinear_initial = np.asarray(nonlinear_initial, dtype=float) nonlinear_lower = np.asarray(nonlinear_bounds[0], dtype=float) nonlinear_upper = np.asarray(nonlinear_bounds[1], dtype=float) problem = _VariableProjectionProblem(flux, err, linear_bounds, evaluator) if nonlinear_initial.size: nonlinear_result = least_squares( problem.residual, nonlinear_initial, bounds=(nonlinear_lower, nonlinear_upper), jac=problem.jacobian if jacobian_method == "semi_analytic" else "2-point", x_scale="jac", max_nfev=max_nfev, ) if not nonlinear_result.success: raise VariableProjectionError(str(nonlinear_result.message)) final_state = problem.state(nonlinear_result.x, need_derivatives=True) reduced_jacobian = problem.jacobian(nonlinear_result.x) nonlinear_active_mask = np.asarray(nonlinear_result.active_mask, dtype=int) success = bool(nonlinear_result.success) status = int(nonlinear_result.status) message = str(nonlinear_result.message) nfev = int(nonlinear_result.nfev) njev = int(getattr(nonlinear_result, "njev", 0) or 0) else: final_state = problem.state(nonlinear_initial, need_derivatives=True) reduced_jacobian = np.empty((np.asarray(flux).size, 0), dtype=float) nonlinear_active_mask = np.empty(0, dtype=int) success = True status = 1 message = "Bounded linear least-squares solution." nfev = 1 njev = 0 return VariableProjectionResult( nonlinear=final_state.nonlinear.copy(), linear=final_state.linear.copy(), residual=final_state.residual.copy(), reduced_jacobian=reduced_jacobian, design=final_state.design.copy(), design_derivatives=tuple(item.copy() for item in final_state.derivatives or ()), linear_active_mask=final_state.linear_active_mask.copy(), nonlinear_active_mask=nonlinear_active_mask, success=success, status=status, message=message, nfev=nfev, njev=njev, linear_solve_count=problem.linear_solve_count, )
def evaluate_profile_chi2( flux: np.ndarray, err: np.ndarray, nonlinear: np.ndarray, linear_bounds: Tuple[np.ndarray, np.ndarray], evaluator: DesignEvaluator, ) -> float: """Evaluate the bounded linear profile objective at fixed nonlinear values.""" problem = _VariableProjectionProblem(flux, err, linear_bounds, evaluator) residual = problem.residual(np.asarray(nonlinear, dtype=float)) return float(np.sum(residual**2)) def optimizer_result_adapter( *, full_x: np.ndarray, full_jacobian: np.ndarray, full_active_mask: np.ndarray, result: VariableProjectionResult, ): """Return the subset of ``OptimizeResult`` attributes used by qsospec.""" return SimpleNamespace( x=np.asarray(full_x, dtype=float), jac=np.asarray(full_jacobian, dtype=float), active_mask=np.asarray(full_active_mask, dtype=int), success=bool(result.success), status=int(result.status), message=str(result.message), nfev=int(result.nfev), njev=int(result.njev), linear_solve_count=int(result.linear_solve_count), )