Skip to content

Commit

Permalink
More minor typing edits
Browse files Browse the repository at this point in the history
  • Loading branch information
GraysonBellamy committed Jun 11, 2024
1 parent 26ef4d0 commit 2173d35
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
19 changes: 10 additions & 9 deletions pyalicat/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from abc import ABC, abstractmethod
from collections.abc import ByteString
from typing import Optional

import anyio
import anyio.lowlevel
Expand All @@ -26,14 +25,14 @@ def __init__(self, timeout: int) -> None:
self.timeout = timeout

@abstractmethod
async def _read(self, len: int) -> Optional[str]:
async def _read(self, len: int) -> ByteString | None:
"""Reads the serial communication.
Args:
len (int): The length of the serial communication to read. One character if not specified.
Returns:
str: The serial communication.
ByteString: The serial communication.
"""
pass

Expand All @@ -52,7 +51,7 @@ async def close(self):
pass

@abstractmethod
async def _readline(self) -> Optional[str]:
async def _readline(self) -> str | None:
"""Reads the serial communication until end-of-line character reached.
Returns:
Expand All @@ -61,7 +60,7 @@ async def _readline(self) -> Optional[str]:
pass

@abstractmethod
async def _write_readline(self, command: str) -> Optional[str]:
async def _write_readline(self, command: str) -> str | None:
"""Writes the serial communication and reads the response until end-of-line character reached.
Args:
Expand Down Expand Up @@ -118,7 +117,7 @@ def __init__(
self.isOpen = False
self.ser_devc = SerialStream(**self.serial_setup)

async def _read(self, len: int = None) -> ByteString:
async def _read(self, len: int | None = None) -> ByteString | None:
"""Reads the serial communication.
Args:
Expand Down Expand Up @@ -178,22 +177,24 @@ async def _readline(self) -> str:
self.isOpen = False
return line.decode("ascii")

async def _write_readall(self, command: str, timeout: int = None) -> list[str]:
async def _write_readall(
self, command: str, timeout: int | None = None
) -> list[str]:
"""Write command and read until timeout reached.
Args:
command (str): The serial communication.
timeout (int): The timeout of the Alicat device in ms.
Returns:
list[str]: List of lines read from the device.
arr_line (list[str]): List of lines read from the device.
"""
timeout = self.timeout if timeout is None else timeout
async with self.ser_devc:
self.isOpen = True
await self._write(command)
line = bytearray()
arr_line = []
arr_line: list[str] = []
while True:
c = None
with anyio.move_on_after(
Expand Down
54 changes: 33 additions & 21 deletions pyalicat/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@
import json
import re
import warnings
from abc import ABC
from typing import Any
from abc import ABC, abstractmethod
from typing import Any, Self

import anyio
from anyio import run
from comm import SerialDevice

# from .device import Device
warnings.filterwarnings("always")


Expand All @@ -34,7 +31,7 @@ class VersionError(Exception):
gases = codes["gases"][0]


def all_subclasses(cls: type[ABC]) -> set[type[ABC]]:
def all_subclasses(cls: type) -> set[type]:
"""Returns all subclasses of a class.
Args:
Expand All @@ -51,8 +48,25 @@ def all_subclasses(cls: type[ABC]) -> set[type[ABC]]:
class Device(ABC):
"""Generic device class."""

@classmethod
@abstractmethod
def is_model(cls, model: str) -> bool:
"""Determines if the model is the correct model for the device.
Args:
model (str): The model to check.
Returns:
bool: True if the model is correct, False otherwise.
"""
pass

def __init__(
self, device: SerialDevice, dev_info: dict, id: str = "A", **kwargs: Any
self,
device: SerialDevice,
dev_info: dict[str, str],
id: str = "A",
**kwargs: Any,
) -> None:
"""Initialize the Device object.
Expand All @@ -74,7 +88,7 @@ def __init__(
)

@classmethod
async def new_device(cls, port: str, id: str = "A", **kwargs: Any):
async def new_device(cls, port: str, id: str = "A", **kwargs: Any) -> Self:
"""Creates a new device. Chooses appropriate device based on characteristics.
Example:
Expand Down Expand Up @@ -149,11 +163,9 @@ async def request(
dict[str, str | float]: The requested statistics.
"""
if len(stats) > 13:
# print("Too many statistics requested, discarding excess")
raise IndexError("Too many statistics requested")
stats = stats[:13]
# stats = stats[:13]
ret = await self._device._write_readline(
# Add 150 ms to the given time
f"{self._id}DV {time} {' '.join(str(statistics[stat]) for stat in stats)}" # add a parameter for time out here
)
ret = ret.split()
Expand All @@ -171,7 +183,7 @@ async def start_stream(self) -> None:
await self._device._write(f"{self._id}@ @")
return

async def stop_stream(self, new_id: str = None) -> None:
async def stop_stream(self, new_id: str | None = None) -> None:
"""Stops streaming data from device.
Example:
Expand All @@ -186,7 +198,9 @@ async def stop_stream(self, new_id: str = None) -> None:
self.id = new_id
return

async def gas(self, gas: str = "", save: bool = "") -> dict[str, str]:
async def gas(
self, gas: str | None = None, save: bool | None = None
) -> dict[str, str]:
"""Gets/Sets the gas of the device.
Example:
Expand All @@ -204,18 +218,17 @@ async def gas(self, gas: str = "", save: bool = "") -> dict[str, str]:
"""
LABELS = ["Unit_ID", "Gas_Code", "Gas", "Gas_Long"]
if gas and self._vers and self._vers < 10.05:
# print("Error: Version earlier than 10v05, running Set Gas")
warnings.warn("Version earlier than 10v05, running Set Gas")
return await self.set_gas(gas)
return await self._set_gas(gas)
gas = gases.get(gas, "")
if not gas:
save = ""
save = None
if isinstance(save, bool):
save = "1" if save else "0"
ret = await self._device._write_readline(f"{self._id}GS {gas} {save}")
savestr = "1" if save else "0"
ret = await self._device._write_readline(f"{self._id}GS {gas or ""} {savestr}")
return dict(zip(LABELS, ret.split()))

async def _set_gas(self, gas: str = "") -> dict[str, str]:
async def _set_gas(self, gas: str | None = None) -> dict[str, str]:
"""Sets the gas of the device.
Example:
Expand All @@ -237,7 +250,7 @@ async def _set_gas(self, gas: str = "") -> dict[str, str]:
if self._df_format is None:
await self.get_df_format()
gas = gases.get(gas, "")
ret = await self._device._write_readline(f"{self._id}G {gas}")
ret = await self._device._write_readline(f"{self._id}G {gas or ""}")
df = ret.split()
for index in [idx for idx, s in enumerate(self._df_ret) if "decimal" in s]:
df[index] = float(df[index])
Expand Down Expand Up @@ -1858,7 +1871,6 @@ async def zero_pressure_control(self, enable: bool = "") -> dict[str, str]:
if self._vers and self._vers < 10.05:
# print("Error: Version earlier than 10v05")
raise VersionError("Version earlier than 10v05")
return
LABELS = ["Unit_ID", "Active_Ctrl"]
if isinstance(enable, bool):
enable = "1" if enable else "0"
Expand Down

0 comments on commit 2173d35

Please sign in to comment.