Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 225 additions & 68 deletions src/muse/decisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ def weighted_sum(objectives: Dataset, parameters: Any, **kwargs) -> DataArray:
"weighted_sum",
]

from collections.abc import Mapping, MutableMapping, Sequence
from collections.abc import Hashable, Mapping, MutableMapping, Sequence
from typing import (
Any,
Callable,
)

import numpy as np
import xarray as xr
from xarray import DataArray, Dataset

from muse.registration import registrator
from muse.timeslices import broadcast_timeslice, drop_timeslice
from muse.utilities import tupled_dimension

PARAMS_TYPE = Sequence[tuple[str, bool, float]]
"""Standard decision parameter type.
Expand Down Expand Up @@ -170,76 +174,210 @@ def weighted_sum(objectives: Dataset, parameters: Mapping[str, float]) -> DataAr
def lexical_comparison(
objectives: Dataset, parameters: PARAMS_TYPE | Sequence[tuple[str, float]]
) -> DataArray:
"""Lexical comparison over the objectives.
"""Lexicographic comparison using the best available replacements as reference.

Lexical comparison operates by binning the objectives into bins of width
w_i = min_j(p_i o_i^j). Once binned, dimensions other than `asset` and
`technology` are reduced by taking the max, e.g. the largest constraint.
Finally, the objectives are ranked lexographically, in the order given by the
parameters.

w_i = min_j(p_i o_i^j),

where o_i^j are the objective values of the candidate replacements and the
minimum is taken over the replacement dimension. Once binned, dimensions
other than ``asset`` and ``replacement`` are reduced by taking the maximum
(e.g. the largest constraint). Finally, the objectives are ranked
lexicographically in the order given by ``parameters``.
Comment on lines +183 to +187

The result is an array of tuples which can subsequently be compared
lexicographically.
"""
from muse.utilities import lexical_comparison

assert len(parameters) > 0
if len(parameters[0]) == 3:
parameters = [(u[0], coeff_sign(u[1], u[2])) for u in parameters]
assert set(objectives.data_vars).issuperset([u[0] for u in parameters])
order = [u[0] for u in parameters]

binsize = objectives.copy()
for obj_name, weight in parameters:
binsize[obj_name] = binsize[obj_name] * weight
# Convert (name, min/max, coefficient) specifications to
# (name, signed coefficient), where the sign encodes whether the
# objective should be minimised or maximised.
if len(parameters[0]) == 3:
parameters = [
(name, coeff_sign(minmax, coeff)) for name, minmax, coeff in parameters
]

# Lexicographic priority of the objectives.
order = tuple(name for name, _ in parameters)

# All requested objectives must be present.
assert set(objectives.data_vars).issuperset(order)

# Define the bin widths as
#
# w_i = min_j(p_i o_i^j),
#
# i.e. the weighted objective values of the candidate replacements,
# taking the minimum over the replacement dimension.
binsize = objectives.copy(deep=True)

# Temporarily flatten the timeslice MultiIndex to avoid xarray's
# deprecated Dataset assignment path when modifying variables.
if "timeslice" in binsize.indexes:
index_names = binsize.indexes["timeslice"].names
binsize = binsize.reset_index("timeslice")

# Apply the objective weights (including minimisation/maximisation
# direction encoded in the sign).
for name, weight in parameters:
binsize[name] *= weight

# Restore the original timeslice MultiIndex structure.
if "timeslice" in objectives.indexes:
binsize = binsize.set_index(timeslice=index_names)

# Use the smallest weighted objective across replacements as the
# bin width for each objective.
binsize = binsize.min("replacement")

return lexical_comparison(objectives, binsize, order=order, bin_last=False).rank(
"replacement"
)
# Construct lexicographically comparable tuples and rank the
# replacement options accordingly.
return _lexical_comparison(
objectives,
binsize,
order=order,
keep_last_continuous=True,
).rank("replacement")


@register_decision(name="retro_lexo")
def retro_lexical_comparison(
objectives: Dataset, parameters: PARAMS_TYPE | Sequence[tuple[str, float]]
objectives: Dataset,
parameters: PARAMS_TYPE | Sequence[tuple[str, float]],
) -> DataArray:
"""Lexical comparison over the objectives.
"""Lexicographic comparison using current assets as reference.

Lexical comparison operates by binning the objectives into bins of width
w_i = p_i o_i, where i are the current assets. Once binned, dimensions other
than `asset` and `replacement` are reduced by taking the max, e.g. the
largest constraint. Finally, the objectives are ranked lexographically, in
the order given by the parameters.

w_i = p_i o_i,

where o_i are the objective values of the current assets. Once binned,
dimensions other than ``asset`` and ``replacement`` are reduced by taking
the maximum (e.g. the largest constraint). Finally, the objectives are
ranked lexicographically in the order given by ``parameters``.
Comment on lines +256 to +259

The result is an array of tuples which can subsequently be compared
lexicographically.
"""
from muse.utilities import lexical_comparison

assert len(parameters) > 0

# Convert (name, min/max, coefficient) specifications to
# (name, signed coefficient), where the sign encodes whether the
# objective should be minimised or maximised.
if len(parameters[0]) == 3:
parameters = [(u[0], coeff_sign(u[1], u[2])) for u in parameters]
parameters = [
(name, coeff_sign(minmax, coeff)) for name, minmax, coeff in parameters
]

# Retrofitting compares candidate replacements against the current
# asset, so every asset must also appear amongst the replacements.
assert objectives.asset.isin(objectives.replacement).all()
assert set(objectives.data_vars).issuperset([u[0] for u in parameters])

order = [u[0] for u in parameters]
binsize = Dataset(dict(parameters)) * objectives.sel(replacement=objectives.asset)
return lexical_comparison(objectives, binsize, order=order, bin_last=False).rank(
"replacement"
)
# Lexicographic priority of the objectives.
order = tuple(name for name, _ in parameters)

# All requested objectives must be present.
assert set(objectives.data_vars).issuperset(order)

# Define the bin widths as
#
# w_i = p_i o_i,
#
# where o_i are the objective values of the current assets. The
# objective values are selected by matching each asset to itself
# along the replacement dimension.
binwidths = Dataset(dict(parameters)) * objectives.sel(replacement=objectives.asset)

# Construct lexicographically comparable tuples and rank the
# replacement options accordingly.
return _lexical_comparison(
objectives,
binwidths,
order=order,
keep_last_continuous=True,
).rank("replacement")


def _lexical_comparison(
objectives: xr.Dataset,
binwidths: xr.Dataset,
order: Sequence[Hashable],
*,
keep_last_continuous: bool = False,
) -> xr.DataArray:
"""Lexical comparison over the objectives.

Lexical comparison operates by binning the objectives into bins of width
``binwidths``. Once binned, dimensions other than ``asset`` and
``replacement`` are reduced by taking the maximum (e.g. the largest
constraint). Finally, the objectives are ranked lexicographically in the
order given by ``order``.
Comment on lines +312 to +316

Arguments:
objectives: xr.Dataset containing the objectives to rank.
binwidths: Bin widths used to discretise the objectives.
order: Order in which objectives are compared lexicographically.
keep_last_continuous: Whether the final objective should be left as a
continuous value, rather than being discretised.

Result:
An array of tuples which can subsequently be compared
lexicographically.
"""
# Restrict to the objectives participating in the comparison and
# temporarily flatten the timeslice MultiIndex to avoid Dataset
# assignment issues in xarray.
result = drop_timeslice(objectives[list(order)]).copy()

# All objectives are discretised except, optionally, the final
# tie-breaking objective.
discretized = order[:-1] if keep_last_continuous else order

# Convert objectives to integer-valued bins.
for name in discretized:
result[name] = np.floor(result[name] / binwidths[name]).astype(np.int64)

# Preserve the final objective as a continuous quantity for
# tie-breaking, while still normalising by its bin width.
if keep_last_continuous:
name = order[-1]
result[name] = result[name] / binwidths[name]

# Combine the ordered objectives into tuples that can be compared
# lexicographically.
return result.to_array(dim="objective").reduce(tupled_dimension, dim="objective")


def _epsilon_constraints(
objectives: Dataset, optimize: str, mask: Any | None = None, **epsilons
objectives: Dataset,
optimize: str,
mask: Any | None = None,
**epsilons,
) -> DataArray:
"""Minimizes one objective subject to constraints on other objectives."""
"""Selects the best value of a target objective subject to epsilon constraints.

Each constraint enforces that an objective must be below (or above, after
sign handling upstream) a threshold, aggregated over all non-(asset,
replacement) dimensions.
"""
# Start with all options feasible
constraints = True

# Build feasibility mask from epsilon constraints
for name, epsilon in epsilons.items():
# Reduce over all non-decision dimensions (e.g. timeslice, region)
reduced_dims = set(objectives[name].dims) - {"asset", "replacement"}

# All slices must satisfy constraint
constraints = constraints & (objectives[name] <= epsilon).all(reduced_dims)

# Default mask = something worse than any feasible objective value
if mask is None:
mask = objectives[optimize].max() + 1

# Return objective values, masking infeasible alternatives
return objectives[optimize].where(constraints, mask)


Expand All @@ -249,60 +387,79 @@ def epsilon_constraints(
parameters: PARAMS_TYPE | Sequence[tuple[str, bool, float]],
mask: Any | None = None,
) -> DataArray:
r"""Minimizes first objective subject to constraints on other objectives.
"""Epsilon-constraint optimisation.

The parameters are a sequence of tuples `(name, minimize, epsilon)`, where
`name` is the name of the objective, `minimize` is `True` if minimizing and
false if maximizing that objective, and `epsilon` is the constraint. The
first objective is the one that will be minimized according to:
The first objective is optimised (min or max), while all subsequent
objectives are treated as constraints of the form:

Given objectives :math:`O^{(i)}_t`, with :math:`i \in [|1, N|]` and :math:`t` the
replacement technologies, this function computes the ranking with respect to
:math:`t`:
objective_i <= epsilon_i

.. math::
after sign normalization.
"""
assert set(objectives.data_vars).issuperset([p[0] for p in parameters])

\mathrm{ranking}_{O^{(i)}_t < \epsilon_i} O^{(0)}_t
# Remove obj_data parameters if present
optimize_name, optimize_minimize, _ = parameters[0]

Comment on lines +401 to 403
# Encode optimization direction
do_minimize = Dataset({optimize_name: 1 if optimize_minimize else -1})

The first tuple can be restricted to `(name, minimize)`, since `epsilon` is ignored.
# Remaining objectives also get sign encoding
for name, minimize, _ in parameters[1:]:
do_minimize[name] = coeff_sign(minimize, 1)

The result is the matrix :math:`O^{(0)}` modified such minimizing over the
replacement dimension value would take into account the constraints and the
optimization direction (minimize or maximize). In other words, calling
`result.rank('replacement')` will yield the expected result.
"""
assert set(objectives.data_vars).issuperset([param[0] for param in parameters])
do_minimize = Dataset({k: coeff_sign(v, 1) for k, v, _ in parameters[1:]})
do_minimize[parameters[0][0]] = 1 if parameters[0][1] else -1
dict_params = {k: v for k, _, v in parameters[1:] if k in objectives.data_vars}
constraints = do_minimize * Dataset(dict_params)
# Extract epsilon constraints
epsilons = {
name: coeff_sign(minimize, 1) * eps
for name, minimize, eps in parameters[1:]
if name in objectives.data_vars
}

# Apply sign transformation + constraints
if "timeslice" in objectives.indexes:
do_minimize = broadcast_timeslice(do_minimize)
return _epsilon_constraints(
objectives * do_minimize, parameters[0][0], mask=mask, **constraints.data_vars
objectives * do_minimize,
optimize_name,
mask=mask,
**epsilons,
)


@register_decision(name="retro_epsilon")
def retro_epsilon_constraints(
objectives: Dataset, parameters: PARAMS_TYPE
objectives: Dataset,
parameters: PARAMS_TYPE,
) -> DataArray:
"""Epsilon constraints where the current tech is included.
"""Epsilon-constraint optimisation with asset-relative thresholds.

Modifies the parameters to the function such that the existing technologies are
always competitive.
Epsilon thresholds are adjusted so that the current technology is always
feasible, ensuring it remains in the choice set.
"""
# Extract current asset baseline
asset_objectives = objectives.sel(replacement=objectives.asset)

def transform(name, minimize, epsilon=None):
def adapt_param(name, minimize, epsilon=None):
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
"""Adjust epsilon so that current asset is always feasible."""
if epsilon is None:
return name, minimize
am = getattr(asset_objectives, name)
new_eps = am.where(am > epsilon if minimize else am < epsilon, epsilon)
return name, minimize, new_eps

parameters = [
transform(*param) for param in parameters if param[0] in objectives.data_vars
]
current = asset_objectives[name]

# Work in the same transformed logic as epsilon_constraints
sign = -1 if minimize else 1

# Ensure current asset is not excluded by its own constraint
adjusted = current.where(
(sign * current) <= (sign * epsilon),
epsilon,
)
Comment on lines +449 to +456

return name, minimize, adjusted

# Filter valid objectives and adapt epsilons
parameters = [adapt_param(*p) for p in parameters if p[0] in objectives.data_vars]

return epsilon_constraints(objectives, parameters)


Expand Down
Loading
Loading