Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability for some nodes to change their task configuration in an online manner #406

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions angel-docker-build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ do
shift
FORCE_BUILD=1
;;
--)
# Escape the remainder of args as to be considered passthrough
shift
dc_forward_params+=("${@}")
break
;;
*) # anything else
dc_forward_params+=("$1")
shift
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# File paths will be resolved expanding environment variables via
# `os.path.expandvars`.
# If a variable is referenced that does not exist in the environment then it
# will not be expanded and the text will be left as is.
#
# Be sure to consider the environment varaibles set by the tmuxinator
# configuration that will ultimately be running the node that consumes this
# file.
#
task_config:
m2:
object_net_checkpoint: "${MODEL_DIR}/object_detector/m2_det.pt"
m3:
object_net_checkpoint: "${MODEL_DIR}/object_detector/m3_det.pt"
m5:
object_net_checkpoint: "${MODEL_DIR}/object_detector/m5_det.pt"
r18:
object_net_checkpoint: "${MODEL_DIR}/object_detector/r18_det.pt"
2 changes: 2 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ services:
- ${WORKSPACE_SHELL_HOST_DIR}/stuff:${ANGEL_WORKSPACE_DIR}/stuff
- ${WORKSPACE_SHELL_HOST_DIR}/ros_home:/root/.ros
- ${WORKSPACE_SHELL_HOST_DIR}/cache/torch:/root/.cache/torch
- ${WORKSPACE_SHELL_HOST_DIR}/cache/pip:/root/.cache/pip
- ${WORKSPACE_SHELL_HOST_DIR}/cache/poetry:/root/.cache/pypoetry
# X11 things
- /tmp/.X11-unix:/tmp/.X11-unix
# assume this file exists, should be created before running.
Expand Down
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ openpyxl = "^3.0.10"
pandas = ">=1.4.3"
Pillow = "=9.1.0"
pyarrow = ">=9.0.0" # for feature file support
pydantic = "^1"
pynput = "^1.7.6"
wheel = "<=0.43.0"
python-dotenv = ">=1.0.0"
Expand Down
36 changes: 36 additions & 0 deletions ros/angel_msgs/msg/SystemCommands.msg
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,44 @@
# received message instance.
#

# Time at which this command was emitted.
# -> Not using a `std_msgs/Header` here as the use of frame_id is currently
# undefined.
builtin_interfaces/Time stamp


# =====================
# Task Monitor commands
# =====================

# Index of GSP tasks being concurrently handled to indicate step progression/regression for.
int8 task_index

# If the current task progress should be reset.
bool reset_current_task

# If the current task progress should be regressed a step,
bool previous_step

# If the current task progress should be progressed a step.
bool next_step


# =============
# Task Changing
# =============

# Enumeration of sorts for the tasks that may be switched to.
#
# This is an enumeration here in order to be a source of coordination between
# nodes that support this functionality as just simply passing a string can
# easily be subject to spelling errors or misalignment between nodes.
#
uint32 TASK_NO_CHANGE = 0
uint32 TASK_COOKING = 1 # Cooking is "meta" atm because it is multi-task under the hood.
uint32 TASK_MEDICAL_M2 = 2
uint32 TASK_MEDICAL_M3 = 3
uint32 TASK_MEDICAL_R18 = 4

# The task that is being changed to.
uint32 change_task
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def intent_callback(self, intent: InterpretedAudioUserIntent) -> None:
return

sys_cmd_msg = SystemCommands()
sys_cmd_msg.stamp = self.get_clock().now().to_msg()

# TODO: This only works currently since we only recognize a few boolean
# based commands (next step, previous step). This will break down if
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import os.path
from pathlib import Path
from threading import Event, Lock, Thread
from typing import Union
from threading import Event, RLock, Thread
from typing import Dict, Union

from cv_bridge import CvBridge
import cv2
import numpy as np
from pydantic import BaseModel, validator
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup, ReentrantCallbackGroup
from rclpy.node import Node, ParameterDescriptor, Parameter
from sensor_msgs.msg import Image
import yaml

from yolov7.detect_ptg import load_model, predict_image
from angel_system.object_detection.yolov8_detect import predict_hands
Expand All @@ -19,14 +22,44 @@
from angel_system.utils.event import WaitAndClearEvent
from angel_system.utils.simple_timer import SimpleTimer

from angel_msgs.msg import ObjectDetection2dSet
from angel_msgs.msg import ObjectDetection2dSet, SystemCommands
from angel_utils import declare_and_get_parameters, RateTracker # , DYNAMIC_TYPE
from angel_utils import make_default_main
from angel_utils.system_commands import task_int_to_str


BRIDGE = CvBridge()


class OaHDTaskConfig(BaseModel):
"""
Structure of the configuration for a single task
for representation and validation.
"""
object_net_checkpoint: Path

@validator("object_net_checkpoint")
@classmethod
def expand_vars(cls, v: Path) -> Path:
"""
Expand path values with environment variables via os.path.expandvars
"""
p = Path(os.path.expandvars(v))
if not p.is_file():
raise ValueError(f"The file \"{p}\" does not exist or was not a file.")
elif not os.access(p, os.R_OK):
raise ValueError(f"The file \"{p}\" is not readable.")
return p


class OaHDPerTaskConfig(BaseModel):
"""
Structure of the whole configuration file across multiple tasks
for representation and validation.
"""
task_config: Dict[str, OaHDTaskConfig]


class ObjectAndHandDetector(Node):
"""
ROS node that runs the yolov7 object detector model and outputs
Expand All @@ -45,8 +78,15 @@ def __init__(self):
# Required parameter (no defaults)
("image_topic",),
("det_topic",),
("object_net_checkpoint",),
("system_commands_topic",),
("hand_net_checkpoint",),
("per_task_config",),
# The task to load parameters for initially, from our per-task
# config. This of course needs to be a valid "task_config"
# entry in the input configuration file. Intentionally not
# setting a default as this probably depends on the system
# configuration.
("default_task",),
##################################
# Defaulted parameters
("inference_img_size", 1280), # inference size (pixels)
Expand All @@ -64,9 +104,14 @@ def __init__(self):
)
self._image_topic = param_values["image_topic"]
self._det_topic = param_values["det_topic"]

self._object_model_ckpt_fp = Path(param_values["object_net_checkpoint"])
self._system_commands_topic = param_values["system_commands_topic"]
self._hand_model_chpt_fp = Path(param_values["hand_net_checkpoint"])
with open(param_values["per_task_config"]) as config_file:
# Load and validate config into structure.
self._per_task_config = OaHDPerTaskConfig(
**yaml.load(config_file, yaml.Loader)
)
self._default_task = param_values["default_task"]

self._inference_img_size = param_values["inference_img_size"]
self._det_conf_thresh = param_values["det_conf_threshold"]
Expand All @@ -77,7 +122,23 @@ def __init__(self):

self._enable_trace_logging = param_values["enable_time_trace_logging"]

# Lock to protect task-dependent logic. A simple mutex is appropriate
# for this implementation because the critical sections here will be
# the logic that changes task configuration and the object detection
# inference logic. If configuration usage becomes something that
# multiple concurrent agents use, then a reader-writer lock will need
# to be utilized.
self._task_dependent_lock = RLock()

# Timestamp of the latest configuration change. Input messages before
# this time should not be processed as they are stale relative to the
# configuration change. This should be in nanoseconds format.
self._timestamp_min = 0

# Object Model
# TODO: handle with task change logic. Initially "task change" to the
# default task configured.
self._object_model_ckpt_fp = Path(param_values["object_net_checkpoint"])
self.object_model: Union[yolov7.models.yolo.Model, TracedModel]
if not self._object_model_ckpt_fp.is_file():
raise ValueError(
Expand All @@ -95,16 +156,23 @@ def __init__(self):

# Single slot for latest image message to process detection over.
self._cur_image_msg: Image = None
self._cur_image_msg_lock = Lock()
self._cur_image_msg_lock = RLock()

# Initialize ROS hooks
self._subscription = self.create_subscription(
self._image_subscription = self.create_subscription(
Image,
self._image_topic,
self.listener_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self._sys_cmd_subscription = self.create_subscription(
Image,
self._system_commands_topic,
self.system_commands_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self._det_publisher = self.create_publisher(
ObjectDetection2dSet,
self._det_topic,
Expand Down Expand Up @@ -142,6 +210,18 @@ def __init__(self):
self._rt_thread.daemon = True
self._rt_thread.start()

def load_task_configuration(self, task_name: str) -> None:
"""
Load task specific configuration parameters for the given task name.
If the task name provided is not one supported, an exception is raised.

:param task_name: Name of the task to change to.
"""
with self._task_dependent_lock:
self._object_model_ckpt_fp = (
self._per_task_config.task_config[task_name].object_net_checkpoint
)

def listener_callback(self, image: Image):
"""
Callback function for image messages. Runs the berkeley object detector
Expand All @@ -154,6 +234,17 @@ def listener_callback(self, image: Image):
self._cur_image_msg = image
self._rt_awake_evt.set()

def system_commands_callback(self, msg: SystemCommands):
"""
Handle receiving a SystemCommands message.
"""
# Handle a request to changing the task
if msg.change_task != SystemCommands.TASK_NO_CHANGE:
raise NotImplementedError(
f"Not yet handling task changes for requested task "
f"{task_int_to_str(msg.change_task)}"
)

def rt_alive(self) -> bool:
"""
Check that the prediction runtime is still alive and raise an exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ def sys_cmd_callback(self, sys_cmd_msg: SystemCommands):
elif self._step_mode == "granular" and sys_cmd_msg.previous_step:
update_function = self.gsp.decrement_granular_step
else:
# This should never happen
# No next/previous step command received, nothing to do.
# TODO: Handle resetting the current task if that command is
# set.
return

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def publish_sys_cmd(self, task_id: int, forward: bool) -> None:
"""
log = self.get_logger()
msg = SystemCommands()
msg.stamp = self.get_clock().now().to_msg()
msg.task_index = task_id
if forward:
msg.next_step = True
Expand Down
36 changes: 36 additions & 0 deletions ros/angel_utils/python/angel_utils/system_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from functools import lru_cache
from typing import Dict

from angel_msgs.msg import SystemCommands


__all__ = [
"task_int_to_str"
]


@lru_cache()
def _get_sys_cmds_task_map() -> Dict[int, str]:
"""
Generate the TASK enumeration value to string mapping based on the contents
of the SystemCommands structure at the time of the call.

This should allow this functionality to be dynamic to any changes in the
SystemCommands message.

:returns: Dictionary mapping integer SystemCommands.TASK_* values into
their enumeration names.
"""
return {v: k for k, v in SystemCommands.__dict__.items() if k.startswith("TASK_")}


def task_int_to_str(i: int) -> str:
"""
Convert the integer value intended to represent one of the "TASK_*"
enumeration values in the `angel_msgs.msg.SystemCommands` message type into
the enumeration property string name.

:param i: Task enumeration value.
:return: String name of the enumeration value.
"""
return _get_sys_cmds_task_map()[i]
2 changes: 1 addition & 1 deletion tmux/demos/medical/BBN-M2-Tourniquet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ windows:
- BBN Interface:
layout: even-vertical
panes:
- task-converter: ros2 run bbn_integration_py task_to_bbn_update --ros-args
- task-converter: ros2 run bbn_integration_py task_to_bbn_update --ros-args
-r __ns:=${ROS_NAMESPACE}
-p task_update_topic:=TaskUpdates
-p bbn_update_topic:=BBNUpdates
Expand Down
Loading
Loading