Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add safety check #28

Merged
merged 4 commits into from
Sep 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pocs sensor monitor weather --endpoint http://localhost:8080
The web service will serve the weather data as json. The data can be accessed
by going to the `/weather` endpoint. For example, if the web service is
running on `localhost` on port `8080` then the weather data can be accessed at
`http://localhost:8000/weather`.
`http://localhost:8080/weather`.

The [httpie](https://httpie.io/) is installed with this package and can be
used to read the weather data from the command line:
Expand All @@ -84,7 +84,7 @@ http :8080/weather
### Starting

The `aag-weather` command line tool can be used to read the weather data from
the CloudWatcher and store it in a file. The `aag-weather` command line tool
the CloudWatcher and store it in a csv file. The `aag-weather` command line tool
can be run with:

```bash
Expand Down
119 changes: 97 additions & 22 deletions src/aag/weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from rich import print

from aag.commands import WeatherCommand, WeatherResponseCodes
from aag.settings import WeatherSettings, WhichUnits
from aag.settings import WeatherSettings, WhichUnits, Thresholds


class CloudSensor(object):
Expand Down Expand Up @@ -40,12 +40,12 @@ def __init__(self, connect: bool = True, **kwargs):
# Set up a queue for readings
self.readings = deque(maxlen=self.config.num_readings)

self.name = 'CloudWatcher'
self.firmware = None
self.serial_number = None
self.has_anemometer = False
self.name: str = 'CloudWatcher'
self.firmware: str | None = None
self.serial_number: str | None = None
self.has_anemometer: bool = False

self._is_connected = False
self._is_connected: bool = False

if connect:
self._is_connected = self.connect()
Expand All @@ -55,8 +55,30 @@ def is_connected(self) -> bool:
""" Is the sensor connected?"""
return self._is_connected

def connect(self) -> bool:
""" Connect to the sensor. """
@property
def thresholds(self) -> Thresholds:
"""Thresholds for the safety checks."""
return self.config.thresholds

@property
def status(self) -> dict:
"""Returns the most recent reading and safety value."""
return self.readings[-1]

@property
def is_safe(self) -> bool:
"""Is the sensor safe?"""
return self.status['is_safe']

def connect(self, raise_exceptions: bool = True) -> bool:
""" Connect to the sensor.

Args:
raise_exceptions: Whether to raise exceptions, default True.

Returns:
True if connected, False otherwise.
"""
try:
# Initialize and get static values.
self.name = self.query(WeatherCommand.GET_INTERNAL_NAME)
Expand All @@ -71,6 +93,9 @@ def connect(self) -> bool:

self._is_connected = True
except Exception as e:
print(f'[red]Unable to connect to weather sensor. Check the port. {e}')
if raise_exceptions:
raise e
self._is_connected = False

return self._is_connected
Expand All @@ -92,17 +117,17 @@ def capture(self, callback: Callable | None = None, units: WhichUnits = 'none')
except KeyboardInterrupt:
pass

def get_reading(self, enqueue: bool = True, units: WhichUnits = 'none') -> dict:
def get_reading(self, units: WhichUnits = 'none') -> dict:
""" Get a single reading of all values.

Args:
enqueue: Whether to add the reading to the queue, default True.
units: The units to return the reading in, default 'none'.
units: The astropy units to return the reading in, default 'none',
can be 'metric' or 'imperial'.

Returns:
A dictionary of readings.
"""
readings = {
reading = {
'timestamp': datetime.now().isoformat(),
'ambient_temp': self.get_ambient_temperature(),
'sky_temp': self.get_sky_temperature(),
Expand All @@ -112,22 +137,72 @@ def get_reading(self, enqueue: bool = True, units: WhichUnits = 'none') -> dict:
**{f'error_{i}': err for i, err in enumerate(self.get_errors())}
}

# Add the safety values.
reading = self.get_safe_reading(reading)

# Add astropy units if requested.
if units != 'none':
# First make them metric units.
readings['ambient_temp'] *= u.Celsius
readings['sky_temp'] *= u.Celsius
readings['wind_speed'] *= u.m / u.s
readings['pwm'] *= u.percent
reading['ambient_temp'] *= u.Celsius
reading['sky_temp'] *= u.Celsius
reading['wind_speed'] *= u.m / u.s
reading['pwm'] *= u.percent
# Then convert if needed.
if units == 'imperial':
readings['ambient_temp'] = readings['ambient_temp'].to(u.imperial.deg_F, equivalencies=u.temperature())
readings['sky_temp'] = readings['sky_temp'].to(u.imperial.deg_F, equivalencies=u.temperature())
readings['wind_speed'] = readings['wind_speed'].to(u.imperial.mile / u.hour)
reading['ambient_temp'] = reading['ambient_temp'].to(u.imperial.deg_F, equivalencies=u.temperature())
reading['sky_temp'] = reading['sky_temp'].to(u.imperial.deg_F, equivalencies=u.temperature())
reading['wind_speed'] = reading['wind_speed'].to(u.imperial.mile / u.hour)

self.readings.append(reading)

return reading

if enqueue:
self.readings.append(readings)
def get_safe_reading(self, reading: dict) -> dict:
""" Checks the reading against the thresholds.

return readings
Args:
reading: The reading to check.

Returns:
The reading with the safety values added.
"""
reading['cloud_condition'] = 'unknown'
temp_diff = reading['sky_temp'] - reading['ambient_temp']
if temp_diff >= self.thresholds.very_cloudy:
reading['cloud_condition'] = 'very cloudy'
elif temp_diff >= self.thresholds.cloudy:
reading['cloud_condition'] = 'cloudy'
elif temp_diff < self.thresholds.cloudy:
reading['cloud_condition'] = 'clear'

reading['wind_condition'] = 'unknown'
if reading['wind_speed'] is not None:
if reading['wind_speed'] >= self.thresholds.very_gusty:
reading['wind_condition'] = 'very gusty'
elif reading['wind_speed'] >= self.thresholds.gusty:
reading['wind_condition'] = 'gusty'
elif reading['wind_speed'] >= self.thresholds.very_windy:
reading['wind_condition'] = 'very windy'
elif reading['wind_speed'] >= self.thresholds.windy:
reading['wind_condition'] = 'windy'
elif reading['wind_speed'] < self.thresholds.windy:
reading['wind_condition'] = 'calm'

reading['rain_condition'] = 'unknown'
if reading['rain_frequency'] <= self.thresholds.rainy:
reading['rain_condition'] = 'rainy'
elif reading['rain_frequency'] <= self.thresholds.wet:
reading['rain_condition'] = 'wet'
elif reading['rain_frequency'] > self.thresholds.wet:
reading['rain_condition'] = 'dry'

reading['cloud_safe'] = True if reading['cloud_condition'] == 'clear' else False
reading['wind_safe'] = True if reading['wind_condition'] == 'calm' else False
reading['rain_safe'] = True if reading['rain_condition'] == 'dry' else False

reading['is_safe'] = True if reading['cloud_safe'] and reading['wind_safe'] and reading['rain_safe'] else False

return reading

def get_errors(self) -> list[int]:
"""Gets the number of internal errors
Expand Down
84 changes: 83 additions & 1 deletion tests/test_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def test_create_sensor():
sensor = CloudSensor(connect=False)
assert isinstance(sensor, CloudSensor)
assert not sensor.is_connected
assert sensor.connect() is False
assert sensor.connect(raise_exceptions=False) is False


def test_bad_port():
Expand All @@ -18,3 +18,85 @@ def test_bad_port():
# Should raise an exception
with pytest.raises(Exception):
CloudSensor(connect=False)


def test_connect_loop():
with pytest.raises(Exception):
CloudSensor(connect=True, serial_port='loop://')

sensor = CloudSensor(connect=False, serial_port='loop://')
is_connected = sensor.connect(raise_exceptions=False)
assert isinstance(sensor, CloudSensor)
assert is_connected is False
assert sensor.is_connected is False


def test_get_safe_reading():
os.environ['AAG_SERIAL_PORT'] = 'loop://'
sensor = CloudSensor(connect=False)
assert isinstance(sensor, CloudSensor)
assert not sensor.is_connected
assert sensor.connect(raise_exceptions=False) is False

# Make a fake reading entry.
reading = {
'wind_speed': 10,
'ambient_temp': 20,
'sky_temp': 10,
'timestamp': '2021-01-01T00:00:00',
'rain_frequency': 2500,
'pwm': 0,
}

# Check safety.
reading = sensor.get_safe_reading(reading=reading)
assert reading['is_safe'] is False
assert reading['cloud_safe'] is False
assert reading['cloud_condition'] == 'very cloudy'

# Make safe
reading['ambient_temp'] = 20
reading['sky_temp'] = -20
print(reading)
reading = sensor.get_safe_reading(reading=reading)
print(reading)
assert reading['is_safe'] is True
assert reading['cloud_safe'] is True
assert reading['cloud_condition'] == 'clear'

# Make windy
reading['wind_speed'] = 51
reading = sensor.get_safe_reading(reading=reading)
assert reading['is_safe'] is False
assert reading['wind_safe'] is False
assert reading['wind_condition'] == 'windy'

reading['wind_speed'] = 76
reading = sensor.get_safe_reading(reading=reading)
assert reading['wind_condition'] == 'very windy'

reading['wind_speed'] = 101
reading = sensor.get_safe_reading(reading=reading)
assert reading['wind_condition'] == 'gusty'

reading['wind_speed'] = 126
reading = sensor.get_safe_reading(reading=reading)
assert reading['wind_condition'] == 'very gusty'

# Make rainy
reading['rain_frequency'] = 2000
reading = sensor.get_safe_reading(reading=reading)
assert reading['is_safe'] is False
assert reading['rain_safe'] is False
assert reading['rain_condition'] == 'wet'

reading['rain_frequency'] = 1700
reading = sensor.get_safe_reading(reading=reading)
assert reading['is_safe'] is False
assert reading['rain_safe'] is False
assert reading['rain_condition'] == 'rainy'

# Make dry
reading['rain_frequency'] = 2300
reading = sensor.get_safe_reading(reading=reading)
assert reading['rain_condition'] == 'dry'
Loading