Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ Upcoming Version

* ``add_variables(binary=True, ...)`` now accepts ``lower``/``upper`` bounds, as long as they are 0 or 1. Previously binary bounds could only be set via the ``.lower``/``.upper`` setters after creation. (https://github.com/PyPSA/linopy/issues/776)

**Performance**

* ``LinearExpression.groupby(...).sum()`` now scatters terms directly into the padded result arrays via ``xarray.apply_ufunc``, which avoids intermediate copies and speeds up grouping. A single kernel handles both numpy-backed and chunked (dask) data; dask-backed results stay lazy. On representative models this reduces peak memory during model build and export by up to a factor of about 3.

**Deprecations**

* Mutation via assignment to ``Variable.lower`` / ``Variable.upper`` / ``Constraint.coeffs`` / ``Constraint.vars`` / ``Constraint.lhs`` / ``Constraint.sign`` / ``Constraint.rhs`` is deprecated and emits a ``DeprecationWarning``. Use ``Variable.update(...)`` / ``Constraint.update(...)`` instead — the canonical mutation API with one validation path and one place that flips the persistent-solver dirty flag. Read access to these properties is unchanged. The setters will be removed in a future release.
Expand Down
102 changes: 87 additions & 15 deletions linopy/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
FACTOR_DIM,
GREATER_EQUAL,
GROUP_DIM,
GROUPED_TERM_DIM,
HELPER_DIMS,
LESS_EQUAL,
STACKED_TERM_DIM,
Expand Down Expand Up @@ -340,20 +339,8 @@ def sum(

# At this point, group is always a pandas Series
assert isinstance(group, pd.Series)
group_dim = group.index.name

arrays = [group, group.groupby(group).cumcount()]
idx = pd.MultiIndex.from_arrays(arrays, names=[GROUP_DIM, GROUPED_TERM_DIM])
new_coords = Coordinates.from_pandas_multiindex(idx, group_dim)
# collapsing group_dim invalidates every coordinate aligned to it
names_to_drop = [
name
for name, coord in self.data.coords.items()
if group_dim in coord.dims
]
ds = self.data.drop_vars(names_to_drop).assign_coords(new_coords)
ds = ds.unstack(group_dim, fill_value=LinearExpression._fill_value)
ds = LinearExpression._sum(ds, dim=GROUPED_TERM_DIM)

ds = self._grouped_sum(group)

if int_map is not None:
index = ds.indexes[GROUP_DIM].map({v: k for k, v in int_map.items()})
Expand All @@ -374,6 +361,91 @@ def func(ds: Dataset) -> Dataset:

return self.map(func, **kwargs, shortcut=True)

def _grouped_sum(self, group: pd.Series) -> Dataset:
    """
    Sum the expression within each group by scattering into padded arrays.

    The entries of ``group`` are factorized into sorted unique labels. Each
    element along the grouped dimension (the name of ``group``'s index) is
    written into the slot of its group, at the position given by its order
    of appearance within that group. The resulting term dimension has size
    ``nterm * max_group_size``; slots of groups with fewer members keep the
    fill values of ``LinearExpression._fill_value``. Constants are summed
    per group with NaN treated as zero.

    Both kernels are applied through :func:`xarray.apply_ufunc` with
    ``dask="parallelized"``. Chunked inputs are first rechunked so that the
    core dimensions each sit in a single chunk.

    Parameters
    ----------
    group : pandas.Series
        Group label per element; its index name identifies the dimension
        that is collapsed.

    Returns
    -------
    xarray.Dataset
        Dataset holding ``coeffs``, ``vars`` and ``const``, with the grouped
        dimension replaced by ``GROUP_DIM`` labelled by the unique groups.

    Raises
    ------
    ValueError
        If ``group`` contains NaN values.
    """
    data = self.data
    group_dim = group.index.name
    fill_value = LinearExpression._fill_value

    codes, unique_groups = pd.factorize(group, sort=True)
    # factorize marks missing values with the sentinel code -1
    if np.any(codes == -1):
        raise ValueError(
            "Cannot group by a pandas object containing NaN values. "
            "Drop or fill the corresponding entries before grouping."
        )

    n_groups = len(unique_groups)
    if n_groups:
        max_size = int(np.bincount(codes, minlength=n_groups).max())
    else:
        max_size = 0
    # rank of every element inside its own group, in order of appearance
    code_series = pd.Series(codes)
    positions = code_series.groupby(codes).cumcount().to_numpy()
    nterm = data.sizes[TERM_DIM]
    flat_terms = nterm * max_size

    def single_chunk(da: DataArray) -> DataArray:
        # core dims of the kernels below must not be split across chunks
        if da.chunks is not None:
            core = {d: -1 for d in (group_dim, TERM_DIM) if d in da.dims}
            return da.chunk(core)
        return da

    def group_sum(values: np.ndarray) -> np.ndarray:
        # (..., n_elem) -> (..., n_groups); NaN entries contribute zero
        elem_first = np.moveaxis(values, -1, 0)
        cleaned = np.where(np.isnan(elem_first), 0, elem_first)
        totals = np.zeros((n_groups, *elem_first.shape[1:]), dtype=values.dtype)
        # unbuffered add so repeated group codes accumulate
        np.add.at(totals, codes, cleaned)
        return np.moveaxis(totals, 0, -1)

    def scatter_terms(values: np.ndarray, fill: Any) -> np.ndarray:
        # (..., n_elem, nterm) -> (..., n_groups, nterm * max_size)
        lead = values.shape[:-2]
        padded = np.full(
            lead + (n_groups, nterm, max_size), fill, dtype=values.dtype
        )
        # the two index arrays are separated by a slice, so the element axis
        # of the indexed view comes first; move it to the front of the source
        elem_first = np.moveaxis(values, -2, 0)
        padded[..., codes, :, positions] = elem_first
        # C-order flattening: term t of the member at within-group position p
        # ends up at index t * max_size + p
        return padded.reshape(lead + (n_groups, flat_terms))

    def scatter(da: DataArray, fill: Any) -> DataArray:
        sizes = {GROUP_DIM: n_groups, TERM_DIM: flat_terms}
        return xr.apply_ufunc(
            scatter_terms,
            single_chunk(da),
            kwargs={"fill": fill},
            input_core_dims=[[group_dim, TERM_DIM]],
            output_core_dims=[[GROUP_DIM, TERM_DIM]],
            exclude_dims={group_dim, TERM_DIM},
            dask="parallelized",
            dask_gufunc_kwargs={"output_sizes": sizes},
            output_dtypes=[da.dtype],
        )

    const = xr.apply_ufunc(
        group_sum,
        single_chunk(data.const),
        input_core_dims=[[group_dim]],
        output_core_dims=[[GROUP_DIM]],
        exclude_dims={group_dim},
        dask="parallelized",
        dask_gufunc_kwargs={"output_sizes": {GROUP_DIM: n_groups}},
        output_dtypes=[data.const.dtype],
    )
    coeffs = scatter(data.coeffs, fill_value["coeffs"])
    variables = scatter(data.vars, fill_value["vars"])

    result = Dataset({"coeffs": coeffs, "vars": variables, "const": const})
    return result.assign_coords({GROUP_DIM: unique_groups})

def roll(self, **kwargs: Any) -> LinearExpression:
"""
Roll the groupby object.
Expand Down
Loading
Loading