Skip to content

Commit

Permalink
Add new serial module, remove unused dependencies, and use in_waiting…
Browse files Browse the repository at this point in the history
… to intelligently read
  • Loading branch information
GraysonBellamy committed Jun 9, 2024
1 parent efe06e0 commit f77609d
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 140 deletions.
85 changes: 49 additions & 36 deletions pyalicat/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
Date: 2024-01-05
"""

from typing import Optional
from collections.abc import ByteString

from abc import ABC, abstractmethod
from collections.abc import ByteString
from typing import Optional

import trio
from trio_serial import Parity, SerialStream, StopBits
import anyio
import anyio.lowlevel
from anyserial import SerialStream
from anyserial.abstract import Parity, StopBits


class CommDevice(ABC):
Expand Down Expand Up @@ -111,13 +112,13 @@ def __init__(
"bytesize": databits,
"parity": parity,
"stopbits": stopbits,
"xonxoff": xonxoff,
"rtscts": rtscts,
# "xonxoff": xonxoff,
# "rtscts": rtscts,
}
self.isOpen = False
self.ser_devc = SerialStream(**self.serial_setup)

async def _read(self, len: int = 1) -> ByteString:
async def _read(self, len: int = None) -> ByteString:
"""Reads the serial communication.
Args:
Expand All @@ -126,13 +127,15 @@ async def _read(self, len: int = 1) -> ByteString:
Returns:
ByteString: The serial communication.
"""
if len is None:
len = self.ser_devc.in_waiting()
if len == 0:
return None
if not self.isOpen:
async with self.ser_devc:
with trio.move_on_after(self.timeout / 1000):
return await self.ser_devc.receive_some(len)
else:
with trio.move_on_after(self.timeout / 1000):
return await self.ser_devc.receive_some(len)
else:
return await self.ser_devc.receive_some(len)
return None

async def _write(self, command: str) -> None:
Expand All @@ -143,10 +146,10 @@ async def _write(self, command: str) -> None:
"""
if not self.isOpen:
async with self.ser_devc:
with trio.move_on_after(self.timeout / 1000):
with anyio.move_on_after(self.timeout / 1000):
await self.ser_devc.send_all(command.encode("ascii") + self.eol)
else:
with trio.move_on_after(self.timeout / 1000):
with anyio.move_on_after(self.timeout / 1000):
await self.ser_devc.send_all(command.encode("ascii") + self.eol)
return None

Expand All @@ -161,12 +164,16 @@ async def _readline(self) -> str:
line = bytearray()
while True:
c = None
with trio.move_on_after(self.timeout / 1000):
c = await self._read(1)
line += c
if c == self.eol:
break
if c is None:
with anyio.move_on_after(
self.timeout / 1000
): # if keep reading none, then timeout
while c is None: # Keep reading until a character is read
c = await self._read()
await anyio.lowlevel.checkpoint()
if c is None: # if we reach timeout,
break
line += c
if self.eol in line:
break
self.isOpen = False
return line.decode("ascii")
Expand All @@ -187,16 +194,18 @@ async def _write_readall(self, command: str, timeout: int = None) -> list[str]:
await self._write(command)
line = bytearray()
arr_line = []
with trio.move_on_after(timeout / 1000):
while True:
c = await self._read(1)
if c is None:
break
if c == self.eol:
arr_line.append(line.decode("ascii"))
line = bytearray()
else:
line += c
while True:
c = None
with anyio.move_on_after(
self.timeout / 1000
): # if keep reading none, then timeout
while c is None: # Keep reading until a character is read
c = await self._read()
await anyio.lowlevel.checkpoint()
if c is None: # if we reach timeout,
break
line += c
arr_line = line.decode("ascii").splitlines()
self.isOpen = False
return arr_line

Expand All @@ -215,12 +224,16 @@ async def _write_readline(self, command: str) -> str:
line = bytearray()
while True:
c = None
with trio.move_on_after(self.timeout / 1000):
c = await self._read(1)
if c == self.eol:
break
line += c
if c is None:
with anyio.move_on_after(
self.timeout / 1000
): # if keep reading none, then timeout
while c is None: # Keep reading until a character is read
c = await self._read()
await anyio.lowlevel.checkpoint()
if c is None: # if we reach timeout,
break
line += c
if self.eol in line:
break
self.isOpen = False
return line.decode("ascii")
Expand Down
90 changes: 45 additions & 45 deletions pyalicat/daq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@
Date: 2024-01-07
"""

import device
from typing import Any
from trio import open_nursery
import csv
import time
from datetime import datetime
import warnings
import asyncpg
import trio_asyncio
from trio_asyncio import run
from threading import Thread
from datetime import datetime
from queue import Queue
from threading import Thread
from typing import Any

import asyncpg
import device
from anyio import create_task_group, run

warnings.filterwarnings("always")

Expand Down Expand Up @@ -145,12 +143,12 @@ async def get(
if val and isinstance(val, str):
val = val.split()
if not id:
async with open_nursery() as g:
async with create_task_group() as g:
for dev in dev_list:
g.start_soon(self.update_dict_get, ret_dict, dev, val)
if id and isinstance(id, str):
id = id.split()
async with open_nursery() as g:
async with create_task_group() as g:
for i in id:
g.start_soon(self.update_dict_get, ret_dict, i, val)
return ret_dict
Expand Down Expand Up @@ -190,12 +188,12 @@ async def set(self, command: dict[str, str | float], id: list = "") -> None:
if isinstance(command, str):
command = command.split()
if not id:
async with open_nursery() as g:
async with create_task_group() as g:
for dev in dev_list:
g.start_soon(self.update_dict_set, ret_dict, dev, command)
if isinstance(id, str):
id = id.split()
async with open_nursery() as g:
async with create_task_group() as g:
for i in id:
g.start_soon(self.update_dict_set, ret_dict, i, command)
return ret_dict
Expand All @@ -205,14 +203,27 @@ class AsyncPG:
"""Async context manager for connecting to a PostgreSQL database using asyncpg."""

def __init__(self, **kwargs):
"""Initializes the AsyncPG object."""
self.conn = None
self.kwargs = kwargs

async def __aenter__(self):
"""Connects to the database.
Returns:
asyncpg.Connection: The connection object.
"""
self.conn = await asyncpg.connect(**self.kwargs)
return self.conn

async def __aexit__(self, exc_type, exc, tb):
"""Closes the connection to the database.
Args:
exc_type: The exception type.
exc: The exception.
tb: The traceback.
"""
await self.conn.close()
self.conn = None

Expand Down Expand Up @@ -262,11 +273,9 @@ async def create_table(self, dict, conn):
dict (dict): The dictionary containing the data to be added as columns.
conn: The connection object to the database.
"""
async with trio_asyncio.aio_as_trio(conn.transaction()):
await trio_asyncio.aio_as_trio(
conn.execute(
"CREATE TABLE IF NOT EXISTS alicat (Time timestamp, Device text, PRIMARY KEY (Time, Device))"
)
async with conn.transaction():
await conn.execute(
"CREATE TABLE IF NOT EXISTS alicat (Time timestamp, Device text, PRIMARY KEY (Time, Device))"
)
keys = sorted(dict.keys(), key=self._key_func)
for key in keys:
Expand All @@ -275,16 +284,12 @@ async def create_table(self, dict, conn):
data_type = "timestamp"
elif isinstance(dict[key], float):
data_type = "float"
await trio_asyncio.aio_as_trio(
conn.execute(
f"ALTER TABLE alicat ADD COLUMN IF NOT EXISTS {''.join(key.split()).lower()} {data_type}"
)
await conn.execute(
f"ALTER TABLE alicat ADD COLUMN IF NOT EXISTS {''.join(key.split()).lower()} {data_type}"
)
await trio_asyncio.aio_as_trio(
conn.execute(
"SELECT create_hypertable('alicat', by_range('time'), if_not_exists => TRUE)"
) # create the timescaledb hypertable
)
await conn.execute(
"SELECT create_hypertable('alicat', by_range('time'), if_not_exists => TRUE)"
) # create the timescaledb hypertable

async def insert_data(self, dict, conn):
"""Inserts the data into the database.
Expand All @@ -293,30 +298,25 @@ async def insert_data(self, dict, conn):
dict (dict): The dictionary containing the data to be added.
conn: The connection object to the database.
"""
async with trio_asyncio.aio_as_trio(conn.transaction()):
async with conn.transaction():
for dev in dict:
await trio_asyncio.aio_as_trio(
conn.execute(
"INSERT INTO alicat ("
+ ", ".join(
[key.lower().replace(" ", "") for key in dev.keys()]
)
+ ") VALUES ("
+ ", ".join(["$" + str(i + 1) for i in range(len(dev))])
+ ")",
*dev.values(),
) # We could optimize this by using a single insert statement for all devices. We would have to make sure that the order of the values is the same for all devices and it would only work if they all have the same fields. That is, it wouldn't work for the flowmeter in our case because it has RH values
)
await conn.execute(
"INSERT INTO alicat ("
+ ", ".join([key.lower().replace(" ", "") for key in dev.keys()])
+ ") VALUES ("
+ ", ".join(["$" + str(i + 1) for i in range(len(dev))])
+ ")",
*dev.values(),
) # We could optimize this by using a single insert statement for all devices. We would have to make sure that the order of the values is the same for all devices and it would only work if they all have the same fields. That is, it wouldn't work for the flowmeter in our case because it has RH values

async def update_dict_log(
self, Daq, qualities
) -> dict[str, dict[str, str | float]]:
"""Updates the dictionary with the new values.
Args:
ret_dict (dict): The dictionary of devices to update.
dev (str): The name of the device.
val (list): The values to get from the device.
Daq (DAQ): The DAQ object to take readings from.
qualities (list): The list of qualities to log.
Returns:
dict: The dictionary of devices with the updated values.
Expand All @@ -343,7 +343,7 @@ async def logging(
rate = self.rate
database = self.database
rows = []
async with trio_asyncio.aio_as_trio(database) as conn:
async with database as conn:
self.df = await self.Daq.get(self.qualities)
unique = dict()
for dev in self.df:
Expand Down Expand Up @@ -378,7 +378,7 @@ async def logging(
if write_async:
nurse_time = time.time_ns()
# open_nursery
async with open_nursery() as g:
async with create_task_group() as g:
# insert_data from the previous iteration
g.start_soon(self.insert_data, rows, conn)
# get
Expand Down
18 changes: 13 additions & 5 deletions pyalicat/device.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from typing import Any
"""This module contains the Device class and its subclasses.
Raises:
VersionError: Raised when the version of the device does not support command.
Returns:
dict: The dictionary of devices with the updated values.
"""

import json
import re
import warnings
from abc import ABC
from typing import Any

import anyio
from anyio import run
from comm import SerialDevice
import trio
from trio import run
import warnings

# from .device import Device
warnings.filterwarnings("always")
Expand Down Expand Up @@ -1206,7 +1214,7 @@ async def get_units(self, stats: list) -> dict:
"""Gets the units of the current dataframe format of the device.
Args:
measurement (list): List of measurements to get units for.
stats (list): List of statistics to get units for.
Returns:
list: Units of statistics in measurement
Expand Down
38 changes: 0 additions & 38 deletions pyalicat/test.csv

This file was deleted.

Loading

0 comments on commit f77609d

Please sign in to comment.