diff --git a/flake.lock b/flake.lock index 9c4950885..3bc01c3ec 100644 --- a/flake.lock +++ b/flake.lock @@ -20,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1714076141, - "narHash": "sha256-Drmja/f5MRHZCskS6mvzFqxEaZMeciScCTFxWVLqWEY=", + "lastModified": 1721379653, + "narHash": "sha256-8MUgifkJ7lkZs3u99UDZMB4kbOxvMEXQZ31FO3SopZ0=", "owner": "nixos", "repo": "nixpkgs", - "rev": "7bb2ccd8cdc44c91edba16c48d2c8f331fb3d856", + "rev": "1d9c2c9b3e71b9ee663d11c5d298727dace8d374", "type": "github" }, "original": { diff --git a/glotaran/builtin/elements/baseline/element.py b/glotaran/builtin/elements/baseline/element.py index 3bc7e17ae..da42648da 100644 --- a/glotaran/builtin/elements/baseline/element.py +++ b/glotaran/builtin/elements/baseline/element.py @@ -7,6 +7,7 @@ import numpy as np from glotaran.model.element import Element +from glotaran.model.element import ElementResult if TYPE_CHECKING: import xarray as xr @@ -34,11 +35,15 @@ def calculate_matrix( matrix = np.ones((model_axis.size, 1), dtype=np.float64) return clp_label, matrix - def add_to_result_data( + def create_result( self, model: DataModel, - data: xr.Dataset, - as_global: bool = False, - ): - if not as_global: - data["baseline"] = data.clp.sel(clp_label=self.clp_label()) + global_dimension: str, + model_dimension: str, + amplitudes: xr.Dataset, + concentrations: xr.Dataset, + ) -> ElementResult: + return ElementResult( + amplitudes={"baseline": amplitudes.sel(amplitude_label=self.clp_label())}, + concentrations={}, + ) diff --git a/glotaran/builtin/elements/coherent_artifact/element.py b/glotaran/builtin/elements/coherent_artifact/element.py index b8ca952be..9b5990c78 100644 --- a/glotaran/builtin/elements/coherent_artifact/element.py +++ b/glotaran/builtin/elements/coherent_artifact/element.py @@ -6,17 +6,18 @@ import numba as nb import numpy as np -import xarray as xr from glotaran.builtin.items.activation import ActivationDataModel from glotaran.builtin.items.activation import MultiGaussianActivation -from glotaran.builtin.items.activation import add_activation_to_result_data -from glotaran.model.data_model import DataModel # noqa: TCH001 from glotaran.model.element import Element +from glotaran.model.element import ElementResult from glotaran.model.errors import GlotaranModelError from glotaran.model.item import ParameterType # noqa: TCH001 if TYPE_CHECKING: + import xarray as xr + + from glotaran.model.data_model import DataModel from glotaran.typing.types import ArrayLike @@ -37,117 +38,79 @@ def calculate_matrix( # type:ignore[override] if not 1 <= self.order <= 3: raise GlotaranModelError("Coherent artifact order must be between in [1,3]") - activations = [a for a in model.activation if isinstance(a, MultiGaussianActivation)] - - matrices = [] - activation_indices = [] - for i, activation in enumerate(activations): - if self.label not in activation.compartments: - continue - activation_indices.append(i) - parameters = activation.parameters(global_axis) - - matrix_shape = (model_axis.size, self.order) - index_dependent = any(isinstance(p, list) for p in parameters) - if index_dependent: - matrix_shape = (global_axis.size, *matrix_shape) # type:ignore[assignment] - matrix = np.zeros(matrix_shape, dtype=np.float64) - if index_dependent: - _calculate_coherent_artifact_matrix( - matrix, - np.array([ps[0].center for ps in parameters]), # type:ignore[index] - np.array( - [self.width or ps[0].width for ps in parameters] # type:ignore[index] - ), - global_axis.size, - model_axis, - self.order, - ) - - else: - 
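
Context for reviewers: the `add_to_result_data` hook is replaced throughout this diff by `create_result`, which returns an `ElementResult` instead of mutating the dataset in place. A minimal, self-contained sketch of the contract (labels, axis values, and sizes are made up; only the `ElementResult` shape mirrors the code in this diff):

```python
from dataclasses import dataclass, field

import numpy as np
import xarray as xr


@dataclass
class ElementResult:
    amplitudes: dict[str, xr.DataArray]
    concentrations: dict[str, xr.DataArray]
    extra: dict[str, xr.DataArray] = field(default_factory=dict)


# Estimated amplitudes as handed to create_result, indexed by "amplitude_label".
amplitudes = xr.DataArray(
    np.random.rand(5, 2),
    coords={"spectral": np.arange(5.0), "amplitude_label": ["baseline", "s1"]},
)

# A baseline-style element only needs to select its own label.
result = ElementResult(
    amplitudes={"baseline": amplitudes.sel(amplitude_label="baseline")},
    concentrations={},
)
```
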
_calculate_coherent_artifact_matrix_on_index( - matrix, - parameters[0].center, # type:ignore[union-attr] - self.width or parameters[0].width, # type:ignore[union-attr] - model_axis, - self.order, - ) - matrix *= activation.compartments[self.label] # type:ignore[arg-type] - matrices.append(matrix) - - if not len(matrices): + activations = [ + a + for a in model.activation + if isinstance(a, MultiGaussianActivation) and self.label in a.compartments + ] + + if not len(activations): raise GlotaranModelError( f'No (multi-)gaussian activation for coherent-artifact "{self.label}".' ) + if len(activations) > 1: + raise GlotaranModelError( + f'Coherent artifact "{self.label}" must be associated with exactly one activation.' + ) + activation = activations[0] + + parameters = activation.parameters(global_axis) + + matrix_shape = (model_axis.size, self.order) + index_dependent = any(isinstance(p, list) for p in parameters) + if index_dependent: + matrix_shape = (global_axis.size, *matrix_shape) # type:ignore[assignment] + matrix = np.zeros(matrix_shape, dtype=np.float64) + if index_dependent: + _calculate_coherent_artifact_matrix( + matrix, + np.array([ps[0].center for ps in parameters]), # type:ignore[index] + np.array( + [self.width or ps[0].width for ps in parameters] # type:ignore[index] + ), + global_axis.size, + model_axis, + self.order, + ) + + else: + _calculate_coherent_artifact_matrix_on_index( + matrix, + parameters[0].center, # type:ignore[union-attr] + self.width or parameters[0].width, # type:ignore[union-attr] + model_axis, + self.order, + ) + matrix *= activation.compartments[self.label] # type:ignore[arg-type] - clp_axis = [] - for i in activation_indices: - clp_axis += [f"{label}_activation_{i}" for label in self.compartments()] - return clp_axis, np.concatenate(matrices, axis=len(matrices[0].shape) - 1) + return self.compartments, matrix + @property def compartments(self): return [f"coherent_artifact_{self.label}_order_{i}" for i in range(1, self.order + 1)] - def add_to_result_data( # type:ignore[override] + def create_result( self, - model: ActivationDataModel, - data: xr.Dataset, - as_global: bool = False, - ): - add_activation_to_result_data(model, data) - if "coherent_artifact_order" in data.coords: - return - - data_matrix = data.global_matrix if "global_matrix" in data else data.matrix - elements = [m for m in model.elements if isinstance(m, CoherentArtifactElement)] - nr_activations = data.gaussian_activation.size - matrices = [] - estimations = [] - for coherent_artifact in elements: - artifact_matrices = [] - artifact_estimations = [] - activation_indices = [] - for i in range(nr_activations): - clp_axis = [ - label - for label in data.clp_label.data - if label.startswith(f"coherent_artifact_{coherent_artifact.label}") - and label.endswith(f"_activation_{i}") - ] - if not len(clp_axis): - continue - activation_indices.append(i) - order = [label.split("_activation_")[0].split("_order")[1] for label in clp_axis] - - artifact_matrices.append( - data_matrix.sel(clp_label=clp_axis) - .rename(clp_label="coherent_artifact_order") - .assign_coords({"coherent_artifact_order": order}) - ) - if "global_matrix" not in data: - artifact_estimations.append( - data.clp.sel(clp_label=clp_axis) - .rename(clp_label="coherent_artifact_order") - .assign_coords({"coherent_artifact_order": order}) - ) - matrices.append( - xr.concat(artifact_matrices, dim="gaussian_activation").assign_coords( - {"gaussian_activation": activation_indices} - ) - ) - if "global_matrix" not in data: - 
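
A standalone sketch of the index dependence handled above: when any activation parameter varies along the global axis, the matrix gains a leading global dimension (all sizes here are hypothetical):

```python
import numpy as np

model_size, global_size, order = 100, 5, 3
# An activation whose parameters vary per global index (e.g. a dispersed
# center) makes the matrix index dependent.
index_dependent = True

matrix_shape = (model_size, order)
if index_dependent:
    matrix_shape = (global_size, *matrix_shape)

matrix = np.zeros(matrix_shape, dtype=np.float64)
assert matrix.shape == (5, 100, 3)
```
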
estimations.append( - xr.concat(artifact_estimations, dim="gaussian_activation").assign_coords( - {"gaussian_activation": activation_indices} - ) - ) - data["coherent_artifact_response"] = xr.concat( - matrices, dim="coherent_artifact" - ).assign_coords({"coherent_artifact": [m.label for m in elements]}) - if "global_matrix" not in data: - data["coherent_artifact_associated_estimation"] = xr.concat( - estimations, dim="coherent_artifact" - ).assign_coords({"coherent_artifact": [m.label for m in elements]}) + model: ActivationDataModel, # type:ignore[override] + global_dimension: str, + model_dimension: str, + amplitudes: xr.Dataset, + concentrations: xr.Dataset, + ) -> ElementResult: + amplitude = ( + amplitudes.sel(amplitude_label=self.compartments) + .rename(amplitude_label="coherent_artifact_order") + .assign_coords({"coherent_artifact_order": range(1, self.order + 1)}) + ) + concentration = ( + concentrations.sel(amplitude_label=self.compartments) + .rename(amplitude_label="coherent_artifact_order") + .assign_coords({"coherent_artifact_order": range(1, self.order + 1)}) + ) + return ElementResult( + amplitudes={"coherent_artifact": amplitude}, + concentrations={"coherent_artifact": concentration}, + ) @nb.jit(nopython=True, parallel=False) diff --git a/glotaran/builtin/elements/damped_oscillation/element.py b/glotaran/builtin/elements/damped_oscillation/element.py index 492ba3b1c..cc2cde643 100644 --- a/glotaran/builtin/elements/damped_oscillation/element.py +++ b/glotaran/builtin/elements/damped_oscillation/element.py @@ -5,6 +5,7 @@ from typing import Literal import numpy as np +import xarray as xr from glotaran.builtin.elements.damped_oscillation.matrix import ( calculate_damped_oscillation_matrix_gaussian_activation, @@ -19,12 +20,11 @@ from glotaran.builtin.items.activation import MultiGaussianActivation from glotaran.model.data_model import DataModel # noqa: TCH001 from glotaran.model.element import Element +from glotaran.model.element import ElementResult from glotaran.model.item import Item from glotaran.model.item import ParameterType if TYPE_CHECKING: - import xarray as xr - from glotaran.typing.types import ArrayLike @@ -133,67 +133,70 @@ def calculate_matrix( # type:ignore[override] return clp_axis, matrix - def add_to_result_data( # type:ignore[override] + def create_result( self, - model: ActivationDataModel, - data: xr.Dataset, - as_global: bool = False, - ): - prefix = "damped_oscillation" - if as_global or prefix in data: - # not implemented - return - - elements = [m for m in model.elements if isinstance(m, DampedOscillationElement)] - oscillations = [label for m in elements for label in m.oscillations] - frequencies = [o.frequency for m in elements for o in m.oscillations.values()] - rates = [o.rate for m in elements for o in m.oscillations.values()] - - data.coords[f"{prefix}"] = oscillations - data.coords[f"{prefix}_frequency"] = (prefix, frequencies) - data.coords[f"{prefix}_rate"] = (prefix, rates) - - model_dimension = data.attrs["model_dimension"] - global_dimension = data.attrs["global_dimension"] - dim1 = data.coords[global_dimension].size - dim2 = len(oscillations) - doas = np.zeros((dim1, dim2), dtype=np.float64) - phase = np.zeros((dim1, dim2), dtype=np.float64) - for i, label in enumerate(oscillations): - sin = data.clp.sel(clp_label=f"{label}_sin") - cos = data.clp.sel(clp_label=f"{label}_cos") - doas[:, i] = np.sqrt(sin * sin + cos * cos) - phase[:, i] = np.unwrap(np.arctan2(sin, cos)) - - data[f"{prefix}_associated_estimation"] = ((global_dimension, 
prefix), doas) - - data[f"{prefix}_phase"] = ((global_dimension, prefix), phase) - - if len(data.matrix.shape) > 2: - data[f"{prefix}_sin"] = ( - ( - global_dimension, - model_dimension, - prefix, - ), - data.matrix.sel(clp_label=[f"{label}_sin" for label in oscillations]).to_numpy(), - ) + model: ActivationDataModel, # type:ignore[override] + global_dimension: str, + model_dimension: str, + amplitudes: xr.Dataset, + concentrations: xr.Dataset, + ) -> ElementResult: + oscillations = list(self.oscillations) + frequencies = [self.oscillations[label].frequency for label in oscillations] + rates = [self.oscillations[label].rate for label in oscillations] + + oscillation_coords = { + "damped_oscillation": oscillations, + "damped_oscillation_frequency": ("damped_oscillation", frequencies), + "damped_oscillation_rate": ("damped_oscillation", rates), + } + + sin_label = [f"{label}_sin" for label in oscillations] + cos_label = [f"{label}_cos" for label in oscillations] + + sin_amplitudes = ( + amplitudes.sel(amplitude_label=sin_label) + .rename(amplitude_label="damped_oscillation") + .assign_coords(oscillation_coords) + ) + cos_amplitudes = ( + amplitudes.sel(amplitude_label=cos_label) + .rename(amplitude_label="damped_oscillation") + .assign_coords(oscillation_coords) + ) + doas_amplitudes = np.sqrt(sin_amplitudes**2 + cos_amplitudes**2) + phase_amplitudes = xr.DataArray( + np.unwrap(np.arctan2(sin_amplitudes, cos_amplitudes)), + coords=doas_amplitudes.coords, + ) - data[f"{prefix}_cos"] = ( - ( - global_dimension, - model_dimension, - prefix, - ), - data.matrix.sel(clp_label=[f"{label}_cos" for label in oscillations]).to_numpy(), - ) - else: - data[f"{prefix}_sin"] = ( - (model_dimension, prefix), - data.matrix.sel(clp_label=[f"{label}_sin" for label in oscillations]).to_numpy(), - ) + sin_concentrations = ( + concentrations.sel(amplitude_label=sin_label) + .rename(amplitude_label="damped_oscillation") + .assign_coords(oscillation_coords) + ) + cos_concentrations = ( + concentrations.sel(amplitude_label=cos_label) + .rename(amplitude_label="damped_oscillation") + .assign_coords(oscillation_coords) + ) + doas_concentrations = np.sqrt(sin_concentrations**2 + cos_concentrations**2) + phase_concentrations = xr.DataArray( + np.unwrap(np.arctan2(sin_concentrations, cos_concentrations)), + coords=doas_concentrations.coords, + ) - data[f"{prefix}_cos"] = ( - (model_dimension, prefix), - data.matrix.sel(clp_label=[f"{label}_cos" for label in oscillations]).to_numpy(), - ) + return ElementResult( + amplitudes={ + "damped_oscillation": doas_amplitudes, + "damped_oscillation_phase": phase_amplitudes, + "damped_oscillation_sin": sin_amplitudes, + "damped_oscillation_cos": cos_amplitudes, + }, + concentrations={ + "damped_oscillation": doas_concentrations, + "damped_oscillation_phase": phase_concentrations, + "damped_oscillation_sin": sin_concentrations, + "damped_oscillation_cos": cos_concentrations, + }, + ) diff --git a/glotaran/builtin/elements/kinetic/element.py b/glotaran/builtin/elements/kinetic/element.py index f1df390ed..8152df4be 100644 --- a/glotaran/builtin/elements/kinetic/element.py +++ b/glotaran/builtin/elements/kinetic/element.py @@ -13,12 +13,11 @@ from glotaran.builtin.elements.kinetic.matrix import calculate_matrix_gaussian_activation_on_index from glotaran.builtin.items.activation import ActivationDataModel from glotaran.builtin.items.activation import MultiGaussianActivation -from glotaran.builtin.items.activation import add_activation_to_result_data -from glotaran.model.data_model import 
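
The sin/cos recombination above, reduced to its arithmetic on synthetic data; in the element the same operations run on the `amplitude_label`-selected DataArrays:

```python
import numpy as np
import xarray as xr

phases = np.linspace(0, 4 * np.pi, 50)
sin_amplitudes = xr.DataArray(2.0 * np.sin(phases), coords={"spectral": phases})
cos_amplitudes = xr.DataArray(2.0 * np.cos(phases), coords={"spectral": phases})

# Oscillation strength and (unwrapped) phase recovered from the pair.
doas = np.sqrt(sin_amplitudes**2 + cos_amplitudes**2)
phase = xr.DataArray(
    np.unwrap(np.arctan2(sin_amplitudes, cos_amplitudes)), coords=doas.coords
)

assert np.allclose(doas, 2.0)       # amplitude is recovered everywhere
assert np.allclose(phase, phases)   # unwrap restores the monotone phase
```
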
DataModel -from glotaran.model.data_model import is_data_model_global +from glotaran.model.element import ElementResult from glotaran.model.element import ExtendableElement if TYPE_CHECKING: + from glotaran.model.data_model import DataModel from glotaran.typing.types import ArrayLike @@ -153,72 +152,83 @@ def calculate_matrix_gaussian_activation( return matrix - def add_to_result_data( # type:ignore[override] - self, model: ActivationDataModel, data: xr.Dataset, as_global: bool - ): - add_activation_to_result_data(model, data) - if "species" in data.coords or is_data_model_global(model): - return - kinetic = self.combine([m for m in model.elements if isinstance(m, KineticElement)]) - species = kinetic.species - global_dimension = data.attrs["global_dimension"] - model_dimension = data.attrs["model_dimension"] - - data.coords["species"] = species - matrix = data.global_matrix if as_global else data.matrix - clp_dim = "global_clp_label" if as_global else "clp_label" - concentration_shape = ( - global_dimension if as_global else model_dimension, - "species", + def create_result( + self, + model: ActivationDataModel, # type:ignore[override] + global_dimension: str, + model_dimension: str, + amplitudes: xr.Dataset, + concentrations: xr.Dataset, + ) -> ElementResult: + species_amplitude = amplitudes.sel(amplitude_label=self.species).rename( + amplitude_label="species" ) - if len(matrix.shape) > 2: - concentration_shape = ( # type:ignore[assignment] - (model_dimension if as_global else global_dimension), - *concentration_shape, - ) - data["species_concentration"] = ( - concentration_shape, - matrix.sel({clp_dim: species}).to_numpy(), + species_concentration = concentrations.sel(amplitude_label=self.species).rename( + amplitude_label="species" ) - data["k_matrix"] = xr.DataArray(kinetic.full_array, dims=(("species"), ("species"))) - data["k_matrix_reduced"] = xr.DataArray(kinetic.array, dims=(("species"), ("species"))) + k_matrix = xr.DataArray( + self.full_array, coords={"to_species": self.species, "from_species": self.species} + ) + reduced_k_matrix = xr.DataArray( + self.array, coords={"to_species": self.species, "from_species": self.species} + ) - rates = kinetic.calculate() + rates = self.calculate() lifetimes = 1 / rates - data.coords["kinetic"] = np.arange(1, rates.size + 1) - data.coords["rate"] = ("kinetic", rates) - data.coords["lifetime"] = ("kinetic", lifetimes) - - if hasattr(data, "global_matrix"): - return + kinetic_coords = { + "kinetic": np.arange(1, rates.size + 1), + "rate": ("kinetic", rates), + "lifetime": ("kinetic", lifetimes), + } - species_associated_estimation = data.clp.sel(clp_label=species).data - data["species_associated_estimation"] = ( - (global_dimension, "species"), - species_associated_estimation, - ) initial_concentrations = [] a_matrices = [] - kinetic_associated_estimations = [] + kinetic_amplitudes = [] for activation in model.activation: initial_concentration = np.array( - [float(activation.compartments.get(label, 0)) for label in species] + [float(activation.compartments.get(label, 0)) for label in self.species] ) initial_concentrations.append(initial_concentration) - a_matrix = kinetic.a_matrix(initial_concentration) + a_matrix = self.a_matrix(initial_concentration) a_matrices.append(a_matrix) - kinetic_associated_estimations.append(species_associated_estimation @ a_matrix.T) + kinetic_amplitudes.append(species_amplitude.to_numpy() @ a_matrix.T) - data["initial_concentration"] = ( - ("activation", "species"), + initial_concentration = xr.DataArray( 
initial_concentrations, + coords={"activation": range(len(initial_concentrations)), "species": self.species}, ) - data["a_matrix"] = ( - ("activation", "species", "kinetic"), + a_matrix = xr.DataArray( a_matrices, + coords={ + "activation": range(len(a_matrices)), + "species": self.species, + } + | kinetic_coords, + dims=("activation", "species", "kinetic"), + ) + kinetic_amplitude_coords = ( + {"activation": range(len(kinetic_amplitudes))} + | kinetic_coords + | dict(species_amplitude.coords) ) - data["kinetic_associated_estimation"] = ( - ("activation", global_dimension, "kinetic"), - kinetic_associated_estimations, + del kinetic_amplitude_coords["species"] + kinetic_amplitude = xr.DataArray( + kinetic_amplitudes, + coords=kinetic_amplitude_coords, + dims=("activation", global_dimension, "kinetic"), + ) + + return ElementResult( + amplitudes={ + "species": species_amplitude, + "kinetic": kinetic_amplitude, + }, + concentrations={"species": species_concentration}, + extra={ + "k_matrix": k_matrix, + "reduced_k_matrix": reduced_k_matrix, + "initial_concentration": initial_concentration, + "a_matrix": a_matrix, + }, ) diff --git a/glotaran/builtin/elements/spectral/element.py b/glotaran/builtin/elements/spectral/element.py index 39a5f1ec8..0dc7ad89a 100644 --- a/glotaran/builtin/elements/spectral/element.py +++ b/glotaran/builtin/elements/spectral/element.py @@ -9,6 +9,7 @@ from glotaran.builtin.elements.spectral.shape import SpectralShape # noqa: TCH001 from glotaran.model.data_model import DataModel from glotaran.model.element import Element +from glotaran.model.element import ElementResult if TYPE_CHECKING: import xarray as xr @@ -50,7 +51,7 @@ def calculate_matrix( # type:ignore[override] return compartments, matrix - def add_to_result_data( # type:ignore[override] + def add_to_result_data( self, model: SpectralDataModel, data: xr.Dataset, @@ -78,3 +79,23 @@ def add_to_result_data( # type:ignore[override] (data.attrs["global_dimension"], "spectrum"), data.clp.sel(clp_label=shapes).data, ) + + def create_result( + self, + model: SpectralDataModel, # type:ignore[override] + global_dimension: str, + model_dimension: str, + amplitudes: xr.Dataset, + concentrations: xr.Dataset, + ) -> ElementResult: + shapes = list(self.shapes.keys()) + + spectra_amplitude = amplitudes.sel(amplitude_label=shapes).rename(amplitude_label="shape") + spectra_concentration = concentrations.sel(amplitude_label=shapes).rename( + amplitude_label="shape" + ) + + return ElementResult( + amplitudes={"spectrum": spectra_amplitude}, + concentrations={"spectrum": spectra_concentration}, + ) diff --git a/glotaran/builtin/items/activation/__init__.py b/glotaran/builtin/items/activation/__init__.py index d0bbe1894..36cb21618 100644 --- a/glotaran/builtin/items/activation/__init__.py +++ b/glotaran/builtin/items/activation/__init__.py @@ -2,7 +2,6 @@ from glotaran.builtin.items.activation.activation import Activation from glotaran.builtin.items.activation.data_model import ActivationDataModel -from glotaran.builtin.items.activation.data_model import add_activation_to_result_data from glotaran.builtin.items.activation.gaussian import GaussianActivation from glotaran.builtin.items.activation.gaussian import GaussianActivationParameters from glotaran.builtin.items.activation.gaussian import MultiGaussianActivation diff --git a/glotaran/builtin/items/activation/data_model.py b/glotaran/builtin/items/activation/data_model.py index cb77b64dc..66295ce97 100644 --- a/glotaran/builtin/items/activation/data_model.py +++ 
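
The species-to-kinetic projection in toy form. The square A-matrix below is hypothetical; in the element it comes from `self.a_matrix(initial_concentration)`:

```python
import numpy as np

rates = np.array([0.5, 0.1])
lifetimes = 1 / rates  # stored as the "lifetime" coordinate next to "rate"

species_amplitude = np.random.rand(4, 2)  # (global axis, species)
a_matrix = np.array(
    [
        [1.0, 0.0],   # (species, kinetic), hypothetical two-species scheme
        [-0.4, 0.4],
    ]
)

# Decay-associated amplitudes: species amplitudes propagated through A.T.
kinetic_amplitude = species_amplitude @ a_matrix.T
assert kinetic_amplitude.shape == (4, 2)  # (global axis, kinetic)
```
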
b/glotaran/builtin/items/activation/data_model.py @@ -1,11 +1,13 @@ from __future__ import annotations from typing import TYPE_CHECKING +from typing import cast import numpy as np -import xarray as xr # noqa: TCH002 +import xarray as xr from glotaran.builtin.items.activation.activation import Activation # noqa: TCH001 +from glotaran.builtin.items.activation.gaussian import GaussianActivationParameters from glotaran.builtin.items.activation.gaussian import MultiGaussianActivation from glotaran.builtin.items.activation.instant import InstantActivation # noqa: F401 from glotaran.model.data_model import DataModel @@ -44,60 +46,88 @@ class ActivationDataModel(DataModel): description="The activation(s) of the dataset.", ) + @staticmethod + def create_result( + model: ActivationDataModel, # type:ignore[override] + global_dimension: str, + model_dimension: str, + amplitudes: xr.DataArray, + concentrations: xr.DataArray, + ) -> dict[str, xr.DataArray]: + gaussian_activations = [ + a for a in model.activation if isinstance(a, MultiGaussianActivation) + ] + if not len(gaussian_activations): + return {} -def add_activation_to_result_data(model: ActivationDataModel, data: xr.Dataset): - gaussian_activations = [a for a in model.activation if isinstance(a, MultiGaussianActivation)] - if "gaussian_activation" in data or not len(gaussian_activations): - return - data.coords["gaussian_activation"] = np.arange(1, len(gaussian_activations) + 1) - global_dimension = data.attrs["global_dimension"] - global_axis = data.coords[global_dimension] - model_dimension = data.attrs["model_dimension"] - model_axis = data.coords[model_dimension] - - activations = [] - activation_parameters = [] - activation_shifts = [] - has_shifts = any(a.shift is not None for a in gaussian_activations) - activation_dispersions = [] - has_dispersions = any(a.dispersion_center is not None for a in gaussian_activations) - for activation in gaussian_activations: - activations.append(activation.calculate_function(model_axis)) - activation_parameters.append(activation.parameters()) - if has_shifts: - activation_shifts.append( - activation.shift if activation.shift is not None else [0] * global_axis.size + global_axis = amplitudes.coords[global_dimension] + model_axis = concentrations.coords[model_dimension] + + activations = [] + activation_parameters: list[list[GaussianActivationParameters]] = [] + activation_shifts = [] + activation_dispersions = [] + + has_shifts = any(a.shift is not None for a in gaussian_activations) + has_dispersions = any(a.dispersion_center is not None for a in gaussian_activations) + + for activation in gaussian_activations: + activations.append(activation.calculate_function(model_axis)) + activation_parameters.append( + cast(list[GaussianActivationParameters], activation.parameters()) ) - if has_dispersions: - activation_dispersions.append( - activation.calculate_dispersion(global_axis) - if activation.dispersion_center is not None - else activation.center * global_axis.size + if has_shifts: + activation_shifts.append( + activation.shift if activation.shift is not None else [0] * global_axis.size + ) + if has_dispersions: + activation_dispersions.append( + activation.calculate_dispersion(global_axis) + if activation.dispersion_center is not None + else activation.center * global_axis.size + ) + + result = {} + + activation_coords = {"gaussian_activation": np.arange(1, len(gaussian_activations) + 1)} + result["gaussian_activation_function"] = xr.DataArray( + activations, + coords=activation_coords | {model_dimension: 
model_axis}, + dims=("gaussian_activation", model_dimension), + ) + + if has_shifts: + result["activation_shift"] = xr.DataArray( + activation_shifts, + coords=activation_coords | {global_dimension: global_axis}, + dims=("gaussian_activation", global_dimension), ) - data["gaussian_activation_function"] = ( - ("gaussian_activation", model_dimension), - activations, - ) - data["gaussian_activation_center"] = ( - ("gaussian_activation", "gaussian_activation_part"), - [[p.center for p in ps] for ps in activation_parameters], # type:ignore[union-attr] - ) - data["gaussian_activation_width"] = ( - ("gaussian_activation", "gaussian_activation_part"), - [[p.width for p in ps] for ps in activation_parameters], # type:ignore[union-attr] - ) - data["gaussian_activation_scale"] = ( - ("gaussian_activation", "gaussian_activation_part"), - [[p.scale for p in ps] for ps in activation_parameters], # type:ignore[union-attr] - ) - if has_shifts: - data["gaussian_activation_shift"] = ( - ("gaussian_activation", global_dimension), - activation_shifts, + activation_coords = activation_coords | { + "gaussian_activation_part": np.arange(max([len(ps) for ps in activation_parameters])) + } + + result["activation_center"] = xr.DataArray( + [[p.center for p in ps] for ps in activation_parameters], + coords=activation_coords, + dims=("gaussian_activation", "gaussian_activation_part"), ) - if has_dispersions: - data["gaussian_activation_dispersion"] = ( - ("gaussian_activation", "gaussian_activation_part", global_dimension), - activation_dispersions, + result["activation_width"] = xr.DataArray( + [[p.width for p in ps] for ps in activation_parameters], + coords=activation_coords, + dims=("gaussian_activation", "gaussian_activation_part"), ) + result["activation_scale"] = xr.DataArray( + [[p.scale for p in ps] for ps in activation_parameters], + coords=activation_coords, + dims=("gaussian_activation", "gaussian_activation_part"), + ) + + if has_dispersions: + result["activation_dispersion"] = xr.DataArray( + activation_dispersions, + coords=activation_coords | {global_dimension: global_axis}, + dims=("gaussian_activation", "gaussian_activation_part", global_dimension), + ) + + return result diff --git a/glotaran/model/clp_constraint.py b/glotaran/model/clp_constraint.py index cf4c8445b..83544a50c 100644 --- a/glotaran/model/clp_constraint.py +++ b/glotaran/model/clp_constraint.py @@ -2,8 +2,11 @@ from __future__ import annotations +from typing import Annotated from typing import Literal +from pydantic import AfterValidator # noqa: TCH002 + from glotaran.model.interval_item import IntervalItem from glotaran.model.item import TypedItem @@ -15,7 +18,7 @@ class ClpConstraint(TypedItem, IntervalItem): the respective classes for details. 
""" - target: str + target: Annotated[str | list[str], AfterValidator(lambda v: [v] if isinstance(v, str) else v)] class ZeroConstraint(ClpConstraint): diff --git a/glotaran/model/data_model.py b/glotaran/model/data_model.py index 76e0df5d7..9acf0709a 100644 --- a/glotaran/model/data_model.py +++ b/glotaran/model/data_model.py @@ -214,6 +214,16 @@ def from_dict(cls, library: ModelLibrary, model_dict: dict[str, Any]) -> DataMod elements = {type(library[label]) for label in element_labels} return cls.create_class_for_elements(elements)(**model_dict) + @staticmethod + def create_result( + model: DataModel, + global_dimension: str, + model_dimension: str, + amplitudes: xr.DataArray, + concentrations: xr.DataArray, + ) -> dict[str, xr.DataArray]: + return {} + def is_data_model_global(data_model: DataModel) -> bool: """Check if a data model can model the global dimension. diff --git a/glotaran/model/element.py b/glotaran/model/element.py index 3b9e14b95..12eb9efdc 100644 --- a/glotaran/model/element.py +++ b/glotaran/model/element.py @@ -3,12 +3,16 @@ from __future__ import annotations import abc +from dataclasses import dataclass +from dataclasses import field from typing import TYPE_CHECKING from typing import Any from typing import ClassVar from pydantic import ConfigDict +from pydantic import Field +from glotaran.model.clp_constraint import ClpConstraint # noqa: TCH001 from glotaran.model.item import TypedItem from glotaran.plugin_system.element_registration import register_element @@ -31,6 +35,13 @@ def _sanitize_json_schema(json_schema: dict[str, Any]) -> None: json_schema["required"].remove("label") +@dataclass +class ElementResult: + amplitudes: dict[str, xr.DataArray] + concentrations: dict[str, xr.DataArray] + extra: dict[str, xr.DataArray] = field(default_factory=dict) + + class Element(TypedItem, abc.ABC): """Subclasses must overwrite :method:`glotaran.model.Element.calculate_matrix`.""" @@ -41,6 +52,9 @@ class Element(TypedItem, abc.ABC): dimension: str | None = None label: str + clp_constraints: list[ClpConstraint.get_annotated_type()] = ( # type:ignore[valid-type] + Field(default_factory=list) + ) model_config = ConfigDict(json_schema_extra=_sanitize_json_schema) @@ -79,7 +93,14 @@ def calculate_matrix( .. # noqa: DAR202 """ - def add_to_result_data(self, model: DataModel, data: xr.Dataset, as_global: bool): + def create_result( + self, + model: DataModel, + global_dimension: str, + model_dimension: str, + amplitudes: xr.Dataset, + concentrations: xr.Dataset, + ) -> ElementResult: """ Parameters @@ -93,6 +114,7 @@ def add_to_result_data(self, model: DataModel, data: xr.Dataset, as_global: bool as_global: bool Whether model is calculated as global model. 
""" + return ElementResult({}, {}) class ExtendableElement(Element): diff --git a/glotaran/model/experiment_model.py b/glotaran/model/experiment_model.py index df852ead1..4361beff7 100644 --- a/glotaran/model/experiment_model.py +++ b/glotaran/model/experiment_model.py @@ -10,7 +10,6 @@ from pydantic import ConfigDict from pydantic import Field -from glotaran.model.clp_constraint import ClpConstraint # noqa: TCH001 from glotaran.model.clp_penalties import EqualAreaPenalty # noqa: TCH001 from glotaran.model.clp_relation import ClpRelation # noqa: TCH001 from glotaran.model.data_model import DataModel @@ -33,7 +32,6 @@ class ExperimentModel(BaseModel): clp_link_tolerance: float = 0.0 clp_link_method: Literal["nearest", "backward", "forward"] = "nearest" - clp_constraints: list[ClpConstraint.get_annotated_type()] = Field(default_factory=list) # type:ignore[valid-type] clp_penalties: list[EqualAreaPenalty] = Field(default_factory=list) clp_relations: list[ClpRelation] = Field(default_factory=list) datasets: dict[str, DataModel] diff --git a/glotaran/optimization/data.py b/glotaran/optimization/data.py index 85780644a..b8c30276b 100644 --- a/glotaran/optimization/data.py +++ b/glotaran/optimization/data.py @@ -235,6 +235,21 @@ def get_from_dataset(self, dataset: xr.Dataset, name: str) -> ArrayLike | None: data = data.T return data + def unweight_result_dataset(self, result_dataset: xr.Dataset): + if self.weight is None: + return + + if "weight" not in result_dataset: + result_dataset["weight"] = xr.DataArray(self.weight, coords=result_dataset.data.coords) + result_dataset["weighted_residual"] = result_dataset["residual"] + result_dataset["residual"] = result_dataset["residual"] / self.weight + result_dataset.attrs["weighted_root_mean_square_error"] = result_dataset.attrs[ + "root_mean_square_error" + ] + result_dataset.attrs["root_mean_square_error"] = np.sqrt( + (result_dataset.residual**2).sum() / sum(result_dataset.residual.shape) + ).to_numpy() + class LinkedOptimizationData(OptimizationDataProvider): def __init__( diff --git a/glotaran/optimization/matrix.py b/glotaran/optimization/matrix.py index 94c8d26de..a33efe8cf 100644 --- a/glotaran/optimization/matrix.py +++ b/glotaran/optimization/matrix.py @@ -2,11 +2,13 @@ from copy import deepcopy from dataclasses import dataclass +from dataclasses import field from dataclasses import replace from itertools import chain from typing import TYPE_CHECKING import numpy as np +import xarray as xr from glotaran.model.data_model import DataModel from glotaran.model.data_model import iterate_data_model_elements @@ -31,6 +33,7 @@ class OptimizationMatrix: clp_axis: list[str] """The clp labels.""" array: ArrayLike + constraints: list[ClpConstraint] = field(default_factory=list) @property def is_index_dependent(self) -> bool: @@ -65,7 +68,7 @@ def from_element( if scale is not None: array *= scale - return cls(clp_axis, array) + return cls(clp_axis, array, element.clp_constraints) @classmethod def combine(cls, matrices: list[OptimizationMatrix]) -> OptimizationMatrix: @@ -87,7 +90,7 @@ def combine(cls, matrices: list[OptimizationMatrix]) -> OptimizationMatrix: for matrix in matrices: clp_mask = [clp_axis.index(c) for c in matrix.clp_axis] array[..., clp_mask] += matrix.array - return cls(clp_axis, array) + return cls(clp_axis, array, [c for m in matrices for c in m.constraints]) @classmethod def link(cls, matrices: list[OptimizationMatrix]) -> OptimizationMatrix: @@ -104,7 +107,7 @@ def link(cls, matrices: list[OptimizationMatrix]) -> OptimizationMatrix: 
current_element_index_end = current_element_index + matrix.model_axis_size array[current_element_index:current_element_index_end, clp_mask] = matrix.array current_element_index = current_element_index_end - return cls(clp_axis, array) + return cls(clp_axis, array, [c for m in matrices for c in m.constraints]) @classmethod def from_data_model( @@ -209,14 +212,13 @@ def from_linked_data( def reduce( self, index: float, - constraints: list[ClpConstraint], relations: list[ClpRelation], copy: bool = False, ) -> OptimizationMatrix: result = deepcopy(self) if copy else self if result.is_index_dependent: raise GlotaranUserError("Cannot reduce index dependent matrix.") - constraints = [c for c in constraints if c.applies(index)] + constraints = [c for c in self.constraints if c.applies(index)] relations = [r for r in relations if r.applies(index)] if len(constraints) + len(relations) == 0: return result @@ -239,7 +241,9 @@ def reduce( result.array = result.array @ relation_matrix if len(constraints) > 0: - removed_clp_labels = [c.target for c in constraints if c.target in result.clp_axis] + removed_clp_labels = { + label for c in constraints for label in c.target if label in result.clp_axis + } if len(removed_clp_labels) > 0: mask = [label not in removed_clp_labels for label in result.clp_axis] result.clp_axis = [ @@ -318,3 +322,12 @@ def as_global_list(self, global_axis: ArrayLike) -> list[OptimizationMatrix]: A list of matrices. """ return [self.at_index(i) for i in range(global_axis.size)] + + def to_data_array( + self, global_dim: str, global_axis: ArrayLike, model_dim: str, model_axis: ArrayLike + ) -> xr.DataArray: + coords = {model_dim: model_axis, "amplitude_label": self.clp_axis} + if self.is_index_dependent: + coords = {global_dim: global_axis} | coords + + return xr.DataArray(self.array, dims=coords.keys(), coords=coords) diff --git a/glotaran/optimization/objective.py b/glotaran/optimization/objective.py index 12ddce78b..d7ef33bf1 100644 --- a/glotaran/optimization/objective.py +++ b/glotaran/optimization/objective.py @@ -2,13 +2,16 @@ from dataclasses import dataclass from typing import TYPE_CHECKING +from typing import Any +from typing import cast import numpy as np import xarray as xr -from glotaran.io.prepare_dataset import add_svd_to_dataset +from glotaran.model.data_model import DataModel from glotaran.model.data_model import iterate_data_model_elements -from glotaran.model.data_model import iterate_data_model_global_elements +from glotaran.model.element import Element +from glotaran.model.element import ElementResult from glotaran.optimization.data import LinkedOptimizationData from glotaran.optimization.data import OptimizationData from glotaran.optimization.estimation import OptimizationEstimation @@ -21,12 +24,34 @@ from glotaran.typing.types import ArrayLike +def add_svd_to_result_dataset(dataset: xr.Dataset, global_dim: str, model_dim: str): + for name in ["data", "residual"]: + if f"{name}_singular_values" in dataset: + continue + lsv, sv, rsv = np.linalg.svd(dataset[name].data, full_matrices=False) + dataset[f"{name}_left_singular_vectors"] = ( + (model_dim, "left_singular_value_index"), + lsv, + ) + dataset[f"{name}_singular_values"] = (("singular_value_index"), sv) + dataset[f"{name}_right_singular_vectors"] = ( + (global_dim, "right_singular_value_index"), + rsv.T, + ) + + +@dataclass +class DatasetResult: + data: xr.Dataset + elements: dict[str, ElementResult] + extra: dict[str, xr.Dataset] + + @dataclass class OptimizationObjectiveResult: - data: dict[str, xr.Dataset] 
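
What `add_svd_to_result_dataset` stores, shown in plain numpy on synthetic data: the left singular vectors live on the model axis, and the right singular vectors are transposed because `np.linalg.svd` returns V^T:

```python
import numpy as np

data = np.random.rand(64, 10)  # (model axis, global axis)
lsv, sv, rsv = np.linalg.svd(data, full_matrices=False)

assert np.allclose((lsv * sv) @ rsv, data)  # data == U @ diag(S) @ V^T
right_singular_vectors = rsv.T              # indexed by the global dimension
assert right_singular_vectors.shape == (10, 10)
```
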
- free_clp_size: int + data: dict[str, DatasetResult] additional_penalty: float - dataset_penalty: dict[str, float] + clp_size: int class OptimizationObjective: @@ -47,9 +72,7 @@ def calculate_reduced_matrices( self, matrices: list[OptimizationMatrix] ) -> list[OptimizationMatrix]: return [ - matrices[i].reduce( - index, self._model.clp_constraints, self._model.clp_relations, copy=True - ) + matrices[i].reduce(index, self._model.clp_relations, copy=True) for i, index in enumerate(self._data.global_axis) ] @@ -100,12 +123,13 @@ def calculate(self) -> ArrayLike: ) return np.concatenate(penalties) - def get_result(self) -> OptimizationObjectiveResult: - return ( - self.create_unlinked_result() - if isinstance(self._data, OptimizationData) - else self.create_linked_result() - ) + def get_global_indices(self, label: str) -> list[int]: + assert isinstance(self._data, LinkedOptimizationData) + return [ + i + for i, group_label in enumerate(self._data.group_labels) + if label in self._data.group_definitions[group_label] + ] def create_result_dataset(self, label: str, data: OptimizationData) -> xr.Dataset: assert isinstance(data.model.data, xr.Dataset) @@ -121,98 +145,28 @@ def create_result_dataset(self, label: str, data: OptimizationData) -> xr.Datase dataset.attrs["scale"] = scale.value if isinstance(scale, Parameter) else scale return dataset - def add_matrix_to_dataset(self, dataset: xr.Dataset, matrix: OptimizationMatrix): - dataset.coords["clp_label"] = matrix.clp_axis - matrix_dims = (dataset.attrs["model_dimension"], "clp_label") - if matrix.is_index_dependent: - matrix_dims = ( # type:ignore[assignment] - dataset.attrs["global_dimension"], - *matrix_dims, - ) - dataset["matrix"] = xr.DataArray(matrix.array, dims=matrix_dims) - - def add_linked_clp_and_residual_to_dataset( - self, - dataset: xr.Dataset, - label: str, - clp_axes: list[list[str]], - estimations: list[OptimizationEstimation], - ): - assert isinstance(self._data, LinkedOptimizationData) - global_indices = [ - i - for i, group_label in enumerate(self._data.group_labels) - if label in self._data.group_definitions[group_label] - ] - - clp_dims = (dataset.attrs["global_dimension"], "clp_label") - dataset["clp"] = xr.DataArray( - [ - [ - estimations[i].clp[clp_axes[i].index(clp_label)] - for clp_label in dataset.coords["clp_label"] - ] - for i in global_indices - ], - dims=clp_dims, - ) - - offsets = [] - for i in global_indices: - group_label = self._data._group_labels[i] - group_index = self._data.group_definitions[group_label].index(label) - offsets.append(sum(self._data.group_sizes[group_label][:group_index])) - size = dataset.coords[dataset.attrs["model_dimension"]].size - residual_dims = ( - dataset.attrs["global_dimension"], - dataset.attrs["model_dimension"], - ) - dataset["residual"] = xr.DataArray( - [ - estimations[i].residual[offset : offset + size] - for i, offset in zip(global_indices, offsets) - ], - dims=residual_dims, - ).T + def create_global_result(self) -> OptimizationObjectiveResult: + label = next(iter(self._model.datasets.keys())) + result_dataset = self.create_result_dataset(label, self._data) - def add_unlinked_clp_and_residual_to_dataset( - self, - dataset: xr.Dataset, - estimations: list[OptimizationEstimation], - ): - clp_dims = (dataset.attrs["global_dimension"], "clp_label") - dataset["clp"] = xr.DataArray([e.clp for e in estimations], dims=clp_dims) + global_dim = result_dataset.attrs["global_dimension"] + global_axis = result_dataset.coords[global_dim] + model_dim = 
result_dataset.attrs["model_dimension"] + model_axis = result_dataset.coords[model_dim] - residual_dims = ( - dataset.attrs["global_dimension"], - dataset.attrs["model_dimension"], + matrix = OptimizationMatrix.from_data(self._data).to_data_array( + global_dim, global_axis, model_dim, model_axis ) - dataset["residual"] = xr.DataArray([e.residual for e in estimations], dims=residual_dims).T - - def add_global_clp_and_residual_to_dataset( - self, - dataset: xr.Dataset, - data: OptimizationData, - matrix: OptimizationMatrix, - ): - global_matrix = OptimizationMatrix.from_data(data, global_matrix=True) - global_matrix_coords = ( - (data.global_dimension, data.global_axis), - ("global_clp_label", matrix.clp_axis), + global_matrix = OptimizationMatrix.from_data(self._data, global_matrix=True).to_data_array( + model_dim, model_axis, global_dim, global_axis ) - if global_matrix.is_index_dependent: - global_matrix_coords = ( # type:ignore[assignment] - (data.model_dimension, data.model_axis), - *global_matrix_coords, - ) - dataset["global_matrix"] = xr.DataArray(global_matrix.array, coords=global_matrix_coords) - _, _, full_matrix = OptimizationMatrix.from_global_data(data) + _, _, full_matrix = OptimizationMatrix.from_global_data(self._data) estimation = OptimizationEstimation.calculate( full_matrix.array, - data.flat_data, # type:ignore[arg-type] - data.model.residual_function, + self._data.flat_data, # type:ignore[arg-type] + self._data.model.residual_function, ) - dataset["clp"] = xr.DataArray( + clp = xr.DataArray( estimation.clp.reshape((len(global_matrix.clp_axis), len(matrix.clp_axis))), coords={ "global_clp_label": global_matrix.clp_axis, @@ -220,125 +174,284 @@ def add_global_clp_and_residual_to_dataset( }, dims=["global_clp_label", "clp_label"], ) - dataset["residual"] = xr.DataArray( - estimation.residual.reshape( - data.global_axis.size, - data.model_axis.size, - ), - coords=( - (data.global_dimension, data.global_axis), - (data.model_dimension, data.model_axis), - ), + result_dataset["residual"] = xr.DataArray( + estimation.residual.reshape(global_axis.size, model_axis.size), + coords=((global_dim, global_axis), (model_dim, model_axis)), ).T + result_dataset.attrs["root_mean_square_error"] = np.sqrt( + (result_dataset.residual.to_numpy() ** 2).sum() / sum(result_dataset.residual.shape) + ) + clp_size = len(matrix.clp_axis) + len(global_matrix.clp_axis) + self._data.unweight_result_dataset(result_dataset) + result_dataset["fit"] = result_dataset.data - result_dataset.residual + add_svd_to_result_dataset(result_dataset, global_dim, model_dim) + result = DatasetResult( + result_dataset, + { + label: ElementResult( + amplitudes={"clp": clp}, + concentrations={"global": global_matrix, "model": matrix}, + ) + }, + {}, + ) + return OptimizationObjectiveResult( + data={label: result}, clp_size=clp_size, additional_penalty=0 + ) - def finalize_result_dataset(self, dataset: xr.Dataset, data: OptimizationData, add_svd=True): - # Calculate RMS - size = dataset.residual.shape[0] * dataset.residual.shape[1] - dataset.attrs["root_mean_square_error"] = np.sqrt((dataset.residual**2).sum() / size).data - dataset["fitted_data"] = dataset.data - dataset.residual - - if data.weight is not None: - weight = data.weight - if data.is_global: - dataset["global_weighted_matrix"] = dataset["global_matrix"] - dataset["global_matrix"] = dataset["global_matrix"] / weight[..., np.newaxis] - if "weight" not in dataset: - dataset["weight"] = xr.DataArray(data.weight, coords=dataset.data.coords) - 
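
The reshape in `create_global_result` relies on the Kronecker layout of the fully global problem. A toy reconstruction with synthetic matrices, using `np.linalg.lstsq` in place of the project's residual functions:

```python
import numpy as np

model_matrix = np.random.rand(20, 3)   # (model axis, model clps)
global_matrix = np.random.rand(15, 2)  # (global axis, global clps)
full_matrix = np.kron(global_matrix, model_matrix)  # (15 * 20, 2 * 3)

true_clp = np.arange(6, dtype=float)
data = full_matrix @ true_clp
estimated_clp, *_ = np.linalg.lstsq(full_matrix, data, rcond=None)

# One row per global clp, one column per model clp: the same reshape as above.
clp = estimated_clp.reshape(2, 3)
assert np.allclose(clp.ravel(), true_clp)
```
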
dataset["weighted_residual"] = dataset["residual"] - dataset["residual"] = dataset["residual"] / weight - dataset["weighted_matrix"] = dataset["matrix"] - dataset["matrix"] = dataset["matrix"] / weight.T[..., np.newaxis] - dataset.attrs["weighted_root_mean_square_error"] = dataset.attrs[ - "root_mean_square_error" - ] - dataset.attrs["root_mean_square_error"] = np.sqrt( - (dataset.residual**2).sum() / size - ).data - - if add_svd: - for name in ["data", "residual"]: - add_svd_to_dataset(dataset, name, data.model_dimension, data.global_dimension) - for _, model in iterate_data_model_elements(data.model): - model.add_to_result_data( # type:ignore[union-attr] - data.model, dataset, False - ) - for _, model in iterate_data_model_global_elements(data.model): - model.add_to_result_data( # type:ignore[union-attr] - data.model, dataset, True + def create_single_dataset_result(self) -> OptimizationObjectiveResult: + assert isinstance(self._data, OptimizationData) + if self._data.is_global: + return self.create_global_result() + + label = next(iter(self._model.datasets.keys())) + result_dataset = self.create_result_dataset(label, self._data) + + global_dim = result_dataset.attrs["global_dimension"] + global_axis = result_dataset.coords[global_dim] + model_dim = result_dataset.attrs["model_dimension"] + model_axis = result_dataset.coords[model_dim] + + concentrations = OptimizationMatrix.from_data(self._data) + additional_penalty = 0 + + clp_concentration = self.calculate_reduced_matrices( + concentrations.as_global_list(self._data.global_axis) + ) + clp_size = sum(len(c.clp_axis) for c in clp_concentration) + estimations = self.resolve_estimations( + concentrations.as_global_list(self._data.global_axis), + clp_concentration, + self.calculate_estimations(clp_concentration), + ) + amplitude_coords = { + global_dim: global_axis, + "amplitude_label": concentrations.clp_axis, + } + amplitude = xr.DataArray( + [e.clp for e in estimations], dims=amplitude_coords.keys(), coords=amplitude_coords + ) + concentration = concentrations.to_data_array( + global_dim, global_axis, model_dim, model_axis + ) + + residual_dims = (global_dim, model_dim) + result_dataset["residual"] = xr.DataArray( + [e.residual for e in estimations], dims=residual_dims + ).T + result_dataset.attrs["root_mean_square_error"] = np.sqrt( + (result_dataset.residual.to_numpy() ** 2).sum() / sum(result_dataset.residual.shape) + ) + additional_penalty = sum( + calculate_clp_penalties( + [concentrations], + estimations, + global_axis, + self._model.clp_penalties, ) + ) + element_results = self.create_element_results( + self._model.datasets[label], global_dim, model_dim, amplitude, concentration + ) + extra = self.create_data_model_results( + label, global_dim, model_dim, amplitude, concentration + ) + + self._data.unweight_result_dataset(result_dataset) + result_dataset["fit"] = result_dataset.data - result_dataset.residual + add_svd_to_result_dataset(result_dataset, global_dim, model_dim) + result = DatasetResult(result_dataset, element_results, extra) + return OptimizationObjectiveResult( + data={label: result}, clp_size=clp_size, additional_penalty=additional_penalty + ) - def create_linked_result(self) -> OptimizationObjectiveResult: + def create_multi_dataset_result(self) -> OptimizationObjectiveResult: assert isinstance(self._data, LinkedOptimizationData) - matrices = { + dataset_concentrations = { label: OptimizationMatrix.from_data(data) for label, data in self._data.data.items() } - linked_matrices = 
OptimizationMatrix.from_linked_data(self._data, matrices) - clp_axes = [matrix.clp_axis for matrix in linked_matrices] - reduced_matrices = self.calculate_reduced_matrices(linked_matrices) - free_clp_size = sum(len(matrix.clp_axis) for matrix in reduced_matrices) - estimations = self.resolve_estimations( - linked_matrices, - reduced_matrices, - self.calculate_estimations(reduced_matrices), + full_concentration = OptimizationMatrix.from_linked_data( + self._data, dataset_concentrations ) + estimated_amplitude_axes = [concentration.clp_axis for concentration in full_concentration] + clp_concentration = self.calculate_reduced_matrices(full_concentration) + clp_size = sum(len(concentration.clp_axis) for concentration in clp_concentration) - results = {} - for label, matrix in matrices.items(): - data = self._data.data[label] - results[label] = self.create_result_dataset(label, data) - self.add_matrix_to_dataset(results[label], matrix) - self.add_linked_clp_and_residual_to_dataset( - results[label], label, clp_axes, estimations - ) - self.finalize_result_dataset(results[label], data) + estimations = self.resolve_estimations( + full_concentration, + clp_concentration, + self.calculate_estimations(clp_concentration), + ) additional_penalty = sum( calculate_clp_penalties( - linked_matrices, + full_concentration, estimations, self._data.global_axis, self._model.clp_penalties, ) ) - dataset_penalties = {label: d.residual.sum().data for label, d in results.items()} + + results = { + label: self.create_dataset_result( + label, + data, + dataset_concentrations[label], + estimated_amplitude_axes, + estimations, + ) + for label, data in self._data.data.items() + } return OptimizationObjectiveResult( - results, free_clp_size, additional_penalty, dataset_penalties + data=results, + clp_size=clp_size, + additional_penalty=additional_penalty, ) - def create_unlinked_result(self) -> OptimizationObjectiveResult: - assert isinstance(self._data, OptimizationData) + def get_dataset_amplitudes( + self, + label: str, + estimated_amplitude_axes: list[list[str]], + estimated_amplitudes: list[OptimizationEstimation], + amplitude_axis: ArrayLike, + global_dim: str, + global_axis: ArrayLike, + ) -> xr.DataArray: + assert isinstance(self._data, LinkedOptimizationData) - label = next(iter(self._model.datasets.keys())) - result = self.create_result_dataset(label, self._data) + global_indices = self.get_global_indices(label) + coords = { + global_dim: global_axis, + "amplitude_label": amplitude_axis, + } + return xr.DataArray( + [ + [ + estimated_amplitudes[i].clp[estimated_amplitude_axes[i].index(amplitude_label)] + for amplitude_label in amplitude_axis + ] + for i in global_indices + ], + dims=coords.keys(), + coords=coords, + ) - matrix = OptimizationMatrix.from_data(self._data) - self.add_matrix_to_dataset(result, matrix) - additional_penalty = 0 - if self._data.is_global: - self.add_global_clp_and_residual_to_dataset(result, self._data, matrix) - free_clp_size = len(matrix.clp_axis) + def get_dataset_residual( + self, + label: str, + estimations: list[OptimizationEstimation], + model_dim: str, + model_axis: ArrayLike, + global_dim: str, + global_axis: ArrayLike, + ) -> xr.DataArray: + assert isinstance(self._data, LinkedOptimizationData) - else: - reduced_matrices = self.calculate_reduced_matrices( - matrix.as_global_list(self._data.global_axis) - ) - free_clp_size = sum(len(matrix.clp_axis) for matrix in reduced_matrices) - estimations = self.resolve_estimations( - matrix.as_global_list(self._data.global_axis), - 
reduced_matrices, - self.calculate_estimations(reduced_matrices), + global_indices = self.get_global_indices(label) + coords = {global_dim: global_axis, model_dim: model_axis} + offsets = [] + for i in global_indices: + group_label = self._data._group_labels[i] + group_index = self._data.group_definitions[group_label].index(label) + offsets.append(sum(self._data.group_sizes[group_label][:group_index])) + size = model_axis.size + return xr.DataArray( + [ + estimations[i].residual[offset : offset + size] + for i, offset in zip(global_indices, offsets) + ], + dims=coords.keys(), + coords=coords, + ).T + + def create_element_results( + self, + model: DataModel, + global_dim: str, + model_dim: str, + amplitudes: xr.DataArray, + concentrations: xr.DataArray, + ) -> dict[str, ElementResult]: + assert any(isinstance(element, str) for element in model.elements) is False + return { + element.label: element.create_result( + model, global_dim, model_dim, amplitudes, concentrations ) - self.add_unlinked_clp_and_residual_to_dataset(result, estimations) - additional_penalty = sum( - calculate_clp_penalties( - [matrix], - estimations, - self._data.global_axis, - self._model.clp_penalties, - ) + for element in cast(list[Element], model.elements) + } + + def create_dataset_result( + self, + label: str, + data: OptimizationData, + concentration: OptimizationMatrix, + estimated_amplitude_axes: list[list[str]], + estimations: list[OptimizationEstimation], + ) -> xr.Dataset: + assert isinstance(self._data, LinkedOptimizationData) + result_dataset = self.create_result_dataset(label, data) + + global_dim = result_dataset.attrs["global_dimension"] + global_axis = result_dataset.coords[global_dim] + model_dim = result_dataset.attrs["model_dimension"] + model_axis = result_dataset.coords[model_dim] + + result_dataset["residual"] = self.get_dataset_residual( + label, estimations, model_dim, model_axis, global_dim, global_axis + ) + result_dataset.attrs["root_mean_square_error"] = np.sqrt( + (result_dataset.residual.to_numpy() ** 2).sum() / sum(result_dataset.residual.shape) + ) + self._data.data[label].unweight_result_dataset(result_dataset) + result_dataset["fit"] = result_dataset.data - result_dataset.residual + add_svd_to_result_dataset(result_dataset, global_dim, model_dim) + + concentrations = concentration.to_data_array( + global_dim, global_axis, model_dim, model_axis + ) + amplitudes = self.get_dataset_amplitudes( + label, + estimated_amplitude_axes, + estimations, + concentrations.amplitude_label, + global_dim, + global_axis, + ) + element_results = self.create_element_results( + self._model.datasets[label], global_dim, model_dim, amplitudes, concentrations + ) + extra = self.create_data_model_results( + label, global_dim, model_dim, amplitudes, concentrations + ) + return DatasetResult(result_dataset, element_results, extra) + + def create_data_model_results( + self, + label: str, + global_dim: str, + model_dim: str, + amplitudes: xr.DataArray, + concentrations: xr.DataArray, + ) -> dict[str, ElementResult]: + result = {} + data_model = self._model.datasets[label] + assert any(isinstance(e, str) for _, e in iterate_data_model_elements(data_model)) is False + for data_model_cls in { + e.__class__.data_model_type + for _, e in cast(tuple[Any, Element], iterate_data_model_elements(data_model)) + if e.__class__.data_model_type is not None + }: + result = result | data_model_cls.create_result( + data_model, + global_dim, + model_dim, + amplitudes, + concentrations, ) + return result - 
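
`get_dataset_residual` in miniature: datasets in a linked group are stacked along the model axis, so each dataset slices its rows out of the shared residual via the offset of its position in the group (a made-up group of two datasets):

```python
group_sizes = {"g": [4, 3]}              # dataset rows stacked in group "g"
group_definitions = {"g": ["d1", "d2"]}
residual = list(range(7))                # one linked residual column

label = "d2"
group_index = group_definitions["g"].index(label)
offset = sum(group_sizes["g"][:group_index])
size = group_sizes["g"][group_index]

assert residual[offset : offset + size] == [4, 5, 6]
```
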
self.finalize_result_dataset(result, self._data) - dataset_penalties = {label: result.residual.sum().data} - return OptimizationObjectiveResult( - {label: result}, free_clp_size, additional_penalty, dataset_penalties + def get_result(self) -> OptimizationObjectiveResult: + return ( + self.create_single_dataset_result() + if isinstance(self._data, OptimizationData) + else self.create_multi_dataset_result() ) diff --git a/glotaran/optimization/optimization.py b/glotaran/optimization/optimization.py index 96ca601be..3336af7ba 100644 --- a/glotaran/optimization/optimization.py +++ b/glotaran/optimization/optimization.py @@ -137,7 +137,7 @@ def run(self) -> tuple[Parameters, dict[str, xr.Dataset], OptimizationResult]: penalty = np.concatenate([o.calculate() for o in self._objectives]) results = [o.get_result() for o in self._objectives] data = dict(ChainMap(*[r.data for r in results])) - number_of_clps = sum(r.free_clp_size for r in results) + number_of_clps = sum(r.clp_size for r in results) additional_penalty = sum(r.additional_penalty for r in results) result = OptimizationResult.from_least_squares_result( ls_result, @@ -157,7 +157,7 @@ def dry_run(self) -> tuple[Parameters, dict[str, xr.Dataset], OptimizationResult penalty = np.concatenate([o.calculate() for o in self._objectives]) results = [o.get_result() for o in self._objectives] data = dict(ChainMap(*[r.data for r in results])) - number_of_clps = sum(r.free_clp_size for r in results) + number_of_clps = sum(r.clp_size for r in results) additional_penalty = sum(r.additional_penalty for r in results) result = OptimizationResult.from_least_squares_result( None, diff --git a/tests/builtin/elements/coherent_artifact/test_coherent_artifact_element.py b/tests/builtin/elements/coherent_artifact/test_coherent_artifact_element.py index 3a572f98c..9fd29f850 100644 --- a/tests/builtin/elements/coherent_artifact/test_coherent_artifact_element.py +++ b/tests/builtin/elements/coherent_artifact/test_coherent_artifact_element.py @@ -37,9 +37,9 @@ ( "clp_label", [ - "coherent_artifact_coherent-artifact_order_1_activation_0", - "coherent_artifact_coherent-artifact_order_2_activation_0", - "coherent_artifact_coherent-artifact_order_3_activation_0", + "coherent_artifact_coherent-artifact_order_1", + "coherent_artifact_coherent-artifact_order_2", + "coherent_artifact_coherent-artifact_order_3", ], ), ("spectral", test_global_axis.data), @@ -95,5 +95,11 @@ def test_coherent_artifact(activation: Activation): assert optimized_parameters.close_or_equal(test_parameters_simulation) assert "coherent_artifact" in optimized_data - assert "coherent_artifact_response" in optimized_data["coherent_artifact"] - assert "coherent_artifact_associated_estimation" in optimized_data["coherent_artifact"] + assert ( + "coherent_artifact_associated_concentrations_coherent-artifact" + in optimized_data["coherent_artifact"] + ) + assert ( + "coherent_artifact_associated_amplitudes_coherent-artifact" + in optimized_data["coherent_artifact"] + ) diff --git a/tests/builtin/elements/damped_oscillation/test_damped_oscillation_element.py b/tests/builtin/elements/damped_oscillation/test_damped_oscillation_element.py index 9e2ee80dd..966f4be3d 100644 --- a/tests/builtin/elements/damped_oscillation/test_damped_oscillation_element.py +++ b/tests/builtin/elements/damped_oscillation/test_damped_oscillation_element.py @@ -115,10 +115,35 @@ def test_coherent_artifact(activation: Activation): assert optimized_parameters.close_or_equal(test_parameters_simulation) assert "damped_oscillation" in 
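
The renamed assertions here and below suggest the flattened result-variable naming scheme; this reconstruction is inferred from the tests, since the flattening code itself is not part of this diff:

```python
def result_variable_names(key: str, element_label: str) -> tuple[str, str]:
    # Inferred: amplitudes and concentrations appear to be stored as
    # "<key>_associated_amplitudes_<element label>" and
    # "<key>_associated_concentrations_<element label>".
    return (
        f"{key}_associated_amplitudes_{element_label}",
        f"{key}_associated_concentrations_{element_label}",
    )


assert result_variable_names("coherent_artifact", "coherent-artifact") == (
    "coherent_artifact_associated_amplitudes_coherent-artifact",
    "coherent_artifact_associated_concentrations_coherent-artifact",
)
```
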
diff --git a/tests/builtin/elements/damped_oscillation/test_damped_oscillation_element.py b/tests/builtin/elements/damped_oscillation/test_damped_oscillation_element.py
index 9e2ee80dd..966f4be3d 100644
--- a/tests/builtin/elements/damped_oscillation/test_damped_oscillation_element.py
+++ b/tests/builtin/elements/damped_oscillation/test_damped_oscillation_element.py
@@ -115,10 +115,35 @@ def test_coherent_artifact(activation: Activation):
     assert optimized_parameters.close_or_equal(test_parameters_simulation)
     assert "damped_oscillation" in optimized_data
-    assert "damped_oscillation" in optimized_data["damped_oscillation"]
-    assert "damped_oscillation_associated_estimation" in optimized_data["damped_oscillation"]
-    assert "damped_oscillation_frequency" in optimized_data["damped_oscillation"]
-    assert "damped_oscillation_rate" in optimized_data["damped_oscillation"]
-    assert "damped_oscillation_phase" in optimized_data["damped_oscillation"]
-    assert "damped_oscillation_sin" in optimized_data["damped_oscillation"]
-    assert "damped_oscillation_cos" in optimized_data["damped_oscillation"]
+    assert (
+        "damped_oscillation_associated_amplitudes_damped-oscillation"
+        in optimized_data["damped_oscillation"]
+    )
+    assert (
+        "damped_oscillation_associated_concentrations_damped-oscillation"
+        in optimized_data["damped_oscillation"]
+    )
+    assert (
+        "damped_oscillation_frequency_damped-oscillation" in optimized_data["damped_oscillation"]
+    )
+    assert "damped_oscillation_rate_damped-oscillation" in optimized_data["damped_oscillation"]
+    assert (
+        "damped_oscillation_phase_associated_amplitudes_damped-oscillation"
+        in optimized_data["damped_oscillation"]
+    )
+    assert (
+        "damped_oscillation_sin_associated_amplitudes_damped-oscillation"
+        in optimized_data["damped_oscillation"]
+    )
+    assert (
+        "damped_oscillation_cos_associated_amplitudes_damped-oscillation"
+        in optimized_data["damped_oscillation"]
+    )
+    assert (
+        "damped_oscillation_sin_associated_concentrations_damped-oscillation"
+        in optimized_data["damped_oscillation"]
+    )
+    assert (
+        "damped_oscillation_cos_associated_concentrations_damped-oscillation"
+        in optimized_data["damped_oscillation"]
+    )
diff --git a/tests/builtin/elements/kinetic/test_kinetic_element.py b/tests/builtin/elements/kinetic/test_kinetic_element.py
index 7ecde74a7..ee770b13e 100644
--- a/tests/builtin/elements/kinetic/test_kinetic_element.py
+++ b/tests/builtin/elements/kinetic/test_kinetic_element.py
@@ -10,7 +10,6 @@
 from glotaran.builtin.items.activation import GaussianActivation
 from glotaran.builtin.items.activation import InstantActivation
 from glotaran.builtin.items.activation import MultiGaussianActivation
-from glotaran.model.clp_constraint import ZeroConstraint
 from glotaran.model.experiment_model import ExperimentModel
 from glotaran.optimization import Optimization
 from glotaran.parameter import Parameters
@@ -46,6 +45,10 @@
                 ("s2", "s2"): "rates.2",
                 ("s1", "s2"): "rates.3",
             },
+            "clp_constraints": [
+                {"type": "zero", "target": "s1", "interval": (1, 1)},
+                {"type": "zero", "target": "s2", "interval": (0, 0)},
+            ],
         }
     ),
 }
@@ -105,17 +108,7 @@ def test_decay(decay: str, activation: Activation):
     data_model.data = simulate(
         data_model, test_library, test_parameters_simulation, test_axies, clp=test_clp
     )
-    experiments = [
-        ExperimentModel(
-            datasets={"decay": data_model},
-            clp_constraints=[
-                ZeroConstraint(type="zero", target="s1", interval=(1, 1)),
-                ZeroConstraint(type="zero", target="s2", interval=(0, 0)),
-            ]
-            if decay == "equilibrium"
-            else [],
-        )
-    ]
+    experiments = [ExperimentModel(datasets={"decay": data_model})]
     optimization = Optimization(
         experiments,
         test_parameters,
@@ -127,11 +120,12 @@ def test_decay(decay: str, activation: Activation):
     assert result.success
     assert optimized_parameters.close_or_equal(test_parameters_simulation)
     assert "decay" in optimized_data
-    assert "clp" in optimized_data["decay"]
+    print(optimized_data["decay"])
     assert "residual" in optimized_data["decay"]
-    assert "species_concentration" in optimized_data["decay"]
-    assert "species_associated_estimation" in optimized_data["decay"]
-    assert "kinetic_associated_estimation" in optimized_data["decay"]
+    assert f"species_associated_concentrations_{decay}" in optimized_data["decay"]
+    assert f"species_associated_amplitudes_{decay}" in optimized_data["decay"]
+    assert f"kinetic_associated_amplitudes_{decay}" in optimized_data["decay"]
+    assert f"k_matrix_{decay}" in optimized_data["decay"]
     if isinstance(activation, MultiGaussianActivation):
         assert "gaussian_activation" in optimized_data["decay"].coords
         assert "gaussian_activation_function" in optimized_data["decay"]
diff --git a/tests/builtin/elements/spectral/test_spectral_element.py b/tests/builtin/elements/spectral/test_spectral_element.py
index fdbd32b68..f16553c56 100644
--- a/tests/builtin/elements/spectral/test_spectral_element.py
+++ b/tests/builtin/elements/spectral/test_spectral_element.py
@@ -135,6 +135,5 @@ def test_spectral(shape: str):
     assert optimized_parameters.close_or_equal(test_parameters_simulation)
     assert "spectral" in optimized_data
-    assert "spectrum" in optimized_data["spectral"]
-    assert "spectrum_associated_estimation" in optimized_data["spectral"]
-    assert "spectra" in optimized_data["spectral"]
+    assert f"spectrum_associated_amplitudes_{shape}" in optimized_data["spectral"]
+    assert f"spectrum_associated_concentrations_{shape}" in optimized_data["spectral"]
diff --git a/tests/builtin/elements/test_spectral_decay_linked_model.py b/tests/builtin/elements/test_spectral_decay_linked_model.py
index 4906b2454..bc70a7d40 100644
--- a/tests/builtin/elements/test_spectral_decay_linked_model.py
+++ b/tests/builtin/elements/test_spectral_decay_linked_model.py
@@ -151,8 +151,8 @@ def test_spectral_decay_linking():
     print(optimized_parameters)
     print(test_parameters_simulation)
     assert optimized_parameters.close_or_equal(test_parameters_simulation, rtol=1e-1)
-    sas_ds1 = optimized_data["decay_1"].clp.to_numpy()
-    sas_sd2 = optimized_data["decay_2"].clp.to_numpy()
+    sas_ds1 = optimized_data["decay_1"].species_associated_amplitudes_decay.to_numpy()
+    sas_sd2 = optimized_data["decay_2"].species_associated_amplitudes_decay.to_numpy()
     print("Diff SAS: ", sas_ds1.sum() - sas_sd2.sum())
     print(sas_ds1[0, 0], sas_sd2[0, 0])
     assert not np.allclose(sas_ds1, np.zeros_like(sas_ds1))
diff --git a/tests/model/test_experiment_model.py b/tests/model/test_experiment_model.py
index 694f43961..6e7491bbd 100644
--- a/tests/model/test_experiment_model.py
+++ b/tests/model/test_experiment_model.py
@@ -37,18 +37,6 @@ def test_experiment_model_from_dict():
                 "weight": 1,
             }
         ],
-        "clp_constraints": [
-            {
-                "type": "only",
-                "target": "t",
-                "interval": [(1, 2)],
-            },
-            {
-                "type": "zero",
-                "target": "t",
-                "interval": (1, 2),
-            },
-        ],
         "clp_relations": [
             {
                 "source": "s",
@@ -69,9 +57,3 @@
     d3 = experiment_model.datasets["d3"]
     assert isinstance(d3, MockDataModel)
-
-    only = experiment_model.clp_constraints[0]
-    assert isinstance(only, OnlyConstraint)
-
-    zero = experiment_model.clp_constraints[1]
-    assert isinstance(zero, ZeroConstraint)
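Note: as the kinetic and experiment-model tests above show, `clp_constraints` now live on the individual element definition rather than on `ExperimentModel`. A sketch of the migration, reusing keys from the kinetic test above (the dict shown is a fragment of an element definition, not a complete model):

    # Before: constraints attached to the experiment.
    #   ExperimentModel(datasets=..., clp_constraints=[ZeroConstraint(...)])
    # After: constraints declared on the element itself.
    element_definition = {
        "type": "kinetic",
        "clp_constraints": [
            {"type": "zero", "target": "s1", "interval": (1, 1)},
        ],
    }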
diff --git a/tests/optimization/data.py b/tests/optimization/data.py
index f0094072a..5af42b83b 100644
--- a/tests/optimization/data.py
+++ b/tests/optimization/data.py
@@ -13,7 +13,7 @@
     elements=[
         TestElementConstant(
             type="test-element-constant",
-            label="test",
+            label="test_ele",
            dimension="model",
            compartments=["c1"],
            value=5,
@@ -29,7 +29,7 @@
     elements=[
         TestElementConstant(
             type="test-element-constant",
-            label="test",
+            label="test_ele_index_dependent",
            dimension="model",
            compartments=["c2"],
            value=2,
diff --git a/tests/optimization/elements.py b/tests/optimization/elements.py
index b16dfe008..867aec9c9 100644
--- a/tests/optimization/elements.py
+++ b/tests/optimization/elements.py
@@ -7,6 +7,7 @@
 from glotaran.model.data_model import DataModel  # noqa: TCH001
 from glotaran.model.element import Element
+from glotaran.model.element import ElementResult
 from glotaran.model.item import ParameterType  # noqa: TCH001
 
 if TYPE_CHECKING:
@@ -32,13 +33,18 @@ def calculate_matrix(
         matrix = np.array([matrix] * global_axis.size)
         return self.compartments, matrix
 
-    def add_to_result_data(
+    def create_result(
         self,
         model: DataModel,
-        data: xr.Dataset,
-        as_global: bool = False,
-    ):
-        data.attrs["custom_element_result"] = True
+        global_dimension: str,
+        model_dimension: str,
+        amplitudes: xr.Dataset,
+        concentrations: xr.Dataset,
+    ) -> ElementResult:
+        return ElementResult(
+            amplitudes={"test": amplitudes},
+            concentrations={"test": concentrations},
+        )
 
 
 class TestElementExponential(Element):
diff --git a/tests/optimization/test_estimation.py b/tests/optimization/test_estimation.py
index 151c8023b..adb91093d 100644
--- a/tests/optimization/test_estimation.py
+++ b/tests/optimization/test_estimation.py
@@ -34,7 +34,9 @@ def test_calculate(residual_function: str):
 def test_resolve_clp():
     data_model = deepcopy(TestDataModelConstantThreeCompartments)
-    constraints = [ZeroConstraint(type="zero", target="c3_1", interval=[(3, 7)])]
+    data_model.elements[0].clp_constraints = [
+        ZeroConstraint(type="zero", target="c3_1", interval=[(3, 7)])
+    ]
     relations = [
         ClpRelation(
             source="c3_2", target="c3_3", parameter=Parameter(label="", value=3), interval=[(3, 7)]
@@ -43,7 +45,7 @@
     data = OptimizationData(data_model)
     matrix = OptimizationMatrix.from_data(data)
     index = 3
-    reduced_matrix = matrix.at_index(index).reduce(index, constraints, relations)
+    reduced_matrix = matrix.at_index(index).reduce(index, relations)
     estimation = OptimizationEstimation.calculate(
         reduced_matrix.array, data.data[:, 1], "variable_projection"
     )
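Note: `OptimizationMatrix.reduce` drops its constraints parameter in this diff; constraints are now taken from the elements when the matrix is assembled, so `reduce` only folds clp relations. Call sites migrate as in the tests around this note:

    # Old signature (removed): matrix.at_index(index).reduce(index, constraints, relations)
    reduced_matrix = matrix.at_index(index).reduce(index, relations)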
diff --git a/tests/optimization/test_matrix.py b/tests/optimization/test_matrix.py
index b7b2e1011..03cdbe417 100644
--- a/tests/optimization/test_matrix.py
+++ b/tests/optimization/test_matrix.py
@@ -54,16 +54,18 @@ def test_from_global_data(weight: bool):
 def test_constraints():
     data_model = deepcopy(TestDataModelConstantThreeCompartments)
-    constraints = [ZeroConstraint(type="zero", target="c3_3", interval=[(3, 7)])]
+    data_model.elements[0].clp_constraints = [
+        ZeroConstraint(type="zero", target="c3_3", interval=[(3, 7)])
+    ]
     data = OptimizationData(data_model)
     matrix = OptimizationMatrix.from_data(data)
     assert matrix.array.shape == (5, 3)
     assert matrix.clp_axis == ["c3_1", "c3_2", "c3_3"]
 
-    reduced_matrix = matrix.reduce(0, constraints, [])
+    reduced_matrix = matrix.reduce(0, [])
     assert reduced_matrix.array.shape == (5, 3)
     assert reduced_matrix.clp_axis == ["c3_1", "c3_2", "c3_3"]
 
-    reduced_matrix = matrix.reduce(3, constraints, [])
+    reduced_matrix = matrix.reduce(3, [])
     assert reduced_matrix.array.shape == (5, 2)
     assert reduced_matrix.clp_axis == ["c3_1", "c3_2"]
 
@@ -82,10 +84,10 @@ def test_relations():
     assert matrix.array.shape == (5, 3)
     assert matrix.clp_axis == ["c3_1", "c3_2", "c3_3"]
 
-    reduced_matrix = matrix.at_index(0).reduce(0, [], relations)
+    reduced_matrix = matrix.at_index(0).reduce(0, relations)
     assert reduced_matrix.array.shape == (5, 3)
     assert reduced_matrix.clp_axis == ["c3_1", "c3_2", "c3_3"]
 
-    reduced_matrix = matrix.at_index(3).reduce(3, [], relations)
+    reduced_matrix = matrix.at_index(3).reduce(3, relations)
     assert reduced_matrix.array.shape == (5, 2)
     assert reduced_matrix.clp_axis == ["c3_1", "c3_2"]
     assert np.array_equal(reduced_matrix.array[:, 1], matrix.array[:, 1] + matrix.array[:, 2] * 3)
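Note: the final assertion in `test_relations` above spells out what reducing a relation does: with the relation `c3_2 -> c3_3` carrying parameter value 3 on the active interval, the target column is folded into the source column, i.e.

    reduced_matrix.array[:, 1] == matrix.array[:, 1] + 3 * matrix.array[:, 2]

so the reduced matrix loses one column while preserving the modelled signal.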
diff --git a/tests/optimization/test_objective.py b/tests/optimization/test_objective.py
index e840b2aef..25202ddcd 100644
--- a/tests/optimization/test_objective.py
+++ b/tests/optimization/test_objective.py
@@ -18,7 +18,7 @@ def test_single_data():
     data_model = deepcopy(TestDataModelConstantIndexIndependent)
-    experiment = ExperimentModel(datasets={"test": data_model})
+    experiment = ExperimentModel(datasets={"test_data": data_model})
     objective = OptimizationObjective(experiment)
     assert isinstance(objective._data, OptimizationData)
@@ -27,15 +27,24 @@
     assert penalty.size == data_size
 
     result = objective.get_result().data
-    assert "test" in result
-
-    result_data = result["test"]
-    assert "matrix" in result_data
-    assert result_data.matrix.shape == (data_model.data.model.size, 1)
-    assert "clp" in result_data
-    assert result_data.clp.shape == (data_model.data["global"].size, 1)
-    assert "residual" in result_data
-    assert result_data.residual.shape == data_model.data.data.shape
+    assert "test_data" in result
+    result_data = result["test_data"]
+    print(result_data)
+    assert "test_ele" in result_data.elements
+    element_result = result_data.elements["test_ele"]
+
+    assert "test" in element_result.concentrations
+    assert element_result.concentrations["test"].shape == (
+        data_model.data.model.size,
+        1,
+    )
+    assert "test" in element_result.amplitudes
+    assert element_result.amplitudes["test"].shape == (
+        data_model.data["global"].size,
+        1,
+    )
+    assert "residual" in result_data.data
+    assert result_data.data.residual.shape == data_model.data.data.shape
 
 
 @pytest.mark.parametrize("weight", {True, False})
@@ -55,6 +64,7 @@ def test_global_data(weight: bool):
     assert "test" in result
     result_data = result["test"]
+    print(result_data)
     assert "matrix" in result_data
     assert result_data.matrix.shape == (
         (data_model.data["global"].size, data_model.data.model.size, 1)
@@ -94,23 +104,33 @@ def test_multiple_data():
     assert "independent" in result
     result_data = result["independent"]
-    assert "matrix" in result_data
-    assert result_data.matrix.shape == (data_model_one.data.model.size, 1)
-    assert "clp" in result_data
-    assert result_data.clp.shape == (data_model_one.data["global"].size, 1)
+    print(result_data)
+    assert "test_associated_concentrations_test_ele" in result_data
+    assert result_data.test_associated_concentrations_test_ele.shape == (
+        data_model_one.data.model.size,
+        1,
+    )
+    assert "test_associated_amplitudes_test_ele" in result_data
+    assert result_data.test_associated_amplitudes_test_ele.shape == (
+        data_model_one.data["global"].size,
+        1,
+    )
     assert "residual" in result_data
     assert result_data.residual.shape == data_model_one.data.data.shape
 
     assert "dependent" in result
     result_data = result["dependent"]
-    assert "matrix" in result_data
-    assert result_data.matrix.shape == (
+    assert "test_associated_concentrations_test_ele_index_dependent" in result_data
+    assert result_data.test_associated_concentrations_test_ele_index_dependent.shape == (
         data_model_two.data["global"].size,
         data_model_two.data.model.size,
         1,
     )
-    assert "clp" in result_data
-    assert result_data.clp.shape == (data_model_two.data["global"].size, 1)
+    assert "test_associated_amplitudes_test_ele_index_dependent" in result_data
+    assert result_data.test_associated_amplitudes_test_ele_index_dependent.shape == (
+        data_model_two.data["global"].size,
+        1,
+    )
     assert "residual" in result_data
     # this datamodel has transposed input
     assert result_data.residual.shape == data_model_two.data.data.T.shape
@@ -134,7 +154,6 @@ def test_result_data(weight: bool):
     result_data = result["test"]
 
     assert "root_mean_square_error" in result_data.attrs
-    assert "custom_element_result" in result_data.attrs
     assert "data_left_singular_vectors" in result_data
     assert "data_right_singular_vectors" in result_data
     assert "residual_left_singular_vectors" in result_data
@@ -148,7 +167,6 @@
     if weight:
         assert "weight" in result_data
         assert "weighted_root_mean_square_error" in result_data.attrs
-        assert "weighted_matrix" in result_data
         assert "weighted_residual" in result_data
diff --git a/tests/optimization/test_optimization.py b/tests/optimization/test_optimization.py
index 6480285db..119bb1d36 100644
--- a/tests/optimization/test_optimization.py
+++ b/tests/optimization/test_optimization.py
@@ -42,9 +42,9 @@ def test_single_data():
     assert optimized_parameters.close_or_equal(parameters)
     assert "decay_independent" in optimized_data
     result_data = optimized_data["decay_independent"]
-    assert "clp" in result_data
+    print(result_data)
     assert "residual" in result_data
-    assert "fitted_data" in result_data
+    assert "fit" in result_data
 
 
 def test_multiple_experiments():
diff --git a/tests/optimization/test_relations_and_constraints.py b/tests/optimization/test_relations_and_constraints.py
index f6a6c2217..23d287f44 100644
--- a/tests/optimization/test_relations_and_constraints.py
+++ b/tests/optimization/test_relations_and_constraints.py
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+from copy import deepcopy
+
 import numpy as np
 import xarray as xr
 
@@ -13,11 +15,13 @@
 def test_zero_contraint():
+    library = deepcopy(test_library)
+    library["decay_independent"].clp_constraints = [ZeroConstraint(type="zero", target="c1")]
+    library["constant"].clp_constraints = [ZeroConstraint(type="zero", target="c1")]
     data_model_one = DataModel(elements=["decay_independent"])
     data_model_two = DataModel(elements=["constant"])
     experiment = ExperimentModel(
         datasets={"decay_independent": data_model_one, "constant": data_model_two},
-        clp_constraints=[ZeroConstraint(type="zero", target="c1")],
     )
 
     parameters = Parameters.from_dict({"rates": {"decay": [0.8, 0.04]}})
@@ -29,14 +33,14 @@
     data_model_one.data = simulate(
         data_model_one,
-        test_library,
+        library,
         parameters,
         {"global": global_axis, "model": model_axis},
         clp,
     )
     data_model_two.data = simulate(
         data_model_two,
-        test_library,
+        library,
         parameters,
         {"global": global_axis, "model": model_axis},
         clp,
     )
@@ -45,7 +49,7 @@
     optimization = Optimization(
         [experiment],
         parameters,
-        test_library,
+        library,
         raise_exception=True,
         maximum_number_function_evaluations=1,
     )
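Note: the fitted-data variable is renamed from `fitted_data` to `fit` throughout; per `create_dataset_result` earlier in this diff it is computed as

    result_dataset["fit"] = result_dataset.data - result_dataset.residual

and `root_mean_square_error` is reported as `sqrt((residual ** 2).sum() / sum(residual.shape))`.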