Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Honor match directives when installing in a gap using layout.mode: use_gap #2083

Merged
merged 4 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 57 additions & 14 deletions subiquity/models/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@
import secrets
import tempfile
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union
from typing import (
Callable,
Dict,
List,
Literal,
Optional,
Sequence,
Set,
Tuple,
TypedDict,
Union,
)

import attr
import more_itertools
Expand Down Expand Up @@ -55,6 +66,26 @@ class NotFinalPartitionError(Exception):
the last one."""


# a match directive is a dict that specifies
# * zero or more keys to filter on
# * an optional sort on size
MatchDirective = TypedDict(
"MatchDirective",
{
"serial": str,
"model": str,
"vendor": str,
"path": str,
"id_path": str,
"devpath": str,
"ssd": bool,
"size": Literal["smallest", "largest"],
"install-media": bool,
},
total=False,
)


@attr.s(auto_attribs=True)
class RecoveryKeyHandler:
# Where to store the key on the live system
Expand Down Expand Up @@ -1650,7 +1681,7 @@ def load_server_data(self, status: StorageResponse):
status.config, blockdevs=None, is_probe_data=False
)

def _make_matchers(self, match: dict) -> Sequence[Callable]:
def _make_matchers(self, match: MatchDirective) -> Sequence[Callable]:
def _udev_val(disk, key):
return self._probe_data["blockdev"].get(disk.path, {}).get(key, "")

Expand Down Expand Up @@ -1709,7 +1740,7 @@ def match_nonzero_size(disk):

return matchers

def _sorted_matches(self, disks: Sequence[_Device], match: dict):
def _sorted_matches(self, disks: Sequence[_Device], match: MatchDirective):
# sort first on the sort_key. Objective here is that if we are falling
# back to arbitrary disk selection, we're at least consistent in what
# disk we arbitrarily select across runs
Expand All @@ -1723,27 +1754,39 @@ def _sorted_matches(self, disks: Sequence[_Device], match: dict):
disks.sort(key=lambda d: d.size, reverse=True)
return disks

def _filtered_matches(self, disks: Sequence[_Device], match: dict):
def _filtered_matches(self, disks: Sequence[_Device], match: MatchDirective):
matchers = self._make_matchers(match)
return [disk for disk in disks if all(match_fn(disk) for match_fn in matchers)]

def disk_for_match(
self, disks: Sequence[_Device], match: dict | Sequence[dict]
) -> _Device:
# a match directive is a dict, or a list of dicts, that specify
# * zero or more keys to filter on
# * an optional sort on size
def _matching_disks(
self, disks: Sequence[_Device], match: MatchDirective | Sequence[MatchDirective]
) -> tuple[list[_Device], Optional[MatchDirective]]:
log.info(f"considering {disks} for {match}")
if isinstance(match, dict):
if not isinstance(match, Sequence):
match = [match]
for m in match:
candidates = self._filtered_matches(disks, m)
candidates = self._sorted_matches(candidates, m)
if candidates:
log.info(f"For match {m}, using the first candidate from {candidates}")
return candidates[0]
return candidates, m
log.info(f"No devices satisfy criteria {match}")
return None
return [], None

def disks_for_match(
self, disks: Sequence[_Device], match: MatchDirective | Sequence[MatchDirective]
) -> list[_Device]:
candidates, _ = self._matching_disks(disks, match)
return candidates

def disk_for_match(
self, disks: Sequence[_Device], match: MatchDirective | Sequence[MatchDirective]
) -> Optional[_Device]:
candidates, m = self._matching_disks(disks, match)
if candidates:
log.info(f"For match {m}, using the first candidate from {candidates}")
return candidates[0]
else:
return None

def assign_omitted_offsets(self):
"""Assign offsets to partitions that do not already have one.
Expand Down
90 changes: 54 additions & 36 deletions subiquity/models/tests/test_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -1870,24 +1870,28 @@ def test_empty_match_directive(self):
d2 = make_disk(m)

# this test relies heavily on the assumptions in make_disk
actual = m.disk_for_match([d1, d2], {})
self.assertEqual(d1, actual)
actual = m.disk_for_match([d2, d1], {})
self.assertEqual(d1, actual)
self.assertEqual(d1, m.disk_for_match([d1, d2], {}))
self.assertEqual([d1, d2], m.disks_for_match([d1, d2], {}))
self.assertEqual(d1, m.disk_for_match([d2, d1], {}))
self.assertEqual([d1, d2], m.disks_for_match([d2, d1], {}))

def test_sort_largest(self):
m = make_model()
d100 = make_disk(m, size=100 << 30, serial="s1", path="/dev/d1")
d200 = make_disk(m, size=200 << 30, serial="s2", path="/dev/d2")
actual = m.disk_for_match([d100, d200], {"size": "largest"})
self.assertEqual(d200, actual)
self.assertEqual(d200, m.disk_for_match([d100, d200], {"size": "largest"}))
self.assertEqual(
[d200, d100], m.disks_for_match([d100, d200], {"size": "largest"})
)

def test_sort_smallest(self):
m = make_model()
d200 = make_disk(m, size=200 << 30)
d100 = make_disk(m, size=100 << 30)
actual = m.disk_for_match([d200, d100], {"size": "smallest"})
self.assertEqual(d100, actual)
self.assertEqual(d100, m.disk_for_match([d200, d100], {"size": "smallest"}))
self.assertEqual(
[d100, d200], m.disks_for_match([d200, d100], {"size": "smallest"})
)

@parameterized.expand(match_sort_criteria)
def test_sort_serial(self, sort_criteria: str):
Expand All @@ -1896,48 +1900,48 @@ def test_sort_serial(self, sort_criteria: str):
d2 = make_disk(m, serial="s2", path=None, wwn=None)
# while the size sort is reversed when doing largest,
# we pre-sort on the other criteria, and stable sort helps out
actual = m.disk_for_match([d2, d1], {"size": sort_criteria})
self.assertEqual(d1, actual)
self.assertEqual(d1, m.disk_for_match([d2, d1], {"size": sort_criteria}))
self.assertEqual([d1, d2], m.disks_for_match([d2, d1], {"size": sort_criteria}))

@parameterized.expand(match_sort_criteria)
def test_sort_path(self, sort_criteria: str):
m = make_model()
d1 = make_disk(m, serial=None, path="/dev/d1", wwn=None)
d2 = make_disk(m, serial=None, path="/dev/d2", wwn=None)
actual = m.disk_for_match([d2, d1], {"size": sort_criteria})
self.assertEqual(d1, actual)
self.assertEqual(d1, m.disk_for_match([d2, d1], {"size": sort_criteria}))
self.assertEqual([d1, d2], m.disks_for_match([d2, d1], {"size": sort_criteria}))

@parameterized.expand(match_sort_criteria)
def test_sort_wwn(self, sort_criteria: str):
m = make_model()
d1 = make_disk(m, serial=None, path=None, wwn="w1")
d2 = make_disk(m, serial=None, path=None, wwn="w2")
actual = m.disk_for_match([d2, d1], {"size": sort_criteria})
self.assertEqual(d1, actual)
self.assertEqual(d1, m.disk_for_match([d2, d1], {"size": sort_criteria}))
self.assertEqual([d1, d2], m.disks_for_match([d2, d1], {"size": sort_criteria}))

@parameterized.expand(match_sort_criteria)
def test_sort_wwn_wins(self, sort_criteria: str):
m = make_model()
d1 = make_disk(m, serial="s2", path="/dev/d2", wwn="w1")
d2 = make_disk(m, serial="s1", path="/dev/d1", wwn="w2")
actual = m.disk_for_match([d2, d1], {"size": sort_criteria})
self.assertEqual(d1, actual)
self.assertEqual(d1, m.disk_for_match([d2, d1], {"size": sort_criteria}))
self.assertEqual([d1, d2], m.disks_for_match([d2, d1], {"size": sort_criteria}))

@parameterized.expand(match_sort_criteria)
def test_sort_serial_wins(self, sort_criteria: str):
m = make_model()
d1 = make_disk(m, serial="s1", path="/dev/d2", wwn="w")
d2 = make_disk(m, serial="s2", path="/dev/d1", wwn="w")
actual = m.disk_for_match([d2, d1], {"size": sort_criteria})
self.assertEqual(d1, actual)
self.assertEqual(d1, m.disk_for_match([d2, d1], {"size": sort_criteria}))
self.assertEqual([d1, d2], m.disks_for_match([d2, d1], {"size": sort_criteria}))

@parameterized.expand(match_sort_criteria)
def test_sort_path_wins(self, sort_criteria: str):
m = make_model()
d1 = make_disk(m, serial="s", path="/dev/d1", wwn="w")
d2 = make_disk(m, serial="s", path="/dev/d2", wwn="w")
actual = m.disk_for_match([d2, d1], {"size": sort_criteria})
self.assertEqual(d1, actual)
self.assertEqual(d1, m.disk_for_match([d2, d1], {"size": sort_criteria}))
self.assertEqual([d1, d2], m.disks_for_match([d2, d1], {"size": sort_criteria}))

def test_sort_raid(self):
m = make_model()
Expand All @@ -1947,8 +1951,8 @@ def test_sort_raid(self):
d2_2 = make_disk(m, size=200 << 30)
r1 = make_raid(m, disks={d1_1, d1_2})
r2 = make_raid(m, disks={d2_1, d2_2})
actual = m.disk_for_match([r1, r2], {"size": "largest"})
self.assertEqual(r2, actual)
self.assertEqual(r2, m.disk_for_match([r1, r2], {"size": "largest"}))
self.assertEqual([r2, r1], m.disks_for_match([r1, r2], {"size": "largest"}))

@parameterized.expand(match_sort_criteria)
def test_sort_raid_on_disks(self, sort_criteria: str):
Expand All @@ -1959,23 +1963,23 @@ def test_sort_raid_on_disks(self, sort_criteria: str):
d2_2 = make_disk(m, serial=None, path=None, wwn="w2_2")
r1 = make_raid(m, disks={d1_1, d1_2})
r2 = make_raid(m, disks={d2_1, d2_2})
actual = m.disk_for_match([r1, r2], {"size": sort_criteria})
self.assertEqual(r1, actual)
self.assertEqual(r1, m.disk_for_match([r1, r2], {"size": sort_criteria}))
self.assertEqual([r1, r2], m.disks_for_match([r1, r2], {"size": sort_criteria}))

def test_skip_empty(self):
m = make_model()
d0 = make_disk(m, size=0)
d100 = make_disk(m, size=100 << 30)
actual = m.disk_for_match([d0, d100], {"size": "smallest"})
self.assertEqual(d100, actual)
self.assertEqual(d100, m.disk_for_match([d0, d100], {"size": "smallest"}))
self.assertEqual([d100], m.disks_for_match([d0, d100], {"size": "smallest"}))

def test_skip_in_use_size(self):
m = make_model()
d100 = make_disk(m, size=100 << 30)
d200 = make_disk(m, size=200 << 30)
d100._has_in_use_partition = True
actual = m.disk_for_match([d100, d200], {"size": "smallest"})
self.assertEqual(d200, actual)
self.assertEqual(d200, m.disk_for_match([d100, d200], {"size": "smallest"}))
self.assertEqual([d200], m.disks_for_match([d100, d200], {"size": "smallest"}))

def test_skip_in_use_ssd(self):
m = make_model()
Expand All @@ -1984,16 +1988,22 @@ def test_skip_in_use_ssd(self):
d_in_use._has_in_use_partition = True
d_in_use.info_for_display = Mock(return_value={"rotational": "false"})
d_not_used.info_for_display = Mock(return_value={"rotational": "false"})
actual = m.disk_for_match([d_in_use, d_not_used], {"ssd": True})
self.assertEqual(d_not_used, actual)
self.assertEqual(
d_not_used, m.disk_for_match([d_in_use, d_not_used], {"ssd": True})
)
self.assertEqual(
[d_not_used], m.disks_for_match([d_in_use, d_not_used], {"ssd": True})
)

def test_matcher_serial(self):
m = make_model()
d1 = make_disk(m, serial="1")
d2 = make_disk(m, serial="2")
fake_up_blockdata(m)
self.assertEqual(d1, m.disk_for_match([d1, d2], {"serial": "1"}))
self.assertEqual([d1], m.disks_for_match([d1, d2], {"serial": "1"}))
self.assertEqual(d2, m.disk_for_match([d1, d2], {"serial": "2"}))
self.assertEqual([d2], m.disks_for_match([d1, d2], {"serial": "2"}))

def test_matcher_model(self):
m = make_model()
Expand All @@ -2002,7 +2012,9 @@ def test_matcher_model(self):
d2 = make_disk(m)
fake_up_blockdata_disk(d2, ID_MODEL="m2")
self.assertEqual(d1, m.disk_for_match([d1, d2], {"model": "m1"}))
self.assertEqual([d1], m.disks_for_match([d1, d2], {"model": "m1"}))
self.assertEqual(d2, m.disk_for_match([d1, d2], {"model": "m2"}))
self.assertEqual([d2], m.disks_for_match([d1, d2], {"model": "m2"}))

def test_matcher_vendor(self):
m = make_model()
Expand All @@ -2011,35 +2023,39 @@ def test_matcher_vendor(self):
d2 = make_disk(m)
fake_up_blockdata_disk(d2, ID_VENDOR="v2")
self.assertEqual(d1, m.disk_for_match([d1, d2], {"vendor": "v1"}))
self.assertEqual([d1], m.disks_for_match([d1, d2], {"vendor": "v1"}))
self.assertEqual(d2, m.disk_for_match([d1, d2], {"vendor": "v2"}))
self.assertEqual([d2], m.disks_for_match([d1, d2], {"vendor": "v2"}))

def test_matcher_path(self):
m = make_model()
vda = make_disk(m, path="/dev/vda")
vdb = make_disk(m, path="/dev/vdb")
fake_up_blockdata(m)
self.assertEqual(vda, m.disk_for_match([vda, vdb], {"path": "/dev/vda"}))
self.assertEqual([vda], m.disks_for_match([vda, vdb], {"path": "/dev/vda"}))
self.assertEqual(vdb, m.disk_for_match([vda, vdb], {"path": "/dev/vdb"}))
self.assertEqual([vdb], m.disks_for_match([vda, vdb], {"path": "/dev/vdb"}))

def test_matcher_id_path(self):
m = make_model()
vda = make_disk(m)
fake_up_blockdata_disk(vda, ID_PATH="pci-0000:00:00.0-nvme-vda")
vdb = make_disk(m)
fake_up_blockdata_disk(vdb, ID_PATH="pci-0000:00:00.0-nvme-vdb")
actual = m.disk_for_match([vda, vdb], {"id_path": "*vda"})
self.assertEqual(vda, actual)
actual = m.disk_for_match([vda, vdb], {"id_path": "*vdb"})
self.assertEqual(vdb, actual)
self.assertEqual(vda, m.disk_for_match([vda, vdb], {"id_path": "*vda"}))
self.assertEqual([vda], m.disks_for_match([vda, vdb], {"id_path": "*vda"}))
self.assertEqual(vdb, m.disk_for_match([vda, vdb], {"id_path": "*vdb"}))
self.assertEqual([vdb], m.disks_for_match([vda, vdb], {"id_path": "*vdb"}))

def test_matcher_install_media(self):
m = make_model()
iso = make_disk(m)
iso._has_in_use_partition = True
disk = make_disk(m)
fake_up_blockdata(m)
actual = m.disk_for_match([iso, disk], {"install-media": True})
self.assertEqual(iso, actual)
self.assertEqual(iso, m.disk_for_match([iso, disk], {"install-media": True}))
self.assertEqual([iso], m.disks_for_match([iso, disk], {"install-media": True}))

def test_match_from_list_first(self):
m = make_model()
Expand All @@ -2051,6 +2067,7 @@ def test_match_from_list_first(self):
{"path": "/dev/vdb"},
]
self.assertEqual(vda, m.disk_for_match([vda, vdb], match))
self.assertEqual([vda], m.disks_for_match([vda, vdb], match))

def test_match_from_list_second(self):
m = make_model()
Expand All @@ -2062,3 +2079,4 @@ def test_match_from_list_second(self):
{"path": "/dev/vdb"},
]
self.assertEqual(vdb, m.disk_for_match([vda, vdb], match))
self.assertEqual([vdb], m.disks_for_match([vda, vdb], match))
Loading
Loading