diff --git a/pyalicat/comm.py b/pyalicat/comm.py index 1bcbe46..a20b296 100644 --- a/pyalicat/comm.py +++ b/pyalicat/comm.py @@ -34,7 +34,7 @@ async def _read(self, len: int) -> ByteString | None: Returns: ByteString: The serial communication. """ - pass + raise NotImplementedError @abstractmethod async def _write(self, command: str) -> None: @@ -43,12 +43,12 @@ async def _write(self, command: str) -> None: Args: command (str): The serial communication. """ - pass + raise NotImplementedError @abstractmethod async def close(self): """Closes the serial communication.""" - pass + raise NotImplementedError @abstractmethod async def _readline(self) -> str | None: @@ -57,7 +57,7 @@ async def _readline(self) -> str | None: Returns: str: The serial communication. """ - pass + raise NotImplementedError @abstractmethod async def _write_readline(self, command: str) -> str | None: @@ -69,7 +69,7 @@ async def _write_readline(self, command: str) -> str | None: Returns: str: The serial communication. """ - pass + raise NotImplementedError class SerialDevice(CommDevice): @@ -115,6 +115,10 @@ def __init__( # "rtscts": rtscts, } self.isOpen = False + if SerialStream is None: + raise ImportError( + "SerialStream could not be loaded. OS possibly not supported." + ) self.ser_devc = SerialStream(**self.serial_setup) async def _read(self, len: int | None = None) -> ByteString | None: diff --git a/pyalicat/daq.py b/pyalicat/daq.py index 13b9131..5ba79c8 100644 --- a/pyalicat/daq.py +++ b/pyalicat/daq.py @@ -348,10 +348,10 @@ async def logging( for dev in self.df: unique.update(self.df[dev]) await self.create_table(unique, conn) - start = time.time_ns() + start = time.perf_counter_ns() prev = start reps = 0 - while (time.time_ns() - start) / 1e9 <= duration: + while (time.perf_counter_ns() - start) / 1e9 <= duration: # Check if something in queue if not self.qin.empty(): comm = self.qin.get() @@ -362,20 +362,23 @@ async def logging( df = await comm[0](*comm[1:]) self.qout.put(df) # if not, continue logging - if time.time_ns() / 1e9 - (reps * 1 / rate + start / 1e9) >= 1 / rate: + if ( + time.perf_counter_ns() / 1e9 - (reps * 1 / rate + start / 1e9) + >= 1 / rate + ): # Check if something is in the queue print( - f"Difference between readings: {(time.time_ns() - prev) / 1e9} s" + f"Difference between readings: {(time.perf_counter_ns() - prev) / 1e9} s" ) # if ( - # abs(time.time_ns() / 1e9 - reps * 1 / rate - start / 1e9) + # abs(time.perf_counter_ns() / 1e9 - reps * 1 / rate - start / 1e9) # > 1.003 / rate # ): # warnings.warn("Warning! Acquisition rate is too high!") - time1 = time.time_ns() - prev = time.time_ns() + time1 = time.perf_counter_ns() + prev = time.perf_counter_ns() if write_async: - nurse_time = time.time_ns() + nurse_time = time.perf_counter_ns() # open_nursery async with create_task_group() as g: # insert_data from the previous iteration @@ -383,11 +386,11 @@ async def logging( # get g.start_soon(self.update_dict_log, self.Daq, self.qualities) else: - nurse_time = time.time_ns() + nurse_time = time.perf_counter_ns() # Get the data self.df = await self.Daq.get(self.qualities) # Write the data from this iteration - time2 = time.time_ns() + time2 = time.perf_counter_ns() rows = [] for dev in self.df: rows.append( @@ -407,23 +410,25 @@ async def logging( } ) print(f"Process took {(time2 - time1) / 1e6} ms") - time3 = time.time_ns() + time3 = time.perf_counter_ns() if not write_async: await self.insert_data( rows, conn ) # This takes a little bit (~8 ms). I think we should run this in a nursery with the next .get() call. That means that we will have to wait until the next loop to submit the data from the previous iteration. - time4 = time.time_ns() + time4 = time.perf_counter_ns() print(f"Insert took {(time4 - time3) / 1e6} ms") print( f"Time with nursery is {write_async}: {(time4 - nurse_time) / 1e6} ms" ) reps += 1 - while (time.time_ns() - start) / 1e9 / ( + while (time.perf_counter_ns() - start) / 1e9 / ( 1 / rate ) >= 1.00 * reps + 1: reps += 1 warnings.warn("Warning! Process takes too long!") - print(f"Total time: {(time.time_ns() - start) / 1e9} s with {reps} reps") + print( + f"Total time: {(time.perf_counter_ns() - start) / 1e9} s with {reps} reps" + ) def start_logging( self, write_async: bool = False, duration: float = "", rate: float = "" diff --git a/pyalicat/device.py b/pyalicat/device.py index de9d5c1..c913c41 100644 --- a/pyalicat/device.py +++ b/pyalicat/device.py @@ -7,7 +7,7 @@ dict: The dictionary of devices with the updated values. """ -import importlib +import importlib.resources import json import re import warnings @@ -25,8 +25,8 @@ class VersionError(Exception): pass -codes = importlib.resources.files("pyalicat").joinpath("codes.json") -with open(codes) as f: +codes_path = importlib.resources.files("pyalicat").joinpath("codes.json") +with open(codes_path) as f: codes = json.load(f) @@ -108,10 +108,10 @@ async def new_device(cls, port: str, id: str = "A", **kwargs: Any) -> Self: """ if port.startswith("/dev/"): device = SerialDevice(port, **kwargs) - dev_info = await device._write_readall(f"{id}??M*") - if not dev_info: + dev_info_ret = await device._write_readall(f"{id}??M*") + if not dev_info_ret: raise ValueError("No device found on port") - INFO_KEYS = [ + INFO_KEYS = ( "manufacturer", "website", "phone", @@ -122,11 +122,12 @@ async def new_device(cls, port: str, id: str = "A", **kwargs: Any) -> Self: "calibrated", "calibrated_by", "software", - ] + ) try: dev_info = dict( zip( - INFO_KEYS, [i[re.search(r"M\d\d", i).end() + 1 :] for i in dev_info] + INFO_KEYS, + [i[re.search(r"M\d\d", i).end() + 1 :] for i in dev_info_ret], ) ) except AttributeError: @@ -157,7 +158,7 @@ async def poll(self) -> dict[str, str | float]: return dict(zip(self._df_format, df)) async def request( - self, stats: list[str] = [], time: int = 1 + self, stats: list[str] | None = None, time: int = 1 ) -> dict[str, str | float]: """Gets requested measurements averaged over specified time. @@ -171,6 +172,8 @@ async def request( Returns: dict[str, str | float]: The requested statistics. """ + if stats is None: + stats = [] if len(stats) > 13: raise IndexError("Too many statistics requested") # stats = stats[:13] @@ -351,7 +354,7 @@ async def tare_gauge_P(self) -> dict[str, str | float]: return dict(zip(self._df_format, df)) async def auto_tare( - self, enable: bool = "", delay: float = "" + self, enable: bool | None = None, delay: float | None = None ) -> dict[str, str | float]: """Gets/Sets if the controller auto tares. @@ -371,18 +374,23 @@ async def auto_tare( if self._vers and self._vers < 10.05: # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Auto-tare", "Delay_(s)"] - if isinstance(enable, bool): - enable = "1" if enable else "0" - ret = await self._device._write_readline(f"{self._id}ZCA {enable} {delay}") + LABELS = ("Unit_ID", "Auto-tare", "Delay_(s)") + enable_str = ( + "1" if enable else "0" if enable is not None else "" + ) # Converts boolean to string or empty string if None + delay_str = ( + f"{delay:.1f}" if delay is not None else "" + ) # Converts float to string or empty string if None + ret = await self._device._write_readline( + f"{self._id}ZCA {enable_str} {delay_str}" + ) ret = ret.split() output_mapping = {"1": "Enabled", "0": "Disabled"} ret[1] = output_mapping.get(str(ret[1]), ret[1]) ret[2] = float(ret[2]) return dict(zip(LABELS, ret)) - async def configure_data_frame(self, format: int = "") -> dict[str, str | float]: + async def configure_data_frame(self, format: int) -> dict[str, str | float]: """Sets data frame's format. Example: @@ -399,10 +407,7 @@ async def configure_data_frame(self, format: int = "") -> dict[str, str | float] dict[str, str | float]: Data Frame in new format """ if self._vers and self._vers < 6.00: - # print("Error: Version earlier than 6v00") raise VersionError("Version earlier than 6v00") - return - # Gets the format of the dataframe if it is not already known if self._df_format is None: await self.get_df_format() ret = await self._device._write_readline(f"{self._id}FDF {format}") @@ -413,10 +418,10 @@ async def configure_data_frame(self, format: int = "") -> dict[str, str | float] async def _engineering_units( self, - statistic_value: str = "", - unit: str = "", - group: bool = "", - override: bool = "", + statistic_value: str, + unit: str | None = None, + group: bool | None = None, + override: bool | None = None, ) -> dict[str, str]: """Gets/Sets units for desired statistics. @@ -436,24 +441,24 @@ async def _engineering_units( dict[str, str]: Responds with unit """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Unit_Code", "Unit_Label"] - if isinstance(group, bool): - group = "1" if group else "0" - if isinstance(override, bool): - override = "1" if override else "" + LABELS = ("Unit_ID", "Unit_Code", "Unit_Label") + group_str = ( + "1" if group else "0" if group is not None else "" + ) # Converts boolean to string or empty string if None + override_str = ( + "1" if override else "0" if override is not None else "" + ) # Converts boolean to string or empty string if None ret = await self._device._write_readline( - f"{self._id}DCU {statistics[statistic_value]} {units[unit]} {group} {override}" + f"{self._id}DCU {statistics[statistic_value]} {units[unit]} {group_str} {override_str}" ) ret = ret.split() return dict(zip(LABELS, ret)) async def flow_press_avg( self, - stat_val: str, - avg_time: int = "", + stat_val: str | int, + avg_time: int | None = None, ) -> dict[str, str | float]: """Gets/Set the length of time a statistic is averaged over. @@ -468,21 +473,21 @@ async def flow_press_avg( dict[str, str | float]: Responds value of queried average and avg time const """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Value", "Time_Const"] + LABELS = ("Unit_ID", "Value", "Time_Const") if stat_val.upper() == "ALL": stat_val = 1 else: - statistics[stat_val] - ret = await self._device._write_readline(f"{self._id}DCA {stat_val} {avg_time}") + stat_val = statistics[stat_val] + ret = await self._device._write_readline( + f"{self._id}DCA {stat_val} {avg_time or ""}" + ) ret = ret.split() ret[1] = int(ret[1]) return dict(zip(LABELS, ret)) async def full_scale_val( - self, stat_val: str = "", unit: str = "" + self, stat_val: str | int, unit: str | None = None ) -> dict[str, str | float]: """Gets measurement range of given statistic. @@ -497,10 +502,10 @@ async def full_scale_val( dict[str, str | float]: Responds max value of statistic and units """ if self._vers and self._vers < 6.00: - # print("Error: Version earlier than 6v00") raise VersionError("Version earlier than 6v00") - return - LABELS = ["Unit_ID", "Max_Value", "Unit_Code", "Unit_Label"] + LABELS = ("Unit_ID", "Max_Value", "Unit_Code", "Unit_Label") + if unit is None: + unit = "" ret = await self._device._write_readline( f"{self._id}FPF {statistics[stat_val]} {units[unit]}" ) @@ -508,7 +513,7 @@ async def full_scale_val( ret[1] = float(ret[1]) return dict(zip(LABELS, ret)) - async def power_up_tare(self, enable: bool = "") -> dict[str, str]: + async def power_up_tare(self, enable: bool | None = None) -> dict[str, str]: """Gets/Sets if device tares on power-up. Example: @@ -521,19 +526,16 @@ async def power_up_tare(self, enable: bool = "") -> dict[str, str]: dict[str, str]: If tare is enabled """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return LABELS = ["Unit_ID", "Power-Up_Tare"] - if isinstance(enable, bool): - enable = "1" if enable else "0" - ret = await self._device._write_readline(f"{self._id}ZCP {enable}") + enable_str = "1" if enable else "0" if enable is not None else "" + ret = await self._device._write_readline(f"{self._id}ZCP {enable_str}") ret = ret.split() output_mapping = {"1": "Enabled", "0": "Disabled"} ret[1] = output_mapping.get(str(ret[1]), ret[1]) return dict(zip(LABELS, ret)) - async def data_frame(self) -> str: + async def data_frame(self) -> list[str]: """Gets info about current data frame. Example: @@ -543,14 +545,12 @@ async def data_frame(self) -> str: str: table that outlines data frame format """ if self._vers and self._vers < 6.00: - # print("Error: Version earlier than 6v00") raise VersionError("Version earlier than 6v00") - return ret = await self._device._write_readall(f"{self._id}??D*") return ret async def stp_press( - self, stp: str = "S", unit: str = "", press: float = "" + self, stp: str = "S", unit: str | None = None, press: float | None = None ) -> dict[str, str | float]: """Gets/Sets standard or normal pressure reference point. @@ -568,23 +568,23 @@ async def stp_press( dict[str, str | float]: Current pressure reference point and units """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Curr_Press_Ref", "Unit_Code", "Unit_Label"] - if stp.upper == "NTP": + LABELS = ("Unit_ID", "Curr_Press_Ref", "Unit_Code", "Unit_Label") + if stp.upper() == "NTP": stp = "N" if stp.upper() != "N": stp = "S" + if unit is None: + unit = "" ret = await self._device._write_readline( - f"{self._id}DCFRP {stp.upper()} {str(units[unit])} {str(press)}" + f"{self._id}DCFRP {stp.upper()} {str(units[unit])} {press or ""}" ) ret = ret.split() ret[1] = float(ret[1]) return dict(zip(LABELS, ret)) async def stp_temp( - self, stp: str = "S", unit: str = "", temp: float = "" + self, stp: str = "S", unit: str | None = None, temp: float | None = None ) -> dict[str, str | float]: """Gets/Sets standard or normal temperature reference point. @@ -602,22 +602,22 @@ async def stp_temp( dict[str, str | float]: Current temperature reference point and units """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Curr_Temp_Ref", "Unit_Code", "Unit_Label"] - if stp.upper == "NTP": + LABELS = ("Unit_ID", "Curr_Temp_Ref", "Unit_Code", "Unit_Label") + if stp.upper() == "NTP": stp = "N" if stp.upper() != "N": stp = "S" + if unit is None: + unit = "" ret = await self._device._write_readline( - f"{self._id}DCFRT {stp.upper()} {str(units[unit])} {str(temp)}" + f"{self._id}DCFRT {stp.upper()} {str(units[unit])} {temp or ""}" ) ret = ret.split() ret[1] = float(ret[1]) return dict(zip(LABELS, ret)) - async def zero_band(self, zb: float = "") -> dict[str, str | float]: + async def zero_band(self, zb: float | int | None = None) -> dict[str, str | float]: """Gets/Sets the zero band of the device. Example: @@ -633,20 +633,17 @@ async def zero_band(self, zb: float = "") -> dict[str, str | float]: dict[str, str | float]: Returns current zero band as percent of full scale """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Zero_Band_(%)"] - if isinstance(zb, (float, int)): - zb = f"0 {zb}" - ret = await self._device._write_readline(f"{self._id}DCZ {zb}") + LABELS = ("Unit_ID", "Zero_Band_(%)") + zb_str = f"0 {zb:.2f}" if zb is not None else "" + ret = await self._device._write_readline(f"{self._id}DCZ {zb_str}") ret = ret.split() ret.pop(1) ret[1] = float(ret[1]) return dict(zip(LABELS, ret)) async def analog_out_source( - self, primary: str = "0", val: str = "", unit: str = "" + self, primary: str | int = 0, val: str | None = None, unit: str | None = None ) -> dict[str, str]: """Gets/Sets the source of the analog output. @@ -654,7 +651,7 @@ async def analog_out_source( df = run(dev.analog_out_source, "PRIMARY", "Mass_Flow", "SCCM") Args: - primary (str): Primary or secondary analog output + primary (int): Primary or secondary analog output. 0 for primary, 1 for secondary val (str): Statistic being tracked - 'MAX' to fix min possible output - 'MIN' to fix max possible output @@ -665,21 +662,22 @@ async def analog_out_source( dict[str, str]: Statistic and units """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Value", "Unit_Code", "Unit_Label"] - if primary.upper() == "SECONDARY" or primary.upper() == "2ND": - primary = "1" - if val != "": + LABELS = ("Unit_ID", "Value", "Unit_Code", "Unit_Label") + if isinstance(primary, str): + if primary.upper() == "SECONDARY" or primary.upper() == "2ND": + primary = "1" + if val is not None: if val.upper() == "MAX": val = "0" elif val.upper() == "MIN": val = "1" else: val = str(statistics[val]) + if unit is not None: + unit = str(units[unit]) ret = await self._device._write_readline( - f"{self._id}ASOCV {primary} {val} {unit}" + f"{self._id}ASOCV {primary} {val or ""} {unit or ""}" ) ret = ret.split() if ret[1] == "0": @@ -693,7 +691,7 @@ async def analog_out_source( break return dict(zip(LABELS, ret)) - async def baud(self, new_baud: int = "") -> dict[str, str | float]: + async def baud(self, new_baud: int | None = None) -> dict[str, str | float]: """Gets/Sets the baud rate of the device. Example: @@ -713,19 +711,17 @@ async def baud(self, new_baud: int = "") -> dict[str, str | float]: dict[str, str | float]: Baud rate, either current or new """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Baud"] - VALID_BAUD_RATES = [2400, 4800, 9600, 19200, 38400, 57600, 115200] - if new_baud != "" and int(new_baud) not in VALID_BAUD_RATES: - new_baud = "" - ret = await self._device._write_readline(f"{self._id}NCB {new_baud}") + LABELS = ("Unit_ID", "Baud") + VALID_BAUD_RATES = (2400, 4800, 9600, 19200, 38400, 57600, 115200) + if new_baud is not None and int(new_baud) not in VALID_BAUD_RATES: + raise ValueError("Invalid baud rate") + ret = await self._device._write_readline(f"{self._id}NCB {new_baud or ""}") ret = ret.split() ret[1] = int(ret[1]) return dict(zip(LABELS, ret)) - async def blink(self, dur: int = "") -> dict[str, str]: + async def blink(self, dur: int | None = None) -> dict[str, str]: """Blinks the device. Gets the blinking state. Example: @@ -740,17 +736,15 @@ async def blink(self, dur: int = "") -> dict[str, str]: dict[str, str]: If the display is currently blinking """ if self._vers and self._vers < 8.28: - # print("Error: Version earlier than 8v28") raise VersionError("Version earlier than 8v28") - return - LABELS = ["Unit_ID", "Flashing?"] - ret = await self._device._write_readline(f"{self._id}FFP {dur}") + LABELS = ("Unit_ID", "Flashing?") + ret = await self._device._write_readline(f"{self._id}FFP {dur or ""}") ret = ret.split() output_mapping = {"1": "Yes", "0": "No"} ret[1] = output_mapping.get(str(ret[1]), ret[1]) return dict(zip(LABELS, ret)) - async def change_unit_id(self, new_id: str = "") -> None: + async def change_unit_id(self, new_id: str) -> None: """Sets the unit ID of the device. Example: @@ -762,6 +756,8 @@ async def change_unit_id(self, new_id: str = "") -> None: Args: new_id (str): New ID for the device. A-Z accepted """ + if new_id.upper() not in "ABCDEFGHIJKLMNOPQRSTUVWXYZ": + raise ValueError("Invalid ID") await self._device._write(f"{self._id}@ {new_id}") self._id = new_id return @@ -775,7 +771,7 @@ async def firmware_version(self) -> dict[str, str]: Returns: dict[str, str]: Current firmware vesion and its date of creation """ - LABELS = ["Unit_ID", "Vers", "Creation_Date"] + LABELS = ("Unit_ID", "Vers", "Creation_Date") ret = await self._device._write_readline(f"{self._id}VE") ret = ret.split() ret[2] = " ".join(ret[2:]) @@ -808,7 +804,9 @@ async def manufacturing_info(self) -> list[str]: ret = await self._device._write_readall(f"{self._id}??M*") return ret - async def remote_tare(self, actions: list[str] = []) -> dict[str, str | float]: + async def remote_tare( + self, actions: list[str] | None = None + ) -> dict[str, str | float]: """Gets/Sets the remote tare value. Example: @@ -827,10 +825,10 @@ async def remote_tare(self, actions: list[str] = []) -> dict[str, str | float]: dict[str, str | float]: Total value of active actions """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Active_Actions_Total"] + LABELS = ("Unit_ID", "Active_Actions_Total") + if actions is None: + actions = [] action_dict = { "Primary_Press": 1, "Secondary_Press": 2, @@ -838,10 +836,10 @@ async def remote_tare(self, actions: list[str] = []) -> dict[str, str | float]: "Reset_Totalizer_1": 8, "Reset_Totalizer_2": 16, } - act_tot = sum([action_dict.get(act, 0) for act in actions]) + act_tot: int | None = sum([action_dict.get(act, 0) for act in actions]) if not actions: - act_tot = "" - ret = await self._device._write_readline(f"{self._id}ASRCA {act_tot}") + act_tot = None + ret = await self._device._write_readline(f"{self._id}ASRCA {act_tot or ""}") ret = ret.split() ret[1] = int(ret[1]) return dict(zip(LABELS, ret)) @@ -861,13 +859,11 @@ async def restore_factory_settings(self) -> str: Confirmation of restoration """ if self._vers and self._vers < 7.00: - # print("Error: Version earlier than 7v00") raise VersionError("Version earlier than 7v00") - return ret = await self._device._write_readline(f"{self._id}FACTORY RESTORE ALL") return ret - async def user_data(self, slot: int = "", val: str = "") -> dict[str, str]: + async def user_data(self, slot: int, val: str | None = None) -> dict[str, str]: """Gets/Sets user data in slot. Gets the user data from the string is slot. @@ -884,18 +880,18 @@ async def user_data(self, slot: int = "", val: str = "") -> dict[str, str]: dict[str, str]: Value in called slot (either new or read) """ if self._vers and self._vers < 8.24: - # print("Error: Version earlier than 8v24") raise VersionError("Version earlier than 8v24") - return - if val == "": - LABELS = ["Unit_ID", "Curr_Value"] + if val is None: + LABELS = ("Unit_ID", "Curr_Value") else: - LABELS = ["Unit_ID", "New_Value"] - ret = await self._device._write_readline(f"{self._id}UD {slot} {val}") + LABELS = ("Unit_ID", "New_Value") + ret = await self._device._write_readline(f"{self._id}UD {slot} {val or ""}") ret = ret.split() return dict(zip(LABELS, ret)) - async def streaming_rate(self, interval: int = "") -> dict[str, str | float]: + async def streaming_rate( + self, interval: int | None = None + ) -> dict[str, str | float]: """Gets/Sets the streaming rate of the device. Example: @@ -908,11 +904,9 @@ async def streaming_rate(self, interval: int = "") -> dict[str, str | float]: dict[str, str | float]: Interval of streaming rate """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Interval_(ms)"] - ret = await self._device._write_readline(f"{self._id}NCS {interval}") + LABELS = ("Unit_ID", "Interval_(ms)") + ret = await self._device._write_readline(f"{self._id}NCS {interval or ""}") ret = ret.split() ret[1] = int(ret[1]) return dict(zip(LABELS, ret)) @@ -936,7 +930,7 @@ async def unlock_display(self) -> dict[str, str | float]: return dict(zip(self._df_format, df)) async def create_gas_mix( - self, name: str = "", number: int = "", gas_dict: dict[str, float] = {} + self, name: str, number: int, gas_dict: dict[str, float] = {} ) -> dict[str, str | float]: """Sets custom gas mixture. @@ -954,29 +948,18 @@ async def create_gas_mix( gas_dict (dict[str, float]): Gas name : Percentage of Mixture. Maximum of 5 gases. Percent is Molar percent up to 2 decimals. Total percentages must sum to 100.00% - gas[n]P (float): Molar percent up to 2 decimals. Total percentages must sum to 100.00% - - - n is the number (1 to 5) of the gas in the function call - gas[n]N (str): Name of gas in mixture - - - n is the number (1 to 5) of the gas in the function call - Returns: dict: Gas number of new mix and percentages and names of each constituent """ if self._vers and self._vers < 5.00: - # print("Error: Version earlier than 5v00") raise VersionError("Version earlier than 5v00") - return if len(gas_dict) > 5: - # print("Error: Too many input gases") raise IndexError("Too many input gases") - return gas_string = "" for x in gas_dict: gas_string += f" {gas_dict[x]} {gases[x]}" - LABELS = [ + LABELS = ( "Unit_ID", "Gas_Num", "Gas1_Name", @@ -989,9 +972,9 @@ async def create_gas_mix( "Gas4_Perc", "Gas5_Name", "Gas5_Perc", - ] + ) ret = await self._device._write_readline( - f"{self._id}GM {name} {number}{gas_string}" + f"{self._id}GM {name} {number} {gas_string}" ) ret = ret.split() return dict(zip(LABELS, ret)) @@ -1012,15 +995,13 @@ async def delete_gas_mix(self, gasN: str = "") -> dict[str, float]: dict[str, float]: Deleted gas' number """ if self._vers and self._vers < 5.00: - # print("Error: Version earlier than 5v00") raise VersionError("Version earlier than 5v00") - return - LABELS = ["Unit_ID", "Deleted_Gas_Num"] + LABELS = ("Unit_ID", "Deleted_Gas_Num") ret = await self._device._write_readline(f"{self._id}GD {gasN}") ret = ret.split() return dict(zip(LABELS, ret)) - async def query_gas_mix(self, gasN: int = "") -> dict[str, str]: + async def query_gas_mix(self, gasN: int) -> dict[str, str]: """Gets percentages of gases in mixture. Example: @@ -1033,10 +1014,8 @@ async def query_gas_mix(self, gasN: int = "") -> dict[str, str]: dict[str, str]: Gas numbers and their percentages in mixture """ if self._vers and self._vers < 9.00: - # print("Error: Version earlier than 9v00") raise VersionError("Version earlier than 9v00") - return - LABELS = [ + LABELS = ( "Unit_ID", "Gas_Num", "Gas1_Name", @@ -1049,7 +1028,7 @@ async def query_gas_mix(self, gasN: int = "") -> dict[str, str]: "Gas4_Perc", "Gas5_Name", "Gas5_Perc", - ] + ) ret = await self._device._write_readall(f"{self._id}GC {gasN}") ret = ret[0].replace("=", " ").split() for i in range(len(ret)): @@ -1063,11 +1042,11 @@ async def query_gas_mix(self, gasN: int = "") -> dict[str, str]: async def config_totalizer( self, totalizer: int = 1, - flow_stat_val: str = "", - mode: int = "", - limit_mode: int = "", - num: int = "", - dec: int = "", + flow_stat_val: str | None = None, + mode: int | None = None, + limit_mode: int | None = None, + num: int | None = None, + dec: int | None = None, ) -> dict[str, str]: """Enables/Disables and Configures totalizer. @@ -1098,10 +1077,8 @@ async def config_totalizer( dict[str, str]: Configuration of totalizer """ if self._vers and self._vers < 10.00: - # print("Error: Version earlier than 10v00") raise VersionError("Version earlier than 10v00") - return - LABELS = [ + LABELS = ( "Unit_ID", "Totalizer", "Flow_Stat_Val", @@ -1109,11 +1086,11 @@ async def config_totalizer( "Limit_Mode", "Num_Digits", "Dec_Place", - ] + ) if flow_stat_val != "": flow_stat_val = statistics.get(flow_stat_val, -1) ret = await self._device._write_readline( - f"{self._id}TC {totalizer} {flow_stat_val} {mode} {limit_mode} {num} {dec}" + f"{self._id}TC {totalizer} {flow_stat_val or ""} {mode or ""} {limit_mode or ""} {num or ""} {dec or ""}" ) ret = ret.split() return dict(zip(LABELS, ret)) # Need to convert codes to text @@ -1134,10 +1111,7 @@ async def reset_totalizer(self, totalizer: int = 1) -> dict[str, str | float]: dict[str, str | float]: Dataframe with totalizer set to zero. """ if self._vers and self._vers < 8.00: - # print("Error: Version earlier than 8v00") raise VersionError("Version earlier than 8v00") - return - # Gets the format of the dataframe if it is not already known if self._df_format is None: await self.get_df_format() ret = await self._device._write_readline(f"{self._id}T {totalizer}") @@ -1162,10 +1136,7 @@ async def reset_totalizer_peak(self, totalizer: int = 1) -> dict[str, str | floa dict[str, str | float]: Data frame """ if self._vers and self._vers < 8.00: - # print("Error: Version earlier than 8v00") raise VersionError("Version earlier than 8v00") - return - # Gets the format of the dataframe if it is not already known if self._df_format is None: await self.get_df_format() ret = await self._device._write_readline(f"{self._id}TP {totalizer}") @@ -1174,8 +1145,8 @@ async def reset_totalizer_peak(self, totalizer: int = 1) -> dict[str, str | floa df[index] = float(df[index]) return dict(zip(self._df_format, df)) - async def save_totalizer(self, enable: bool = "") -> dict[str, str]: - """Enables/disables saving totalizer values. + async def save_totalizer(self, enable: bool | None = None) -> dict[str, str]: + """Enables/disables saving totalizer values at regular intervals. If enabled, restore last saved totalizer on power-up. @@ -1189,13 +1160,10 @@ async def save_totalizer(self, enable: bool = "") -> dict[str, str]: dict[str, str]: Says if totalizer is enabled or disabled """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Saving"] - if isinstance(enable, bool): - enable = "1" if enable else "0" - ret = await self._device._write_readline(f"{self._id}TCR {enable}") + LABELS = ("Unit_ID", "Saving") + enable_str = "1" if enable else "0" if enable is not None else "" + ret = await self._device._write_readline(f"{self._id}TCR {enable_str}") ret = ret.split() output_mapping = {"1": "Enabled", "0": "Disabled"} ret[1] = output_mapping.get(str(ret[1]), ret[1]) @@ -1232,7 +1200,7 @@ async def get_df_format(self) -> list[list[str]]: self._df_ret = df_ret return [df_stand, df_stand_ret] - async def get_units(self, stats: list) -> dict: + async def get_units(self, stats: list[str]) -> dict[str, str]: """Gets the units of the current dataframe format of the device. Args: @@ -1265,7 +1233,7 @@ async def get(self, measurements: list[str] = ["@"]) -> dict[str, str | float]: """Gets the value of a measurement from the device. Args: - measurements (list[str]): List of measurements to get + measurements (list[str]): List of measurements to get. If not specified, gets all measurements in standard dataframe. Returns: dict[str, str | float]: Dictionary of measurements and their values @@ -1273,8 +1241,6 @@ async def get(self, measurements: list[str] = ["@"]) -> dict[str, str | float]: resp = {} flag = 0 reqs = [] - if isinstance(measurements, str): - measurements = measurements.split() # Request if not measurements: measurements = ["@"] @@ -1323,8 +1289,8 @@ def is_model(cls, model: str) -> bool: Returns: bool: True if model matches. """ - cls._models = [" M-", " MS-", " MQ-", " MW-"] - return any([bool(re.search(i, model)) for i in cls._models]) + models = [" M-", " MS-", " MQ-", " MW-"] + return any([bool(re.search(i, model)) for i in models]) def __init__( self, device: SerialDevice, dev_info: dict, id: str = "A", **kwargs: Any @@ -1353,8 +1319,8 @@ def is_model(cls, model: str) -> bool: Returns: bool: True if model matches. """ - cls._models = [" MC-", " MCS-", " MCQ-", " MCW-"] - return any([bool(re.search(i, model)) for i in cls._models]) + models = [" MC-", " MCS-", " MCQ-", " MCW-"] + return any([bool(re.search(i, model)) for i in models]) def __init__( self, device: SerialDevice, dev_info: dict, id: str = "A", **kwargs: Any @@ -1370,7 +1336,7 @@ def __init__( super().__init__(device, dev_info, id, **kwargs) async def setpoint( - self, value: float = "", unit: str = "" + self, value: float | None = None, unit: str | None = None ) -> dict[str, str | float]: """Gets/Sets the setpoint of the device. @@ -1388,24 +1354,30 @@ async def setpoint( dict[str, str | float]: Reports setpoint with units """ if self._vers and self._vers < 9.00: - # print("Error: Version earlier than 9v00, running Change Setpoint") - warnings.warn("Version earlier than 9v00, running Change Setpoint") - return await self.change_setpoint(value) - LABELS = [ + if value is None: + raise ValueError("Query setpoint is not supported for this software") + else: + warnings.warn("Version earlier than 9v00, running Change Setpoint") + return await self.change_setpoint(value) + LABELS = ( "Unit_ID", "Curr_Setpt", "Requested_Setpt", "Unit_Code", "Unit_Label", - ] - ret = await self._device._write_readline(f"{self._id}LS {value} {units[unit]}") + ) + ret = await self._device._write_readline( + f"{self._id}LS {value or ""} {units[unit]}" + ) if ret == "?": raise ValueError("Invalid setpoint value") ret = ret.split() ret[1], ret[2] = float(ret[1]), float(ret[2]) return dict(zip(LABELS, ret)) - async def _change_setpoint(self, value: float = "") -> dict[str, str | float]: + async def _change_setpoint( + self, value: float | None = None + ) -> dict[str, str | float]: """Changes the setpoint of the device. Example: @@ -1421,11 +1393,8 @@ async def _change_setpoint(self, value: float = "") -> dict[str, str | float]: dict[str, str | float]: Dataframe with new setpoint """ if self._vers and self._vers < 4.33: - # print("Error: Version earlier than 4v33") raise VersionError("Version earlier than 4v33") - return if self._vers and self._vers >= 9.00: - # print("Error: Version later than 9v00, running Query/Change Setpoint") warnings.warn("Version later than 9v00, running Query/Change Setpoint") return await self.setpoint(value) if self._df_format is None: @@ -1437,7 +1406,7 @@ async def _change_setpoint(self, value: float = "") -> dict[str, str | float]: return dict(zip(self._df_format, df)) async def batch( - self, totalizer: int = 1, batch_vol: int = "", unit: str = "" + self, totalizer: int = 1, batch_vol: int | None = None, unit: str | None = None ) -> dict[str, str | float]: """Directs controller to flow a set amount then close the valve. @@ -1456,17 +1425,16 @@ async def batch( dict[str, str | float]: Reports totalizer, batch size, units. """ if self._vers and self._vers < 10.00: - # print("Error: Version earlier than 10v00") raise VersionError("Version earlier than 10v00") - return - LABELS = ["Unit_ID", "Totalizer", "Batch_Size", "Unit_Code", "Unit_Label"] + LABELS = ("Unit_ID", "Totalizer", "Batch_Size", "Unit_Code", "Unit_Label") + unit_str = units[unit] if unit else "" ret = await self._device._write_readline( - f"{self._id}TB {totalizer} {batch_vol} {units[unit]}" + f"{self._id}TB {totalizer} {batch_vol or ""} {unit_str}" ) return dict(zip(LABELS, ret.split())) async def deadband_limit( - self, save: bool = "", limit: float = "" + self, save: bool | None = None, limit: float | None = None ) -> dict[str, str | float]: """Gets/Sets the range the controller allows for drift around setpoint. @@ -1480,15 +1448,16 @@ async def deadband_limit( Returns: dict[str, str | float]: Reports deadband with units """ - LABELS = ["Unit_ID", "Deadband", "Unit_Code", "Unit_Label"] - if isinstance(save, bool): - save = "1" if save else "0" - ret = await self._device._write_readline(f"{self._id}LCDB {save} {limit}") + LABELS = ("Unit_ID", "Deadband", "Unit_Code", "Unit_Label") + save_str = "1" if save else "0" if save is not None else "" + ret = await self._device._write_readline( + f"{self._id}LCDB {save_str} {limit or ""}" + ) ret = ret.split() ret[1] = float(ret[1]) return dict(zip(LABELS, ret)) - async def deadband_mode(self, mode: str = "") -> dict[str, str]: + async def deadband_mode(self, mode: str | None = None) -> dict[str, str]: """Gets/Sets the reaction the controller has for values around setpoint. Example: @@ -1501,24 +1470,23 @@ async def deadband_mode(self, mode: str = "") -> dict[str, str]: dict[str, str]: Reports mode """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Mode"] - mode = ( - "1" - if mode.upper() in ["HOLD", "CURRENT"] - else "2" - if mode.upper() in ["CLOSE"] - else mode - ) - ret = await self._device._write_readline(f"{self._id}LCDM {mode}") + LABELS = ("Unit_ID", "Mode") + if mode: + mode = ( + "1" + if mode.upper() in ["HOLD", "CURRENT"] + else "2" + if mode.upper() in ["CLOSE"] + else mode + ) + ret = await self._device._write_readline(f"{self._id}LCDM {mode or ""}") ret = ret.split() output_mapping = {"1": "Hold valve at current", "2": "Close valve"} ret[1] = output_mapping.get(str(ret[1]), ret[1]) return dict(zip(LABELS, ret)) - async def loop_control_alg(self, algo: str = "") -> dict[str, str]: + async def loop_control_alg(self, algo: str | None = None) -> dict[str, str]: """Gets/Sets the control algorithm the controller uses. - algorithm 1 = PD/PDF @@ -1534,24 +1502,23 @@ async def loop_control_alg(self, algo: str = "") -> dict[str, str]: dict[str, str]: Reports algorithm """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Algorithm"] - algo = ( - "2" - if algo.upper() in ["PD2I"] - else "1" - if algo.upper() in ["PD", "PDF", "PD/PDF"] - else algo - ) - ret = await self._device._write_readline(f"{self._id}LCA {algo}") + LABELS = ("Unit_ID", "Algorithm") + if algo: + algo = ( + "2" + if algo.upper() in ["PD2I"] + else "1" + if algo.upper() in ["PD", "PDF", "PD/PDF"] + else algo + ) + ret = await self._device._write_readline(f"{self._id}LCA {algo or ""}") ret = ret.split() algorithm_mapping = {"1": "PD/PDF", "2": "PD2I"} ret[1] = algorithm_mapping.get(str(ret[1]), ret[1]) return dict(zip(LABELS, ret)) - async def loop_control_var(self, var: str = "") -> dict[str, str]: + async def loop_control_var(self, var: str) -> dict[str, str]: """Sets the statistic the setpoint controls. Example: @@ -1564,10 +1531,8 @@ async def loop_control_var(self, var: str = "") -> dict[str, str]: dict[str, str]: Reports new loop variable """ if self._vers and self._vers < 9.00: - # print("Error: Version earlier than 9v00") raise VersionError("Version earlier than 9v00") - return - LABELS = ["Unit_ID", "Loop_Var_Val"] + LABELS = ("Unit_ID", "Loop_Var_Val") # If the user did not specify setpoint, assume Setpt if var and var[-6:] != "_Setpt": var += "_Setpt" @@ -1596,12 +1561,9 @@ async def loop_control_range( dict[str, str | float]: Reports loop variable, units, min, and max """ if self._vers and self._vers < 9.00: - # print("Error: Version earlier than 9v00") raise VersionError("Version earlier than 9v00") - return LABELS = ["Unit_ID", "Loop_Var", "Min", "Max", "Unit_Code", "Unit_Label"] if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05, max and min not supported") warnings.warn("Version earlier than 10v05") LABELS = LABELS[:-2] min = "" @@ -1632,10 +1594,8 @@ async def max_ramp_rate( dict[str, str | float]: Reports max ramp rate with unit """ if self._vers and self._vers < 7.11: - # print("Error: Version earlier than 7v11") raise VersionError("Version earlier than 7v11") - return - LABELS = ["Unit_ID", "Max_Ramp_Rate", "Unit_Code", "Time_Code", "Units"] + LABELS = ("Unit_ID", "Max_Ramp_Rate", "Unit_Code", "Time_Code", "Units") ret = await self._device._write_readline(f"{self._id}SR {max} {units[unit]}") ret = ret.split() ret[1] = float(ret[1]) @@ -1660,10 +1620,8 @@ async def pdf_gains( dict[str, str | float]: Reports P and D gains """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "P_Gain", "D_Gain"] + LABELS = ("Unit_ID", "P_Gain", "D_Gain") if isinstance(save, bool): save = "1" if save else "0" ret = await self._device._write_readline( @@ -1694,10 +1652,8 @@ async def pd2i_gains( dict[str, str | float]: Reports P, I, and D gains """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "P_Gain", "I_Gain", "D_Gain"] + LABELS = ("Unit_ID", "P_Gain", "I_Gain", "D_Gain") if isinstance(save, bool): save = "1" if save else "0" ret = await self._device._write_readline( @@ -1723,10 +1679,7 @@ async def power_up_setpoint(self, val: float = "") -> dict[str, str | float]: dict[str, str | float]: Dataframe with current (not power-up) setpoint """ if self._vers and self._vers < 8.04: - # print("Error: Version earlier than 8v04") raise VersionError("Version earlier than 8v04") - return - # Gets the format of the dataframe if it is not already known if self._df_format is None: await self.get_df_format() ret = await self._device._write_readline(f"{self._id}SPUE {val}") @@ -1751,10 +1704,7 @@ async def overpressure(self, limit: float = "") -> dict[str, str | float]: dict[str, str | float]: Dataframe """ if self._vers and self._vers < 5.09: - # print("Error: Version earlier than 5v09") raise VersionError("Version earlier than 5v09") - return - # Gets the format of the dataframe if it is not already known if self._df_format is None: await self.get_df_format() ret = await self._device._write_readline(f"{self._id}OPL {limit}") @@ -1781,10 +1731,8 @@ async def ramp( dict[str, str]: Dataframe """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Ramp_Up", "Ramp_Down", "Zero_Ramp", "Power_Up_Ramp"] + LABELS = ("Unit_ID", "Ramp_Up", "Ramp_Down", "Zero_Ramp", "Power_Up_Ramp") if isinstance(up, bool): up = "1" if up else "0" if isinstance(down, bool): @@ -1821,10 +1769,8 @@ async def setpoint_source(self, mode: str = "") -> dict[str, str]: dict[str, str]: Setpoint source mode """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Mode"] + LABELS = ("Unit_ID", "Mode") ret = await self._device._write_readline(f"{self._id}LSS {mode}") ret = ret.split() mapping = { @@ -1852,10 +1798,8 @@ async def valve_offset( dict[str, str | float]: Offset values """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - return - LABELS = ["Unit_ID", "Init_Offset_(%)", "Closed_Offset_(%)"] + LABELS = ("Unit_ID", "Init_Offset_(%)", "Closed_Offset_(%)") if isinstance(save, bool): save = "0 1" if save else "0 0" ret = await self._device._write_readline( @@ -1878,9 +1822,8 @@ async def zero_pressure_control(self, enable: bool = "") -> dict[str, str]: dict[str, str]: If active control is active or not """ if self._vers and self._vers < 10.05: - # print("Error: Version earlier than 10v05") raise VersionError("Version earlier than 10v05") - LABELS = ["Unit_ID", "Active_Ctrl"] + LABELS = ("Unit_ID", "Active_Ctrl") if isinstance(enable, bool): enable = "1" if enable else "0" ret = await self._device._write_readline(f"{self._id}LCZA {enable}") @@ -1923,9 +1866,7 @@ async def exhaust(self) -> dict[str, str | float]: dict[str, str | float]: Returns data frame with 'hold' status """ if self._vers and self._vers < 4.37: - # print("Error: Version earlier than 4v37") raise VersionError("Version earlier than 4v37") - return if self._df_format is None: await self.get_df_format() gas = gases.get(gas, "") @@ -1948,9 +1889,7 @@ async def hold_valves(self) -> dict[str, str | float]: dict[str, str | float]: Returns data frame with 'hold' status """ if self._vers and self._vers < 5.07: - # print("Error: Version earlier than 5v07") raise VersionError("Version earlier than 5v07") - return if self._df_format is None: await self.get_df_format() gas = gases.get(gas, "") @@ -1973,9 +1912,7 @@ async def hold_valves_closed(self) -> dict[str, str | float]: dict[str, str | float]: Returns data frame with 'hold' status """ if self._vers and self._vers < 5.07: - # print("Error: Version earlier than 5v07") raise VersionError("Version earlier than 5v07") - return if self._df_format is None: await self.get_df_format() gas = gases.get(gas, "") @@ -1998,10 +1935,8 @@ async def query_valve(self) -> dict[str, str | float]: dict[str, str | float]: Valve drive percentages """ if self._vers and self._vers < 8.18: - # print("Error: Version earlier than 8v18") raise VersionError("Version earlier than 8v18") - return - LABELS = ["Unit_ID", "Upstream_Valve", "Downstream_Valve", "Exhaust_Valve"] + LABELS = ("Unit_ID", "Upstream_Valve", "Downstream_Valve", "Exhaust_Valve") if isinstance(enable, bool): enable = "1" if enable else "0" ret = await self._device._write_readline(f"{self._id}LCZA {enable}") diff --git a/pyproject.toml b/pyproject.toml index bc518c3..34c0ac7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ uvloop = "^0.19.0" [tool.poetry.dev-dependencies] bandit = "^1.7.7" -mypy = "^1.0" +mypy = "^1.10.0" mypy-extensions = "^1.0.0" pyupgrade = "^3.16.0" pre-commit = "^3.6.0" @@ -57,7 +57,7 @@ pytest-html = "^4.1.1" pytest-cov = "^5.0.0" mkdocs-material = "^9.5.17" mkdocstrings = {extras = ["python"], version = "^0.25.0"} -ruff = "^0.4.0" +ruff = "^0.4.9" [tool.mypy] # https://mypy.readthedocs.io/en/latest/config_file.html#using-a-pyproject-toml-file