Skip to content

Commit

Permalink
Merge branch 'matched-filter-beamforming' into 'master'
Browse files Browse the repository at this point in the history
Matched Filter JCAS Beamforming

See merge request barkhauseninstitut/wicon/hermespy!209
  • Loading branch information
adlerjan committed Aug 22, 2024
2 parents 4be0ba1 + 6c8a786 commit 11714f3
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 76 deletions.
5 changes: 3 additions & 2 deletions hermespy/channel/radar/radar.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sparse import GCXS # type: ignore

from hermespy.core import (
AntennaMode,
ChannelStateInformation,
ChannelStateFormat,
Direction,
Expand Down Expand Up @@ -609,10 +610,10 @@ def propagation_response(
) -> np.ndarray:
# Query the sensor array responses
rx_response = receiver.antennas.cartesian_array_response(
carrier_frequency, self.position, "global"
carrier_frequency, self.position, "global", AntennaMode.RX
)
tx_response = transmitter.antennas.cartesian_array_response(
carrier_frequency, self.position, "global"
carrier_frequency, self.position, "global", AntennaMode.TX
).conj()

if self.attenuate:
Expand Down
65 changes: 30 additions & 35 deletions hermespy/channel/sionna_rt_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def __apply_doppler(self, num_samples: int) -> tuple:
tau (np.ndarray): delays. Shape (num_rx_ants, num_tx_ants, num_paths)
"""
# Apply doppler
self.paths.apply_doppler(sampling_frequency=self.bandwidth,
num_time_steps=num_samples,
tx_velocities=self.transmitter_velocity,
rx_velocities=self.receiver_velocity)
self.paths.apply_doppler(
sampling_frequency=self.bandwidth,
num_time_steps=num_samples,
tx_velocities=self.transmitter_velocity,
rx_velocities=self.receiver_velocity,
)

# Get and cast CIR
a, tau = self.paths.cir()
Expand All @@ -106,19 +108,23 @@ def state(
self,
num_samples: int,
max_num_taps: int,
interpolation_mode: InterpolationMode = InterpolationMode.NEAREST
interpolation_mode: InterpolationMode = InterpolationMode.NEAREST,
) -> ChannelStateInformation:
# Apply Doppler effect and get the channel impulse response
a, tau = self.__apply_doppler(num_samples)

# Init result
max_delay = np.max(tau) if tau.size != 0 else 0
max_delay_in_samples = min(max_num_taps, ceil(max_delay * self.bandwidth))
raw_state = np.zeros((
self.num_receive_antennas,
self.num_transmit_antennas,
num_samples,
1 + max_delay_in_samples), dtype=np.complex_)
raw_state = np.zeros(
(
self.num_receive_antennas,
self.num_transmit_antennas,
num_samples,
1 + max_delay_in_samples,
),
dtype=np.complex_,
)
# If no paths hit the target, then return an empty state
if a.size == 0 or tau.size == 0:
return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state)
Expand All @@ -136,9 +142,7 @@ def state(
return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state)

def _propagate(
self,
signal_block: SignalBlock,
interpolation: InterpolationMode,
self, signal_block: SignalBlock, interpolation: InterpolationMode
) -> SignalBlock:
# Calculate the resulting signal block parameters
sr_ratio = self.receiver_state.sampling_rate / self.transmitter_state.sampling_rate
Expand All @@ -150,16 +154,16 @@ def _propagate(
a, tau = self.__apply_doppler(signal_block.num_samples)
# If no paths hit the target, then return a zeroed signal
if a.size == 0 or tau.size == 0:
return SignalBlock(np.zeros((num_streams_new, num_samples_new),
signal_block.dtype),
offset_new)
return SignalBlock(
np.zeros((num_streams_new, num_samples_new), signal_block.dtype), offset_new
)

# Set other attributes
max_delay = np.max(tau)
max_delay_in_samples = ceil(max_delay * self.bandwidth)
propagated_samples = np.zeros(
(num_streams_new, signal_block.num_samples + max_delay_in_samples),
dtype=signal_block.dtype
dtype=signal_block.dtype,
)

# Prepare the optimal einsum path ahead of time for faster execution
Expand All @@ -173,9 +177,9 @@ def _propagate(
if tau_p == -1.0:
continue
t = int(tau_p * self.bandwidth)
propagated_samples[
:, t : t + signal_block.num_samples
] += np.einsum(einsum_subscripts, a_p, signal_block, optimize=einsum_path)
propagated_samples[:, t : t + signal_block.num_samples] += np.einsum(
einsum_subscripts, a_p, signal_block, optimize=einsum_path
)

propagated_samples *= np.sqrt(self.__gain)
return SignalBlock(propagated_samples, offset_new)
Expand All @@ -189,10 +193,9 @@ class SionnaRTChannelRealization(ChannelRealization[SionnaRTChannelSample]):

scene: rt.Scene

def __init__(self,
scene: rt.Scene,
sample_hooks: Set[ChannelSampleHook] | None = None,
gain: float = 1.) -> None:
def __init__(
self, scene: rt.Scene, sample_hooks: Set[ChannelSampleHook] | None = None, gain: float = 1.0
) -> None:
super().__init__(sample_hooks, gain)
self.scene = scene

Expand All @@ -205,18 +208,12 @@ def _sample(self, state: LinkState) -> SionnaRTChannelSample:

# init self.scene.tx_array
tx_antenna = rt.Antenna("iso", "V")
tx_positions = [
a.position
for a in state.transmitter.antennas.transmit_antennas
]
tx_positions = [a.position for a in state.transmitter.antennas.transmit_antennas]
self.scene.tx_array = rt.AntennaArray(tx_antenna, tx_positions)

# init self.scene.rx_array
rx_antenna = rt.Antenna("iso", "V")
rx_positions = [
a.position
for a in state.receiver.antennas.receive_antennas
]
rx_positions = [a.position for a in state.receiver.antennas.receive_antennas]
self.scene.rx_array = rt.AntennaArray(rx_antenna, rx_positions)

# init tx and rx
Expand Down Expand Up @@ -244,9 +241,7 @@ def to_HDF(self, group: Group) -> None:

@staticmethod
def From_HDF(
scene: rt.Scene,
group: Group,
sample_hooks: Set[ChannelSampleHook[SionnaRTChannelSample]]
scene: rt.Scene, group: Group, sample_hooks: Set[ChannelSampleHook[SionnaRTChannelSample]]
) -> SionnaRTChannelRealization:
return SionnaRTChannelRealization(scene, sample_hooks, group.attrs["gain"])

Expand Down
34 changes: 29 additions & 5 deletions hermespy/core/signal_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,9 @@ def __parse_slice(s: slice, dim_size: int) -> Tuple[int, int, int]:
s1 = s1 if s1 >= 0 else s1 % dim_size
return s0, s1, s2

def _parse_validate_itemkey(self, key: Any) -> Tuple[int, int, int, int, int, int, bool, bool, bool]:
def _parse_validate_itemkey(
self, key: Any
) -> Tuple[int, int, int, int, int, int, bool, bool, bool]:
"""Parse and validate key in __getitem__ and __setitem__.
Raises:
Expand Down Expand Up @@ -843,7 +845,17 @@ def _parse_validate_itemkey(self, key: Any) -> Tuple[int, int, int, int, int, in
else:
raise TypeError(f"Samples key is ofan unsupported type ({type(key[1])})")

return s00, s01, s02, s10, s11, s12, False, should_flatten_streams, should_flatten_samples
return (
s00,
s01,
s02,
s10,
s11,
s12,
False,
should_flatten_streams,
should_flatten_samples,
)
# ======================================================================
# done Key is a tuple of two

Expand Down Expand Up @@ -881,7 +893,17 @@ def _parse_validate_itemkey(self, key: Any) -> Tuple[int, int, int, int, int, in
except ValueError:
raise TypeError(f"Unsupported key type {type(key)}")

return s00, s01, s02, s10, s11, s12, isboolmask, should_flatten_streams, should_flatten_samples
return (
s00,
s01,
s02,
s10,
s11,
s12,
isboolmask,
should_flatten_streams,
should_flatten_samples,
)

def _find_affected_blocks(self, s10: int, s11: int) -> Tuple[int, int]:
"""Find indices of blocks that are affected by the given samples slice.
Expand Down Expand Up @@ -924,7 +946,7 @@ def _find_affected_blocks(self, s10: int, s11: int) -> Tuple[int, int]:

return b_start, b_stop

def getitem(self, key: Any = slice(None, None), unflatten : bool = True) -> np.ndarray:
def getitem(self, key: Any = slice(None, None), unflatten: bool = True) -> np.ndarray:
"""Get specified samples.
Works like np.ndarray.__getitem__, but de-sparsifies the signal.
Expand Down Expand Up @@ -954,7 +976,9 @@ def getitem(self, key: Any = slice(None, None), unflatten : bool = True) -> np.n
Returns: np.ndarray with ndim 2 or less and dtype dtype np.complex_"""

s00, s01, s02, s10, s11, s12, isboolmask, should_flatten_streams, should_flatten_samples = self._parse_validate_itemkey(key)
s00, s01, s02, s10, s11, s12, isboolmask, should_flatten_streams, should_flatten_samples = (
self._parse_validate_itemkey(key)
)
num_streams = -((s01 - s00) // -s02)
num_samples = -((s11 - s10) // -s12)
if self.num_samples == 0 or self.num_streams == 0: # if this signal is empty
Expand Down
26 changes: 16 additions & 10 deletions hermespy/core/transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,11 @@ def invert(self) -> Transformation:

return np.linalg.inv(self).view(Transformation)

def lookat(self,
target: np.ndarray = np.array([0., 0., 0.], float),
up: np.ndarray = np.array([0., 1., 0.], float)) -> Transformation:
def lookat(
self,
target: np.ndarray = np.array([0.0, 0.0, 0.0], float),
up: np.ndarray = np.array([0.0, 1.0, 0.0], float),
) -> Transformation:
"""Rotate and loook at the given coordinates. Modifies `orientation` property.
Args:
Expand All @@ -493,7 +495,9 @@ def lookat(self,
# Validate arguments
target_ = np.asarray(target)
if target_.shape != (3,):
raise ValueError(f"Got target of an unexpected shape (expected (3,), got {target_.shape})")
raise ValueError(
f"Got target of an unexpected shape (expected (3,), got {target_.shape})"
)
up_ = np.asarray(up)
if up_.shape != (3,):
raise ValueError(f"Got up of an unexpected shape (expected (3,), got {up_.shape})")
Expand All @@ -504,18 +508,18 @@ def lookat(self,
pos = self.translation
f = target_ - pos
f_norm = np.linalg.norm(f)
f = f / f_norm if f_norm != 0. else pos # normalize
f = f / f_norm if f_norm != 0.0 else pos # normalize
# side/right vector
s = np.cross(up_, f)
s_norm = np.linalg.norm(s)
s = s / s_norm if s_norm != 0. else up_ # normalize
s = s / s_norm if s_norm != 0.0 else up_ # normalize
# up vector
u = np.cross(f, s)
# Calcualte the new transformation matrix
self[:3, 0] = s
self[:3, 1] = u
self[:3, 2] = f
self[3, :] = [0., 0., 0., 1.]
self[3, :] = [0.0, 0.0, 0.0, 1.0]

return self

Expand Down Expand Up @@ -810,9 +814,11 @@ def to_local_coordinates(self, arg_0: Transformable | Transformation | np.ndarra
local_transformation = self.backwards_transformation @ arg_0
return local_transformation.view(Transformation)

def lookat(self,
target: np.ndarray = np.array([0., 0., 0.], float),
up: np.ndarray = np.array([0., 1., 0.], float)) -> None:
def lookat(
self,
target: np.ndarray = np.array([0.0, 0.0, 0.0], float),
up: np.ndarray = np.array([0.0, 1.0, 0.0], float),
) -> None:
"""Rotate and loook at the given coordinates. Modifies `orientation` property.
Args:
Expand Down
22 changes: 10 additions & 12 deletions hermespy/jcas/matched_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def _receive(self, signal: Signal) -> JCASReception:
signal.append_samples(
Signal.Create(
np.zeros(
(1, required_num_received_samples - signal.num_samples), dtype=complex
(signal.num_streams, required_num_received_samples - signal.num_samples),
dtype=complex,
),
self.sampling_rate,
signal.carrier_frequency,
Expand All @@ -93,31 +94,28 @@ def _receive(self, signal: Signal) -> JCASReception:
)
)

# Remove possible overhead samples if signal is too long
# resampled_signal.set_samples(re)
# sampled_signal[:, :num_samples]
# Digital receive beamformer
angle_bins, beamformed_samples = self._receive_beamform(signal)

# Transmit-receive correlation for range estimation
transmitted_samples = self.transmission.signal.getitem(0)
correlation = (
abs(
correlate(
signal.getitem(), self.transmission.signal.getitem(), mode="valid", method="fft"
).flatten()
)
abs(correlate(beamformed_samples, transmitted_samples, mode="valid", method="fft"))
/ self.transmission.signal.num_samples
)

lags = correlation_lags(
signal.num_samples, self.transmission.signal.num_samples, mode="valid"
beamformed_samples.shape[1], transmitted_samples.shape[1], mode="valid"
)

# Append zeros for correct depth estimation
# num_appended_zeros = max(0, num_samples - resampled_signal.num_samples)
# correlation = np.append(correlation, np.zeros(num_appended_zeros))

# Create the cube object
angle_bins = np.array([[0.0, 0.0]])
velocity_bins = np.array([0.0])
range_bins = 0.5 * lags[:num_propagated_samples] * resolution
cube_data = np.array([[correlation[:num_propagated_samples]]], dtype=float)
cube_data = correlation[:, None, :num_propagated_samples]
cube = RadarCube(cube_data, angle_bins, velocity_bins, range_bins, self.carrier_frequency)

# Infer the point cloud, if a detector has been configured
Expand Down
6 changes: 4 additions & 2 deletions hermespy/modem/waveform_chirp_fsk.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
__status__ = "Prototype"


scipy_minor_version = int(version("scipy").split('.')[1])
scipy_minor_version = int(version("scipy").split(".")[1])


class ChirpFSKWaveform(PilotCommunicationWaveform, Serializable):
Expand Down Expand Up @@ -473,7 +473,9 @@ def _prototypes(self) -> Tuple[np.ndarray, float]:
if scipy_minor_version < 14:
phase = integrate.cumtrapz(frequency, dx=1 / self.sampling_rate, initial=0)
else:
phase = integrate.cumulative_trapezoid(frequency, dx=1 / self.sampling_rate, initial=0)
phase = integrate.cumulative_trapezoid(
frequency, dx=1 / self.sampling_rate, initial=0
)
phase *= 2 * np.pi
prototypes[idx, :] = np.exp(1j * phase)

Expand Down
6 changes: 3 additions & 3 deletions hermespy/radar/radar.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,10 @@ def _receive_beamform(self, signal: Signal) -> Tuple[np.ndarray, np.ndarray]:
RuntimeError: If the beamforming configuration does not result in a single output stream.
"""

if self.device.antennas.num_antennas > 1:
if self.device.antennas.num_receive_ports > 1:
if self.receive_beamformer is None:
raise RuntimeError(
"Receiving over a device with more than one antenna requires a beamforming configuration"
"Receiving over a device with more than one RF port requires a beamforming configuration"
)

if self.receive_beamformer.num_receive_output_streams != 1:
Expand All @@ -402,7 +402,7 @@ def _receive_beamform(self, signal: Signal) -> Tuple[np.ndarray, np.ndarray]:

if (
self.receive_beamformer.num_receive_input_streams
!= self.device.antennas.num_antennas
!= self.device.antennas.num_receive_ports
):
raise RuntimeError(
"Radar operator receive beamformers are required to consider the full number of antenna streams"
Expand Down
Loading

0 comments on commit 11714f3

Please sign in to comment.