-
Notifications
You must be signed in to change notification settings - Fork 55
/
vspec.py
133 lines (103 loc) · 3.77 KB
/
vspec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (c) 2023 Contributors to COVESA
#
# This program and the accompanying materials are made available under the
# terms of the Mozilla Public License 2.0 which is available at
# https://www.mozilla.org/en-US/MPL/2.0/
#
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from vss_tools import log
class IncludeStatementException(Exception):
pass
class IncludeNotFoundException(Exception):
pass
class InvalidSpecDuplicatedEntryException(Exception):
pass
class SpecException(Exception):
pass
class Include:
def __init__(self, statement: str, prefix: str | None = None):
self.statement = statement
split = statement.split()
if len(split) < 2:
raise IncludeStatementException(f"Malformed include statement: {statement}")
self.target = split[1]
self.prefix = prefix
if len(split) == 3:
if self.prefix is not None:
self.prefix += f".{split[2]}"
else:
self.prefix = split[2]
def resolve_path(self, include_dirs: list[Path]) -> Path:
for dir in include_dirs:
path = dir / self.target
if path.exists():
log.debug(f"'{self.statement}', resolved={path}")
return path
raise IncludeNotFoundException(f"Unable to find include {self.target}. Include dirs: {include_dirs}")
def deep_update(base: dict[str, Any], update: dict[str, Any]) -> None:
for key, value in update.items():
if isinstance(value, dict):
if key in base and isinstance(base[key], dict):
deep_update(base[key], value)
else:
base[key] = value
else:
base[key] = value
class VSpec:
def __init__(
self,
source: Path,
prefix: str | None = None,
):
self.source = source
self.prefix = prefix
content = source.read_text()
self.data = yaml.safe_load(content)
if self.data is None:
self.data = {}
if prefix:
tmp_data = {}
for k, v in self.data.items():
new_key = f"{prefix}.{k}"
tmp_data[new_key] = v
self.data = tmp_data
lines = content.splitlines()
include_statements = [line.strip() for line in lines if line.strip().startswith("#include")]
self.includes = [Include(statement, prefix) for statement in include_statements]
def __str__(self) -> str:
return f"{self.__class__.__name__}, src={self.source}, prefix={self.prefix}, includes={len(self.includes)}"
def update(self, other: VSpec) -> None:
deep_update(self.data, other.data)
def get_vspecs(includes: list[Path], spec: Path, prefix: str | None = None) -> list[VSpec]:
vspecs: list[VSpec] = []
vspec = VSpec(spec, prefix)
vspecs.append(vspec)
for include in vspec.includes:
include_spec = include.resolve_path(includes + [vspec.source.parent])
vspecs.extend(get_vspecs(includes, include_spec, include.prefix))
return vspecs
def load_vspec(include_dirs: list[Path], specs: list[Path], identifier: str | None = None) -> VSpec:
spec = None
vspecs: list[VSpec] = []
for s in specs:
includes = [s.parent] + include_dirs
vspecs.extend(get_vspecs(includes, s))
pre = "VSpecs"
if identifier:
pre += f" ({identifier})"
log.info(f"{pre} loaded, amount={len(vspecs)}")
for vspec in vspecs:
log.debug(vspec)
if spec is None:
spec = vspec
else:
spec.update(vspec)
if not spec:
msg = f"Weird behavior. Could not load any spec: {specs}"
log.error(msg)
raise SpecException(msg)
return spec