From cb5aeb0a67214aa76d98a5a64509a20693083bf4 Mon Sep 17 00:00:00 2001 From: GraysonBellamy Date: Thu, 23 May 2024 16:42:37 -0400 Subject: [PATCH] Errors --- pyalicat/daq.py | 32 +++++--- pyalicat/device.py | 182 +++++++++++++++++++++++++++++---------------- pyalicat/util.py | 37 +++------ 3 files changed, 149 insertions(+), 102 deletions(-) diff --git a/pyalicat/daq.py b/pyalicat/daq.py index bbda876..ef48e21 100644 --- a/pyalicat/daq.py +++ b/pyalicat/daq.py @@ -5,6 +5,7 @@ """ import device +from typing import Any from trio import run @@ -28,14 +29,14 @@ def __init__(self) -> None: return @classmethod - async def init(cls, devs: dict[str, str]) -> "DAQ": + async def init(cls, devs: dict[str, str | device.Device]) -> "DAQ": """Initializes the DAQ. Example: Daq = run(DAQ.init, {'A':'/dev/ttyUSB0', 'B':'/dev/ttyUSB1'}) Args: - devs (dict[str, str]): The dictionary of devices to add. Name:Port + devs (dict[str, str | device.Device]): The dictionary of devices to add. Name:Port Returns: DAQ: The DAQ object. @@ -44,19 +45,26 @@ async def init(cls, devs: dict[str, str]) -> "DAQ": await daq.add_device(devs) return daq - async def add_device(self, devs: dict[str, str]) -> None: + async def add_device( + self, devs: dict[str, str | device.Device], **kwargs: Any + ) -> None: """Creates and initializes the devices. Args: - devs (dict[str, str]): The dictionary of devices to add. Name:Port + devs (dict[str, str | Device]): The dictionary of devices to add. Name:Port or Name:Device + **kwargs: Any """ - if isinstance(devs, str): - devs = devs.split() - # This works if the string is the format "Name Port" - devs = {devs[0]: devs[1]} - for name in devs: - dev = await device.Device.new_device(devs[name]) - dev_list.update({name: dev}) + if devs: + if isinstance(devs, str): + devs = devs.split() + # This works if the string is the format "Name Port" + devs = {devs[0]: devs[1]} + for name in devs: + if isinstance(devs[name], str): + dev = await device.Device.new_device(devs[name], **kwargs) + dev_list.update({name: dev}) + elif isinstance(devs[name], device.Device): + dev_list.update({name: devs[name]}) return async def remove_device(self, name: list[str]) -> None: @@ -148,5 +156,7 @@ def __init__(self, config: dict) -> None: ---------- config : dict The configuration dictionary. {Name : port} + Device, quality, and rate are required. + Prototype with a .csv, eventually .hdf5 when ready. """ pass diff --git a/pyalicat/device.py b/pyalicat/device.py index a649427..623e5d5 100644 --- a/pyalicat/device.py +++ b/pyalicat/device.py @@ -7,9 +7,17 @@ from comm import SerialDevice import trio from trio import run +import warnings # from .device import Device + +class VersionError(Exception): + """Raised when the version of the device does not support command.""" + + pass + + with open("codes.json") as f: codes = json.load(f) statistics = codes["statistics"][0] @@ -132,7 +140,8 @@ async def request( dict[str, str | float]: The requested statistics. """ if len(stats) > 13: - print("Too many statistics requested, discarding excess") + # print("Too many statistics requested, discarding excess") + raise IndexError("Too many statistics requested") stats = stats[:13] ret = await self._device._write_readline( # Add 150 ms to the given time @@ -184,7 +193,8 @@ async def gas(self, gas: str = "", save: bool = "") -> dict[str, str]: """ LABELS = ["Unit ID", "Gas Code", "Gas", "Gas Long"] if gas and self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05, running Set Gas") + # print("Error: Version earlier than 10v05, running Set Gas") + warnings.warn("Version earlier than 10v05, running Set Gas") return await self.set_gas(gas) gas = gases.get(gas, "") if not gas: @@ -210,7 +220,8 @@ async def _set_gas(self, gas: str = "") -> dict[str, str]: dict[str, str]: Dataframe with new gas. """ if self._vers and self._vers >= 10.05: - print("Error: Version later than 10v05, running Active Gas") + # print("Error: Version later than 10v05, running Active Gas") + warnings.warn("Version later than 10v05, running Active Gas") return await self.gas(gas) if self._df_format is None: await self.get_df_format() @@ -325,7 +336,8 @@ async def auto_tare( dict[str, str | float]: If tare is active or not and delay length in seconds """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Auto-tare", "Delay (s)"] if isinstance(enable, bool): @@ -354,7 +366,8 @@ async def configure_data_frame(self, format: int = "") -> dict[str, str | float] dict[str, str | float]: Data Frame in new format """ if self._vers and self._vers < 6.00: - print("Error: Version earlier than 6v00") + # print("Error: Version earlier than 6v00") + raise VersionError("Version earlier than 6v00") return # Gets the format of the dataframe if it is not already known if self._df_format is None: @@ -365,7 +378,7 @@ async def configure_data_frame(self, format: int = "") -> dict[str, str | float] df[index] = float(df[index]) return dict(zip(self._df_format, df)) - async def engineering_units( + async def _engineering_units( self, statistic_value: str = "", unit: str = "", @@ -390,7 +403,8 @@ async def engineering_units( dict[str, str]: Responds with unit """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Unit Code", "Unit Label"] if isinstance(group, bool): @@ -421,7 +435,8 @@ async def flow_press_avg( dict[str, str | float]: Responds value of queried average and avg time const """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Value", "Time Const"] if stat_val.upper() == "ALL": @@ -449,7 +464,8 @@ async def full_scale_val( dict[str, str | float]: Responds max value of statistic and units """ if self._vers and self._vers < 6.00: - print("Error: Version earlier than 6v00") + # print("Error: Version earlier than 6v00") + raise VersionError("Version earlier than 6v00") return LABELS = ["Unit ID", "Max Value", "Unit Code", "Unit Label"] ret = await self._device._write_readline( @@ -472,7 +488,8 @@ async def power_up_tare(self, enable: bool = "") -> dict[str, str]: dict[str, str]: If tare is enabled """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Power-Up Tare"] if isinstance(enable, bool): @@ -493,7 +510,8 @@ async def data_frame(self) -> str: str: table that outlines data frame format """ if self._vers and self._vers < 6.00: - print("Error: Version earlier than 6v00") + # print("Error: Version earlier than 6v00") + raise VersionError("Version earlier than 6v00") return ret = await self._device._write_readall(f"{self._id}??D*") return ret @@ -517,7 +535,8 @@ async def stp_press( dict[str, str | float]: Current pressure reference point and units """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Curr Press Ref", "Unit Code", "Unit Label"] if stp.upper == "NTP": @@ -550,7 +569,8 @@ async def stp_temp( dict[str, str | float]: Current temperature reference point and units """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Curr Temp Ref", "Unit Code", "Unit Label"] if stp.upper == "NTP": @@ -580,7 +600,8 @@ async def zero_band(self, zb: float = "") -> dict[str, str | float]: dict[str, str | float]: Returns current zero band as percent of full scale """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Zero Band (%)"] if isinstance(zb, (float, int)): @@ -611,7 +632,8 @@ async def analog_out_source( dict[str, str]: Statistic and units """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Value", "Unit Code", "Unit Label"] if primary.upper() == "SECONDARY" or primary.upper() == "2ND": @@ -658,7 +680,8 @@ async def baud(self, new_baud: int = "") -> dict[str, str | float]: dict[str, str | float]: Baud rate, either current or new """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Baud"] VALID_BAUD_RATES = [2400, 4800, 9600, 19200, 38400, 57600, 115200] @@ -684,7 +707,8 @@ async def blink(self, dur: int = "") -> dict[str, str]: dict[str, str]: If the display is currently blinking """ if self._vers and self._vers < 8.28: - print("Error: Version earlier than 8v28") + # print("Error: Version earlier than 8v28") + raise VersionError("Version earlier than 8v28") return LABELS = ["Unit ID", "Flashing?"] ret = await self._device._write_readline(f"{self._id}FFP {dur}") @@ -770,7 +794,8 @@ async def remote_tare(self, actions: list[str] = []) -> dict[str, str | float]: dict[str, str | float]: Total value of active actions """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Active Actions Total"] action_dict = { @@ -803,7 +828,8 @@ async def restore_factory_settings(self) -> str: Confirmation of restoration """ if self._vers and self._vers < 7.00: - print("Error: Version earlier than 7v00") + # print("Error: Version earlier than 7v00") + raise VersionError("Version earlier than 7v00") return ret = await self._device._write_readline(f"{self._id}FACTORY RESTORE ALL") return ret @@ -825,7 +851,8 @@ async def user_data(self, slot: int = "", val: str = "") -> dict[str, str]: dict[str, str]: Value in called slot (either new or read) """ if self._vers and self._vers < 8.24: - print("Error: Version earlier than 8v24") + # print("Error: Version earlier than 8v24") + raise VersionError("Version earlier than 8v24") return if val == "": LABELS = ["Unit ID", "Curr. Value"] @@ -848,7 +875,8 @@ async def streaming_rate(self, interval: int = "") -> dict[str, str | float]: dict[str, str | float]: Interval of streaming rate """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Interval (ms)"] ret = await self._device._write_readline(f"{self._id}NCS {interval}") @@ -904,10 +932,12 @@ async def create_gas_mix( dict: Gas number of new mix and percentages and names of each constituent """ if self._vers and self._vers < 5.00: - print("Error: Version earlier than 5v00") + # print("Error: Version earlier than 5v00") + raise VersionError("Version earlier than 5v00") return if len(gas_dict) > 5: - print("Error: Too many input gases") + # print("Error: Too many input gases") + raise IndexError("Too many input gases") return gas_string = "" for x in gas_dict: @@ -949,7 +979,8 @@ async def delete_gas_mix(self, gasN: str = "") -> dict[str, float]: dict[str, float]: Deleted gas' number """ if self._vers and self._vers < 5.00: - print("Error: Version earlier than 5v00") + # print("Error: Version earlier than 5v00") + raise VersionError("Version earlier than 5v00") return LABELS = ["Unit ID", "Deleted Gas Num"] ret = await self._device._write_readline(f"{self._id}GD {gasN}") @@ -969,7 +1000,8 @@ async def query_gas_mix(self, gasN: int = "") -> dict[str, str]: dict[str, str]: Gas numbers and their percentages in mixture """ if self._vers and self._vers < 9.00: - print("Error: Version earlier than 9v00") + # print("Error: Version earlier than 9v00") + raise VersionError("Version earlier than 9v00") return LABELS = [ "Unit ID", @@ -1033,7 +1065,8 @@ async def config_totalizer( dict[str, str]: Configuration of totalizer """ if self._vers and self._vers < 10.00: - print("Error: Version earlier than 10v00") + # print("Error: Version earlier than 10v00") + raise VersionError("Version earlier than 10v00") return LABELS = [ "Unit ID", @@ -1068,7 +1101,8 @@ async def reset_totalizer(self, totalizer: int = 1) -> dict[str, str | float]: dict[str, str | float]: Dataframe with totalizer set to zero. """ if self._vers and self._vers < 8.00: - print("Error: Version earlier than 8v00") + # print("Error: Version earlier than 8v00") + raise VersionError("Version earlier than 8v00") return # Gets the format of the dataframe if it is not already known if self._df_format is None: @@ -1095,7 +1129,8 @@ async def reset_totalizer_peak(self, totalizer: int = 1) -> dict[str, str | floa dict[str, str | float]: Data frame """ if self._vers and self._vers < 8.00: - print("Error: Version earlier than 8v00") + # print("Error: Version earlier than 8v00") + raise VersionError("Version earlier than 8v00") return # Gets the format of the dataframe if it is not already known if self._df_format is None: @@ -1121,7 +1156,8 @@ async def save_totalizer(self, enable: bool = "") -> dict[str, str]: dict[str, str]: Says if totalizer is enabled or disabled """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Saving"] if isinstance(enable, bool): @@ -1162,23 +1198,20 @@ async def get_df_format(self) -> list[list[str]]: self._df_ret = df_ret return [df_stand, df_stand_ret] - async def get_units(self, measurement: dict) -> dict: + async def get_units(self, stats: list) -> dict: """Gets the units of the current dataframe format of the device. Args: - measurement (dict): Dictionary of statistics + measurement (list): List of measurements to get units for. Returns: list: Units of statistics in measurement """ - units = [None] * len(measurement) - for index in [idx for idx, s in enumerate(self._df_ret) if "decimal" in s]: - ret = await self._device._write_readline( - f"{self._id}DCU {statistics[list(measurement.keys())[index]]}" - ) - units[index] = ret.split()[2] - self._df_units = units - return units + units = [] + for stat in stats: + ret = await self._engineering_units(stat) + units.append(ret["Unit Label"]) + return dict(zip(stats, units)) async def set_units(self, stats: dict[str, str]) -> dict[str, str]: """Sets the units of the current dataframe format of the device. @@ -1190,9 +1223,8 @@ async def set_units(self, stats: dict[str, str]) -> dict[str, str]: dict[str, str]: Units of statistics in measurement """ for stat in stats: - await self._device._write_readline( - f"{self._id}DCU {statistics[stat]} {units[stats[stat]]}" - ) + ret = await self._engineering_units(stat, stats[stat]) + stats[stat] = ret["Unit Label"] return stats async def get(self, measurements: list[str] = ["@"]) -> dict[str, str | float]: @@ -1320,7 +1352,8 @@ async def setpoint( dict[str, str | float]: Reports setpoint with units """ if self._vers and self._vers < 9.00: - print("Error: Version earlier than 9v00, running Change Setpoint") + # print("Error: Version earlier than 9v00, running Change Setpoint") + warnings.warn("Version earlier than 9v00, running Change Setpoint") return await self.change_setpoint(value) LABELS = [ "Unit ID", @@ -1350,10 +1383,12 @@ async def _change_setpoint(self, value: float = "") -> dict[str, str | float]: dict[str, str | float]: Dataframe with new setpoint """ if self._vers and self._vers < 4.33: - print("Error: Version earlier than 4v33") + # print("Error: Version earlier than 4v33") + raise VersionError("Version earlier than 4v33") return if self._vers and self._vers >= 9.00: - print("Error: Version later than 9v00, running Query/Change Setpoint") + # print("Error: Version later than 9v00, running Query/Change Setpoint") + warnings.warn("Version later than 9v00, running Query/Change Setpoint") return await self.setpoint(value) if self._df_format is None: await self.get_df_format() @@ -1383,7 +1418,8 @@ async def batch( dict[str, str | float]: Reports totalizer, batch size, units. """ if self._vers and self._vers < 10.00: - print("Error: Version earlier than 10v00") + # print("Error: Version earlier than 10v00") + raise VersionError("Version earlier than 10v00") return LABELS = ["Unit ID", "Totalizer", "Batch Size", "Unit Code", "Unit Label"] ret = await self._device._write_readline( @@ -1427,7 +1463,8 @@ async def deadband_mode(self, mode: str = "") -> dict[str, str]: dict[str, str]: Reports mode """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Mode"] mode = ( @@ -1459,7 +1496,8 @@ async def loop_control_alg(self, algo: str = "") -> dict[str, str]: dict[str, str]: Reports algorithm """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Algorithm"] algo = ( @@ -1488,7 +1526,8 @@ async def loop_control_var(self, var: str = "") -> dict[str, str]: dict[str, str]: Reports new loop variable """ if self._vers and self._vers < 9.00: - print("Error: Version earlier than 9v00") + # print("Error: Version earlier than 9v00") + raise VersionError("Version earlier than 9v00") return LABELS = ["Unit ID", "Loop Var Val"] # If the user did not specify setpoint, assume Setpt @@ -1519,11 +1558,13 @@ async def loop_control_range( dict[str, str | float]: Reports loop variable, units, min, and max """ if self._vers and self._vers < 9.00: - print("Error: Version earlier than 9v00") + # print("Error: Version earlier than 9v00") + raise VersionError("Version earlier than 9v00") return LABELS = ["Unit ID", "Loop Var", "Min", "Max", "Unit Code", "Unit Label"] if self._vers and self._vers < 10.05: - print("Error: Version earlier than 9v00, max and min not supported") + # print("Error: Version earlier than 10v05, max and min not supported") + warnings.warn("Version earlier than 10v05") LABELS = LABELS[:-2] min = "" max = "" @@ -1553,7 +1594,8 @@ async def max_ramp_rate( dict[str, str | float]: Reports max ramp rate with unit """ if self._vers and self._vers < 7.11: - print("Error: Version earlier than 7v11") + # print("Error: Version earlier than 7v11") + raise VersionError("Version earlier than 7v11") return LABELS = ["Unit ID", "Max Ramp Rate", "Unit Code", "Time Code", "Units"] ret = await self._device._write_readline(f"{self._id}SR {max} {units[unit]}") @@ -1580,7 +1622,8 @@ async def pdf_gains( dict[str, str | float]: Reports P and D gains """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "P Gain", "D Gain"] if isinstance(save, bool): @@ -1613,7 +1656,8 @@ async def pd2i_gains( dict[str, str | float]: Reports P, I, and D gains """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "P Gain", "I Gain", "D Gain"] if isinstance(save, bool): @@ -1641,7 +1685,8 @@ async def power_up_setpoint(self, val: float = "") -> dict[str, str | float]: dict[str, str | float]: Dataframe with current (not power-up) setpoint """ if self._vers and self._vers < 8.04: - print("Error: Version earlier than 8v04") + # print("Error: Version earlier than 8v04") + raise VersionError("Version earlier than 8v04") return # Gets the format of the dataframe if it is not already known if self._df_format is None: @@ -1668,7 +1713,8 @@ async def overpressure(self, limit: float = "") -> dict[str, str | float]: dict[str, str | float]: Dataframe """ if self._vers and self._vers < 5.09: - print("Error: Version earlier than 5v09") + # print("Error: Version earlier than 5v09") + raise VersionError("Version earlier than 5v09") return # Gets the format of the dataframe if it is not already known if self._df_format is None: @@ -1697,7 +1743,8 @@ async def ramp( dict[str, str]: Dataframe """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Ramp Up", "Ramp Down", "Zero Ramp", "Power Up Ramp"] if isinstance(up, bool): @@ -1736,7 +1783,8 @@ async def setpoint_source(self, mode: str = "") -> dict[str, str]: dict[str, str]: Setpoint source mode """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Mode"] ret = await self._device._write_readline(f"{self._id}LSS {mode}") @@ -1766,7 +1814,8 @@ async def valve_offset( dict[str, str | float]: Offset values """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Init Offset (%)", "Closed Offset (%)"] if isinstance(save, bool): @@ -1791,7 +1840,8 @@ async def zero_pressure_control(self, enable: bool = "") -> dict[str, str]: dict[str, str]: If active control is active or not """ if self._vers and self._vers < 10.05: - print("Error: Version earlier than 10v05") + # print("Error: Version earlier than 10v05") + raise VersionError("Version earlier than 10v05") return LABELS = ["Unit ID", "Active Ctrl"] if isinstance(enable, bool): @@ -1836,7 +1886,8 @@ async def exhaust(self) -> dict[str, str | float]: dict[str, str | float]: Returns data frame with 'hold' status """ if self._vers and self._vers < 4.37: - print("Error: Version earlier than 4v37") + # print("Error: Version earlier than 4v37") + raise VersionError("Version earlier than 4v37") return if self._df_format is None: await self.get_df_format() @@ -1860,7 +1911,8 @@ async def hold_valves(self) -> dict[str, str | float]: dict[str, str | float]: Returns data frame with 'hold' status """ if self._vers and self._vers < 5.07: - print("Error: Version earlier than 5v07") + # print("Error: Version earlier than 5v07") + raise VersionError("Version earlier than 5v07") return if self._df_format is None: await self.get_df_format() @@ -1884,7 +1936,8 @@ async def hold_valves_closed(self) -> dict[str, str | float]: dict[str, str | float]: Returns data frame with 'hold' status """ if self._vers and self._vers < 5.07: - print("Error: Version earlier than 5v07") + # print("Error: Version earlier than 5v07") + raise VersionError("Version earlier than 5v07") return if self._df_format is None: await self.get_df_format() @@ -1908,7 +1961,8 @@ async def query_valve(self) -> dict[str, str | float]: dict[str, str | float]: Valve drive percentages """ if self._vers and self._vers < 8.18: - print("Error: Version earlier than 8v18") + # print("Error: Version earlier than 8v18") + raise VersionError("Version earlier than 8v18") return LABELS = ["Unit ID", "Upstream Valve", "Downstream Valve", "Exhaust Valve"] if isinstance(enable, bool): diff --git a/pyalicat/util.py b/pyalicat/util.py index 05262fe..3f8af46 100644 --- a/pyalicat/util.py +++ b/pyalicat/util.py @@ -23,7 +23,7 @@ def gas_correction(): pass -async def find_devices() -> list[str]: +async def find_devices() -> dict[str, Device]: """Finds all connected Alicat devices. Find all available serial ports using the `ls` command @@ -36,7 +36,7 @@ async def find_devices() -> list[str]: Returns: - list[str]: A list of all connected Alicat devices. + dict[str, Omron]: A dictionary of all connected Alicat devices. Port:Object """ # Get the list of available serial ports result = glob.glob("/dev/ttyUSB*") @@ -51,41 +51,24 @@ async def find_devices() -> list[str]: return devices -async def is_alicat_device(port: str, id: str = "A", **kwargs: Any) -> bool: +async def is_alicat_device( + port: str, id: str = "A", **kwargs: Any +) -> bool | tuple[bool, Device]: """Check if the given port is an Alicat device. Parameters: port (str): The name of the serial port. id (str): The device ID. Default is "A". + **kwargs: Additional keyword arguments. Returns: bool: True if the port is an Alicat device, False otherwise. + Device: The device object if the port is an Alicat device. """ - if port.startswith("/dev/"): - device = SerialDevice(port, **kwargs) - dev_info = await device._write_readall(f"{id}??M*") - if not dev_info: + try: + return (True, await Device.new_device(port, **kwargs)) + except ValueError: return False - INFO_KEYS = [ - "manufacturer", - "website", - "phone", - "website", - "model", - "serial", - "manufactured", - "calibrated", - "calibrated_by", - "software", - ] - dev_info = dict( - zip(INFO_KEYS, [i[re.search(r"M\d\d", i).end() + 1 :] for i in dev_info]) - ) - - for cls in all_subclasses(Device): - if cls.is_model(dev_info.get("model", "")): - return (True, cls.__name__) - return False def get_device_type(port):