Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GitHub Pipeline Sionna Integration #19

Merged
merged 18 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
sudo apt update
sudo apt-get install -y build-essential octave portaudio19-dev python-dev-is-python3
export MAKEFLAGS="-j $(grep -c ^processor /proc/cpuinfo)"
pip install -e .[develop,test,documentation,quadriga,uhd,audio]
pip install -e .[develop,test,quadriga,uhd,audio,sionna,scapy]

- name: Run unit tests
run: |
Expand Down Expand Up @@ -73,6 +73,7 @@ jobs:
with:
python-version: '3.11'

# Note: Sionna dependencies crash with hermespy[documentation] due to outdated ipywidgets requirement on Sionna's side
- name: Install doc dependencies
run: |
sudo apt update
Expand Down
4 changes: 2 additions & 2 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Unit Testing:
- Build Python 3.11
before_script:
- apt -qq update && apt-get -qq install -y octave portaudio19-dev python-dev-is-python3 unzip # remove for docker
- pip install -qq -e .\[develop,test,quadriga,audio,sionna\]
- pip install -qq -e .\[develop,test,quadriga,audio,sionna,scapy\]
- unzip dist/$HERMES_WHEEL_11 "hermespy/fec/aff3ct/*.so"
- pip install -qq pyzmq>=25.1.1 usrp-uhd-client memray>=1.11.0
script:
Expand All @@ -93,7 +93,7 @@ Integration Testing:
- Build Python 3.11
before_script:
- apt -qq update && apt-get -qq install -y octave portaudio19-dev python-dev-is-python3 # remove for docker
- pip install -qq dist/$HERMES_WHEEL_11\[test,quadriga,audio,sionna\]
- pip install -qq dist/$HERMES_WHEEL_11\[test,quadriga,audio,sionna,scapy\]
- pip install -qq memray
script:
- python ./tests/test_install.py ./tests/integration_tests/
Expand Down
54 changes: 54 additions & 0 deletions _examples/library/rotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
import numpy as np

from hermespy.core import Transformation
from hermespy.simulation import Simulation, LinearTrajectory, StaticTrajectory


# Setup flags
flag_traj_static_a = False # Is alpha device trajectory static?
flag_traj_static_b = False # Is beta device trajectory static?
flag_lookat = True # Should the devices look at each other?

# Create a new simulation featuring two devices
simulation = Simulation()
device_alpha = simulation.new_device()
device_beta = simulation.new_device()
duration = 60

# Init positions and poses
init_pose_alpha = Transformation.From_Translation(np.array([10., 10., 0.]))
fina_pose_alpha = Transformation.From_Translation(np.array([50., 50., 0.]))
init_pose_beta = Transformation.From_Translation(np.array([30., 10., 20.]))
fina_pose_beta = Transformation.From_Translation(np.array([30., 20., 20.]))
if flag_lookat:
init_pose_alpha = init_pose_alpha.lookat(init_pose_beta.translation)
fina_pose_alpha = fina_pose_alpha.lookat(fina_pose_beta.translation)
init_pose_beta = init_pose_beta.lookat(init_pose_alpha.translation)
fina_pose_beta = fina_pose_beta.lookat(fina_pose_alpha.translation)

# Assign each device a trajectory
# alpha
if flag_traj_static_a:
device_alpha.trajectory = StaticTrajectory(init_pose_alpha)
else:
device_alpha.trajectory = LinearTrajectory(init_pose_alpha, fina_pose_alpha, duration)
# beta
if flag_traj_static_b:
device_beta.trajectory = StaticTrajectory(init_pose_beta)
else:
device_beta.trajectory = LinearTrajectory(init_pose_beta, fina_pose_beta, duration)

# Lock the devices onto each other
if flag_lookat:
device_alpha.trajectory.lookat(device_beta.trajectory)
device_beta.trajectory.lookat(device_alpha.trajectory)

visualization = simulation.scenario.visualize()
with plt.ion():
for timestamp in np.linspace(0, duration, 200):
simulation.scenario.visualize.update_visualization(visualization, time=timestamp)
plt.pause(0.1)
plt.show()
5 changes: 3 additions & 2 deletions hermespy/channel/radar/radar.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sparse import GCXS # type: ignore

from hermespy.core import (
AntennaMode,
ChannelStateInformation,
ChannelStateFormat,
Direction,
Expand Down Expand Up @@ -609,10 +610,10 @@ def propagation_response(
) -> np.ndarray:
# Query the sensor array responses
rx_response = receiver.antennas.cartesian_array_response(
carrier_frequency, self.position, "global"
carrier_frequency, self.position, "global", AntennaMode.RX
)
tx_response = transmitter.antennas.cartesian_array_response(
carrier_frequency, self.position, "global"
carrier_frequency, self.position, "global", AntennaMode.TX
).conj()

if self.attenuate:
Expand Down
65 changes: 30 additions & 35 deletions hermespy/channel/sionna_rt_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def __apply_doppler(self, num_samples: int) -> tuple:
tau (np.ndarray): delays. Shape (num_rx_ants, num_tx_ants, num_paths)
"""
# Apply doppler
self.paths.apply_doppler(sampling_frequency=self.bandwidth,
num_time_steps=num_samples,
tx_velocities=self.transmitter_velocity,
rx_velocities=self.receiver_velocity)
self.paths.apply_doppler(
sampling_frequency=self.bandwidth,
num_time_steps=num_samples,
tx_velocities=self.transmitter_velocity,
rx_velocities=self.receiver_velocity,
)

# Get and cast CIR
a, tau = self.paths.cir()
Expand All @@ -106,19 +108,23 @@ def state(
self,
num_samples: int,
max_num_taps: int,
interpolation_mode: InterpolationMode = InterpolationMode.NEAREST
interpolation_mode: InterpolationMode = InterpolationMode.NEAREST,
) -> ChannelStateInformation:
# Apply Doppler effect and get the channel impulse response
a, tau = self.__apply_doppler(num_samples)

# Init result
max_delay = np.max(tau) if tau.size != 0 else 0
max_delay_in_samples = min(max_num_taps, ceil(max_delay * self.bandwidth))
raw_state = np.zeros((
self.num_receive_antennas,
self.num_transmit_antennas,
num_samples,
1 + max_delay_in_samples), dtype=np.complex_)
raw_state = np.zeros(
(
self.num_receive_antennas,
self.num_transmit_antennas,
num_samples,
1 + max_delay_in_samples,
),
dtype=np.complex_,
)
# If no paths hit the target, then return an empty state
if a.size == 0 or tau.size == 0:
return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state)
Expand All @@ -136,9 +142,7 @@ def state(
return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state)

def _propagate(
self,
signal_block: SignalBlock,
interpolation: InterpolationMode,
self, signal_block: SignalBlock, interpolation: InterpolationMode
) -> SignalBlock:
# Calculate the resulting signal block parameters
sr_ratio = self.receiver_state.sampling_rate / self.transmitter_state.sampling_rate
Expand All @@ -150,16 +154,16 @@ def _propagate(
a, tau = self.__apply_doppler(signal_block.num_samples)
# If no paths hit the target, then return a zeroed signal
if a.size == 0 or tau.size == 0:
return SignalBlock(np.zeros((num_streams_new, num_samples_new),
signal_block.dtype),
offset_new)
return SignalBlock(
np.zeros((num_streams_new, num_samples_new), signal_block.dtype), offset_new
)

# Set other attributes
max_delay = np.max(tau)
max_delay_in_samples = ceil(max_delay * self.bandwidth)
propagated_samples = np.zeros(
(num_streams_new, signal_block.num_samples + max_delay_in_samples),
dtype=signal_block.dtype
dtype=signal_block.dtype,
)

# Prepare the optimal einsum path ahead of time for faster execution
Expand All @@ -173,9 +177,9 @@ def _propagate(
if tau_p == -1.0:
continue
t = int(tau_p * self.bandwidth)
propagated_samples[
:, t : t + signal_block.num_samples
] += np.einsum(einsum_subscripts, a_p, signal_block, optimize=einsum_path)
propagated_samples[:, t : t + signal_block.num_samples] += np.einsum(
einsum_subscripts, a_p, signal_block, optimize=einsum_path
)

propagated_samples *= np.sqrt(self.__gain)
return SignalBlock(propagated_samples, offset_new)
Expand All @@ -189,10 +193,9 @@ class SionnaRTChannelRealization(ChannelRealization[SionnaRTChannelSample]):

scene: rt.Scene

def __init__(self,
scene: rt.Scene,
sample_hooks: Set[ChannelSampleHook] | None = None,
gain: float = 1.) -> None:
def __init__(
self, scene: rt.Scene, sample_hooks: Set[ChannelSampleHook] | None = None, gain: float = 1.0
) -> None:
super().__init__(sample_hooks, gain)
self.scene = scene

Expand All @@ -205,18 +208,12 @@ def _sample(self, state: LinkState) -> SionnaRTChannelSample:

# init self.scene.tx_array
tx_antenna = rt.Antenna("iso", "V")
tx_positions = [
a.position
for a in state.transmitter.antennas.transmit_antennas
]
tx_positions = [a.position for a in state.transmitter.antennas.transmit_antennas]
self.scene.tx_array = rt.AntennaArray(tx_antenna, tx_positions)

# init self.scene.rx_array
rx_antenna = rt.Antenna("iso", "V")
rx_positions = [
a.position
for a in state.receiver.antennas.receive_antennas
]
rx_positions = [a.position for a in state.receiver.antennas.receive_antennas]
self.scene.rx_array = rt.AntennaArray(rx_antenna, rx_positions)

# init tx and rx
Expand Down Expand Up @@ -244,9 +241,7 @@ def to_HDF(self, group: Group) -> None:

@staticmethod
def From_HDF(
scene: rt.Scene,
group: Group,
sample_hooks: Set[ChannelSampleHook[SionnaRTChannelSample]]
scene: rt.Scene, group: Group, sample_hooks: Set[ChannelSampleHook[SionnaRTChannelSample]]
) -> SionnaRTChannelRealization:
return SionnaRTChannelRealization(scene, sample_hooks, group.attrs["gain"])

Expand Down
3 changes: 1 addition & 2 deletions hermespy/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
register,
)
from .random_node import RandomRealization, RandomNode
from .drop import Drop, RecalledDrop
from .drop import Drop
from .scenario import Scenario, ScenarioMode, ScenarioType, ReplayScenario
from .signal_model import Signal, SignalBlock, DenseSignal, SparseSignal
from .visualize import (
Expand Down Expand Up @@ -168,7 +168,6 @@
"RandomRealization",
"RandomNode",
"Drop",
"RecalledDrop",
"Scenario",
"ScenarioMode",
"ScenarioType",
Expand Down
79 changes: 69 additions & 10 deletions hermespy/core/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ def to_HDF(self, group: Group) -> None:
self.mixed_signal.to_HDF(self._create_group(group, "mixed_signal"))


DTT = TypeVar("DTT", bound="DeviceTransmission")
"""Type of device transmission."""


class DeviceTransmission(DeviceOutput):
"""Information generated by transmitting over a device."""

Expand Down Expand Up @@ -465,17 +469,38 @@ def num_operator_transmissions(self) -> int:
return len(self.__operator_transmissions)

@classmethod
def from_HDF(cls: Type[DeviceTransmission], group: Group) -> DeviceTransmission:
def from_HDF(
cls: Type[DeviceTransmission], group: Group, operators: Sequence[Transmitter] | None = None
) -> DeviceTransmission:
"""Recall a device transmission from a serialization.

Args:

group (Group):
HDF5 group containing the serialized device transmission.

operators (Sequence[Transmitter], optional):
List of device transmitters to recall the specific transmissions.
If not specified, the transmissions are recalled as their base class.
"""

# Recall base class
device_output = DeviceOutput.from_HDF(group)

# Recall attributes
num_transmissions = group.attrs.get("num_transmissions", 1)

# Recall transmissions
transmissions = [
Transmission.from_HDF(group[f"transmission_{t:02d}"]) for t in range(num_transmissions)
]
if operators is None:
transmissions = [
Transmission.from_HDF(group[f"transmission_{t:02d}"])
for t in range(num_transmissions)
]
else:
transmissions = [
operator.recall_transmission(group[f"transmission_{t:02d}"])
for t, operator in zip(range(num_transmissions), operators)
]

# Initialize object
return cls.From_Output(device_output, transmissions)
Expand Down Expand Up @@ -706,20 +731,26 @@ def num_operator_receptions(self) -> int:
return len(self.__operator_receptions)

@classmethod
def from_HDF(cls: Type[DRT], group: Group) -> DRT:
def from_HDF(cls: Type[DRT], group: Group, operators: Sequence[Receiver] | None = None) -> DRT:
# Recall base class
device_input = ProcessedDeviceInput.from_HDF(group)

# Recall individual operator receptions
num_receptions = group.attrs.get("num_operator_receptions", 0)

# Recall operator receptions
operator_receptions = [
Reception.from_HDF(group[f"reception_{f:02d}"]) for f in range(num_receptions)
]
# Recall receptions
if operators is None:
receptions = [
Reception.from_HDF(group[f"reception_{r:02d}"]) for r in range(num_receptions)
]
else:
receptions = [
operator.recall_reception(group[f"reception_{r:02d}"])
for r, operator in zip(range(num_receptions), operators)
]

# Initialize object
return cls.From_ProcessedDeviceInput(device_input, operator_receptions)
return cls.From_ProcessedDeviceInput(device_input, receptions)

@classmethod
def Recall(cls: Type[DRT], group: Group, device: Device) -> DRT:
Expand Down Expand Up @@ -1809,6 +1840,21 @@ def transmit(self, clear_cache: bool = True) -> DeviceTransmission:

return DeviceTransmission.From_Output(device_output, operator_transmissions)

def recall_transmission(self, group: Group) -> DeviceTransmission:
"""Recall a specific transmission from a HDF5 serialization.

Args:

group (Group):
HDF group containing the transmission.

Returns: The recalled transmission.
"""

# Recall the specific operator transmissions

return DeviceTransmission.from_HDF(group, list(self.transmitters))

def cache_transmission(self, transmission: DeviceTransmission) -> None:
for transmitter, operator_transmission in zip(
self.transmitters, transmission.operator_transmissions
Expand Down Expand Up @@ -1953,3 +1999,16 @@ def receive(

# Generate device reception
return DeviceReception.From_ProcessedDeviceInput(processed_input, receptions)

def recall_reception(self, group: Group) -> DeviceReception:
"""Recall a specific reception from a HDF5 serialization.

Args:

group (Group):
HDF group containing the reception.

Returns: The recalled reception.
"""

return DeviceReception.Recall(group, self)
Loading
Loading