diff --git a/hermespy/channel/radar/radar.py b/hermespy/channel/radar/radar.py index 4d9702ef..98982578 100644 --- a/hermespy/channel/radar/radar.py +++ b/hermespy/channel/radar/radar.py @@ -11,6 +11,7 @@ from sparse import GCXS # type: ignore from hermespy.core import ( + AntennaMode, ChannelStateInformation, ChannelStateFormat, Direction, @@ -609,10 +610,10 @@ def propagation_response( ) -> np.ndarray: # Query the sensor array responses rx_response = receiver.antennas.cartesian_array_response( - carrier_frequency, self.position, "global" + carrier_frequency, self.position, "global", AntennaMode.RX ) tx_response = transmitter.antennas.cartesian_array_response( - carrier_frequency, self.position, "global" + carrier_frequency, self.position, "global", AntennaMode.TX ).conj() if self.attenuate: diff --git a/hermespy/channel/sionna_rt_channel.py b/hermespy/channel/sionna_rt_channel.py index c4c5ef07..03943686 100644 --- a/hermespy/channel/sionna_rt_channel.py +++ b/hermespy/channel/sionna_rt_channel.py @@ -86,10 +86,12 @@ def __apply_doppler(self, num_samples: int) -> tuple: tau (np.ndarray): delays. Shape (num_rx_ants, num_tx_ants, num_paths) """ # Apply doppler - self.paths.apply_doppler(sampling_frequency=self.bandwidth, - num_time_steps=num_samples, - tx_velocities=self.transmitter_velocity, - rx_velocities=self.receiver_velocity) + self.paths.apply_doppler( + sampling_frequency=self.bandwidth, + num_time_steps=num_samples, + tx_velocities=self.transmitter_velocity, + rx_velocities=self.receiver_velocity, + ) # Get and cast CIR a, tau = self.paths.cir() @@ -106,7 +108,7 @@ def state( self, num_samples: int, max_num_taps: int, - interpolation_mode: InterpolationMode = InterpolationMode.NEAREST + interpolation_mode: InterpolationMode = InterpolationMode.NEAREST, ) -> ChannelStateInformation: # Apply Doppler effect and get the channel impulse response a, tau = self.__apply_doppler(num_samples) @@ -114,11 +116,15 @@ def state( # Init result max_delay = np.max(tau) if tau.size != 0 else 0 max_delay_in_samples = min(max_num_taps, ceil(max_delay * self.bandwidth)) - raw_state = np.zeros(( - self.num_receive_antennas, - self.num_transmit_antennas, - num_samples, - 1 + max_delay_in_samples), dtype=np.complex_) + raw_state = np.zeros( + ( + self.num_receive_antennas, + self.num_transmit_antennas, + num_samples, + 1 + max_delay_in_samples, + ), + dtype=np.complex_, + ) # If no paths hit the target, then return an empty state if a.size == 0 or tau.size == 0: return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state) @@ -136,9 +142,7 @@ def state( return ChannelStateInformation(ChannelStateFormat.IMPULSE_RESPONSE, raw_state) def _propagate( - self, - signal_block: SignalBlock, - interpolation: InterpolationMode, + self, signal_block: SignalBlock, interpolation: InterpolationMode ) -> SignalBlock: # Calculate the resulting signal block parameters sr_ratio = self.receiver_state.sampling_rate / self.transmitter_state.sampling_rate @@ -150,16 +154,16 @@ def _propagate( a, tau = self.__apply_doppler(signal_block.num_samples) # If no paths hit the target, then return a zeroed signal if a.size == 0 or tau.size == 0: - return SignalBlock(np.zeros((num_streams_new, num_samples_new), - signal_block.dtype), - offset_new) + return SignalBlock( + np.zeros((num_streams_new, num_samples_new), signal_block.dtype), offset_new + ) # Set other attributes max_delay = np.max(tau) max_delay_in_samples = ceil(max_delay * self.bandwidth) propagated_samples = np.zeros( (num_streams_new, signal_block.num_samples + max_delay_in_samples), - dtype=signal_block.dtype + dtype=signal_block.dtype, ) # Prepare the optimal einsum path ahead of time for faster execution @@ -173,9 +177,9 @@ def _propagate( if tau_p == -1.0: continue t = int(tau_p * self.bandwidth) - propagated_samples[ - :, t : t + signal_block.num_samples - ] += np.einsum(einsum_subscripts, a_p, signal_block, optimize=einsum_path) + propagated_samples[:, t : t + signal_block.num_samples] += np.einsum( + einsum_subscripts, a_p, signal_block, optimize=einsum_path + ) propagated_samples *= np.sqrt(self.__gain) return SignalBlock(propagated_samples, offset_new) @@ -189,10 +193,9 @@ class SionnaRTChannelRealization(ChannelRealization[SionnaRTChannelSample]): scene: rt.Scene - def __init__(self, - scene: rt.Scene, - sample_hooks: Set[ChannelSampleHook] | None = None, - gain: float = 1.) -> None: + def __init__( + self, scene: rt.Scene, sample_hooks: Set[ChannelSampleHook] | None = None, gain: float = 1.0 + ) -> None: super().__init__(sample_hooks, gain) self.scene = scene @@ -205,18 +208,12 @@ def _sample(self, state: LinkState) -> SionnaRTChannelSample: # init self.scene.tx_array tx_antenna = rt.Antenna("iso", "V") - tx_positions = [ - a.position - for a in state.transmitter.antennas.transmit_antennas - ] + tx_positions = [a.position for a in state.transmitter.antennas.transmit_antennas] self.scene.tx_array = rt.AntennaArray(tx_antenna, tx_positions) # init self.scene.rx_array rx_antenna = rt.Antenna("iso", "V") - rx_positions = [ - a.position - for a in state.receiver.antennas.receive_antennas - ] + rx_positions = [a.position for a in state.receiver.antennas.receive_antennas] self.scene.rx_array = rt.AntennaArray(rx_antenna, rx_positions) # init tx and rx @@ -244,9 +241,7 @@ def to_HDF(self, group: Group) -> None: @staticmethod def From_HDF( - scene: rt.Scene, - group: Group, - sample_hooks: Set[ChannelSampleHook[SionnaRTChannelSample]] + scene: rt.Scene, group: Group, sample_hooks: Set[ChannelSampleHook[SionnaRTChannelSample]] ) -> SionnaRTChannelRealization: return SionnaRTChannelRealization(scene, sample_hooks, group.attrs["gain"]) diff --git a/hermespy/core/signal_model.py b/hermespy/core/signal_model.py index 07201288..047a0855 100644 --- a/hermespy/core/signal_model.py +++ b/hermespy/core/signal_model.py @@ -766,7 +766,9 @@ def __parse_slice(s: slice, dim_size: int) -> Tuple[int, int, int]: s1 = s1 if s1 >= 0 else s1 % dim_size return s0, s1, s2 - def _parse_validate_itemkey(self, key: Any) -> Tuple[int, int, int, int, int, int, bool, bool, bool]: + def _parse_validate_itemkey( + self, key: Any + ) -> Tuple[int, int, int, int, int, int, bool, bool, bool]: """Parse and validate key in __getitem__ and __setitem__. Raises: @@ -843,7 +845,17 @@ def _parse_validate_itemkey(self, key: Any) -> Tuple[int, int, int, int, int, in else: raise TypeError(f"Samples key is ofan unsupported type ({type(key[1])})") - return s00, s01, s02, s10, s11, s12, False, should_flatten_streams, should_flatten_samples + return ( + s00, + s01, + s02, + s10, + s11, + s12, + False, + should_flatten_streams, + should_flatten_samples, + ) # ====================================================================== # done Key is a tuple of two @@ -881,7 +893,17 @@ def _parse_validate_itemkey(self, key: Any) -> Tuple[int, int, int, int, int, in except ValueError: raise TypeError(f"Unsupported key type {type(key)}") - return s00, s01, s02, s10, s11, s12, isboolmask, should_flatten_streams, should_flatten_samples + return ( + s00, + s01, + s02, + s10, + s11, + s12, + isboolmask, + should_flatten_streams, + should_flatten_samples, + ) def _find_affected_blocks(self, s10: int, s11: int) -> Tuple[int, int]: """Find indices of blocks that are affected by the given samples slice. @@ -924,7 +946,7 @@ def _find_affected_blocks(self, s10: int, s11: int) -> Tuple[int, int]: return b_start, b_stop - def getitem(self, key: Any = slice(None, None), unflatten : bool = True) -> np.ndarray: + def getitem(self, key: Any = slice(None, None), unflatten: bool = True) -> np.ndarray: """Get specified samples. Works like np.ndarray.__getitem__, but de-sparsifies the signal. @@ -954,7 +976,9 @@ def getitem(self, key: Any = slice(None, None), unflatten : bool = True) -> np.n Returns: np.ndarray with ndim 2 or less and dtype dtype np.complex_""" - s00, s01, s02, s10, s11, s12, isboolmask, should_flatten_streams, should_flatten_samples = self._parse_validate_itemkey(key) + s00, s01, s02, s10, s11, s12, isboolmask, should_flatten_streams, should_flatten_samples = ( + self._parse_validate_itemkey(key) + ) num_streams = -((s01 - s00) // -s02) num_samples = -((s11 - s10) // -s12) if self.num_samples == 0 or self.num_streams == 0: # if this signal is empty diff --git a/hermespy/core/transformation.py b/hermespy/core/transformation.py index 85334ad5..87405392 100644 --- a/hermespy/core/transformation.py +++ b/hermespy/core/transformation.py @@ -472,9 +472,11 @@ def invert(self) -> Transformation: return np.linalg.inv(self).view(Transformation) - def lookat(self, - target: np.ndarray = np.array([0., 0., 0.], float), - up: np.ndarray = np.array([0., 1., 0.], float)) -> Transformation: + def lookat( + self, + target: np.ndarray = np.array([0.0, 0.0, 0.0], float), + up: np.ndarray = np.array([0.0, 1.0, 0.0], float), + ) -> Transformation: """Rotate and loook at the given coordinates. Modifies `orientation` property. Args: @@ -493,7 +495,9 @@ def lookat(self, # Validate arguments target_ = np.asarray(target) if target_.shape != (3,): - raise ValueError(f"Got target of an unexpected shape (expected (3,), got {target_.shape})") + raise ValueError( + f"Got target of an unexpected shape (expected (3,), got {target_.shape})" + ) up_ = np.asarray(up) if up_.shape != (3,): raise ValueError(f"Got up of an unexpected shape (expected (3,), got {up_.shape})") @@ -504,18 +508,18 @@ def lookat(self, pos = self.translation f = target_ - pos f_norm = np.linalg.norm(f) - f = f / f_norm if f_norm != 0. else pos # normalize + f = f / f_norm if f_norm != 0.0 else pos # normalize # side/right vector s = np.cross(up_, f) s_norm = np.linalg.norm(s) - s = s / s_norm if s_norm != 0. else up_ # normalize + s = s / s_norm if s_norm != 0.0 else up_ # normalize # up vector u = np.cross(f, s) # Calcualte the new transformation matrix self[:3, 0] = s self[:3, 1] = u self[:3, 2] = f - self[3, :] = [0., 0., 0., 1.] + self[3, :] = [0.0, 0.0, 0.0, 1.0] return self @@ -810,9 +814,11 @@ def to_local_coordinates(self, arg_0: Transformable | Transformation | np.ndarra local_transformation = self.backwards_transformation @ arg_0 return local_transformation.view(Transformation) - def lookat(self, - target: np.ndarray = np.array([0., 0., 0.], float), - up: np.ndarray = np.array([0., 1., 0.], float)) -> None: + def lookat( + self, + target: np.ndarray = np.array([0.0, 0.0, 0.0], float), + up: np.ndarray = np.array([0.0, 1.0, 0.0], float), + ) -> None: """Rotate and loook at the given coordinates. Modifies `orientation` property. Args: diff --git a/hermespy/jcas/matched_filtering.py b/hermespy/jcas/matched_filtering.py index 7abcb5d0..e9a8cf56 100644 --- a/hermespy/jcas/matched_filtering.py +++ b/hermespy/jcas/matched_filtering.py @@ -84,7 +84,8 @@ def _receive(self, signal: Signal) -> JCASReception: signal.append_samples( Signal.Create( np.zeros( - (1, required_num_received_samples - signal.num_samples), dtype=complex + (signal.num_streams, required_num_received_samples - signal.num_samples), + dtype=complex, ), self.sampling_rate, signal.carrier_frequency, @@ -93,20 +94,18 @@ def _receive(self, signal: Signal) -> JCASReception: ) ) - # Remove possible overhead samples if signal is too long - # resampled_signal.set_samples(re) - # sampled_signal[:, :num_samples] + # Digital receive beamformer + angle_bins, beamformed_samples = self._receive_beamform(signal) + # Transmit-receive correlation for range estimation + transmitted_samples = self.transmission.signal.getitem(0) correlation = ( - abs( - correlate( - signal.getitem(), self.transmission.signal.getitem(), mode="valid", method="fft" - ).flatten() - ) + abs(correlate(beamformed_samples, transmitted_samples, mode="valid", method="fft")) / self.transmission.signal.num_samples ) + lags = correlation_lags( - signal.num_samples, self.transmission.signal.num_samples, mode="valid" + beamformed_samples.shape[1], transmitted_samples.shape[1], mode="valid" ) # Append zeros for correct depth estimation @@ -114,10 +113,9 @@ def _receive(self, signal: Signal) -> JCASReception: # correlation = np.append(correlation, np.zeros(num_appended_zeros)) # Create the cube object - angle_bins = np.array([[0.0, 0.0]]) velocity_bins = np.array([0.0]) range_bins = 0.5 * lags[:num_propagated_samples] * resolution - cube_data = np.array([[correlation[:num_propagated_samples]]], dtype=float) + cube_data = correlation[:, None, :num_propagated_samples] cube = RadarCube(cube_data, angle_bins, velocity_bins, range_bins, self.carrier_frequency) # Infer the point cloud, if a detector has been configured diff --git a/hermespy/modem/waveform_chirp_fsk.py b/hermespy/modem/waveform_chirp_fsk.py index cf62a3d9..2d1f5533 100644 --- a/hermespy/modem/waveform_chirp_fsk.py +++ b/hermespy/modem/waveform_chirp_fsk.py @@ -30,7 +30,7 @@ __status__ = "Prototype" -scipy_minor_version = int(version("scipy").split('.')[1]) +scipy_minor_version = int(version("scipy").split(".")[1]) class ChirpFSKWaveform(PilotCommunicationWaveform, Serializable): @@ -473,7 +473,9 @@ def _prototypes(self) -> Tuple[np.ndarray, float]: if scipy_minor_version < 14: phase = integrate.cumtrapz(frequency, dx=1 / self.sampling_rate, initial=0) else: - phase = integrate.cumulative_trapezoid(frequency, dx=1 / self.sampling_rate, initial=0) + phase = integrate.cumulative_trapezoid( + frequency, dx=1 / self.sampling_rate, initial=0 + ) phase *= 2 * np.pi prototypes[idx, :] = np.exp(1j * phase) diff --git a/hermespy/radar/radar.py b/hermespy/radar/radar.py index 345fd522..21f4a2ab 100644 --- a/hermespy/radar/radar.py +++ b/hermespy/radar/radar.py @@ -389,10 +389,10 @@ def _receive_beamform(self, signal: Signal) -> Tuple[np.ndarray, np.ndarray]: RuntimeError: If the beamforming configuration does not result in a single output stream. """ - if self.device.antennas.num_antennas > 1: + if self.device.antennas.num_receive_ports > 1: if self.receive_beamformer is None: raise RuntimeError( - "Receiving over a device with more than one antenna requires a beamforming configuration" + "Receiving over a device with more than one RF port requires a beamforming configuration" ) if self.receive_beamformer.num_receive_output_streams != 1: @@ -402,7 +402,7 @@ def _receive_beamform(self, signal: Signal) -> Tuple[np.ndarray, np.ndarray]: if ( self.receive_beamformer.num_receive_input_streams - != self.device.antennas.num_antennas + != self.device.antennas.num_receive_ports ): raise RuntimeError( "Radar operator receive beamformers are required to consider the full number of antenna streams" diff --git a/hermespy/simulation/animation.py b/hermespy/simulation/animation.py index 6f6d85c1..f22b9201 100644 --- a/hermespy/simulation/animation.py +++ b/hermespy/simulation/animation.py @@ -60,11 +60,9 @@ class Trajectory(ABC): # lookat attributes _lookat_flag: bool = False _lookat_target: Trajectory = None - _lookat_up: np.ndarray = np.array([0., 1., 0.,], float) # (3,), float + _lookat_up: np.ndarray = np.array([0.0, 1.0, 0.0], float) # (3,), float - def lookat(self, - target: Trajectory, - up: np.ndarray = np.array([0., 1., 0.,], float)) -> None: + def lookat(self, target: Trajectory, up: np.ndarray = np.array([0.0, 1.0, 0.0], float)) -> None: """Set a target to look at and track. Args: @@ -81,7 +79,7 @@ def lookat_disable(self) -> None: def lookat_enable(self) -> None: if self._lookat_target is None: - raise RuntimeError("Cannot enable lookat whithout a target. Use the \"lookat\" method.") + raise RuntimeError('Cannot enable lookat whithout a target. Use the "lookat" method.') self._lookat_flag = True @@ -345,4 +343,4 @@ def sample_translation(self, timestamp: float) -> np.ndarray: return start_point + self.sample_velocity(timestamp) * (timestamp - start_time) def sample_orientation(self, timestamp: float) -> np.ndarray: - return np.array([[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]], float) + return np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]], float) diff --git a/submodules/affect b/submodules/affect index 61509eb7..8fa65a3c 160000 --- a/submodules/affect +++ b/submodules/affect @@ -1 +1 @@ -Subproject commit 61509eb756ae3725b8a67c2d26a5af5ba95186fb +Subproject commit 8fa65a3ca9b0dcdd3d544363bc692d4f85f6f718