Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use routing key as config #34

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions paladin-core/src/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
//! - It supports a notion of message acknowledgement.
//! - It supports a notion of resource release.
//! - Rather than returning a tuple of `(sender, receiver)`, it breaks each into
//! separate methods.
//! This is because generally senders and receivers are usually instantiated in
//! separate process, as the channel is meant to facilitate inter process
//! communication. This avoids instantiating unnecessary resources when only one
//! is needed.
//! separate methods. This is because generally senders and receivers are
//! usually instantiated in separate process, as the channel is meant to
//! facilitate inter process communication. This avoids instantiating
//! unnecessary resources when only one is needed.

use std::{
pin::Pin,
Expand Down
5 changes: 5 additions & 0 deletions paladin-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! provided configuration.

use clap::{Args, ValueEnum};
use uuid::Uuid;

const HELP_HEADING: &str = "Paladin options";

Expand All @@ -40,6 +41,10 @@ pub struct Config {
/// Provides the URI for the AMQP broker, if the AMQP runtime is selected.
#[arg(long, help_heading = HELP_HEADING, env = "AMQP_URI", required_if_eq("runtime", "amqp"))]
pub amqp_uri: Option<String>,

/// Provides the routing key workers should use to listen for tasks on the
#[arg(long, help_heading = HELP_HEADING)]
pub task_bus_routing_key: Option<Uuid>,
}

/// Enumerates the available serialization formats.
Expand Down
21 changes: 13 additions & 8 deletions paladin-core/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,17 @@ pub struct Runtime {
worker_emulator: Option<Vec<JoinHandle<Result<()>>>>,
_marker: Marker,
}
const TASK_BUS_ROUTING_KEY: Uuid = Uuid::nil();
const IPC_ROUTING_KEY: Uuid = Uuid::max();

impl Runtime {
/// Initializes the [`Runtime`] with the provided [`Config`].
pub async fn from_config(config: &Config, marker: Marker) -> Result<Self> {
let channel_factory = DynamicChannelFactory::from_config(config).await?;
let task_channel = channel_factory
.get(TASK_BUS_ROUTING_KEY, ChannelType::ExactlyOnce)
.get(
config.task_bus_routing_key.unwrap_or(Uuid::nil()),
ChannelType::ExactlyOnce,
)
.await?;
let serializer = Serializer::from(config);

Expand Down Expand Up @@ -269,7 +271,7 @@ impl Runtime {
/// # impl Operation for StringLength {
/// # type Input = String;
/// # type Output = usize;
/// #
/// #
/// # fn execute(&self, input: Self::Input) -> Result<Self::Output> {
/// # Ok(input.len())
/// # }
Expand Down Expand Up @@ -405,7 +407,10 @@ impl WorkerRuntime {
pub async fn from_config(config: &Config, marker: Marker) -> Result<Self> {
let channel_factory = DynamicChannelFactory::from_config(config).await?;
let task_channel = channel_factory
.get(TASK_BUS_ROUTING_KEY, ChannelType::ExactlyOnce)
.get(
config.task_bus_routing_key.unwrap_or(Uuid::nil()),
ChannelType::ExactlyOnce,
)
.await?;

Ok(Self {
Expand Down Expand Up @@ -486,7 +491,7 @@ impl WorkerRuntime {
/// # impl Operation for StringLength {
/// # type Input = String;
/// # type Output = usize;
/// #
/// #
/// # fn execute(&self, input: Self::Input) -> Result<Self::Output> {
/// # Ok(input.len())
/// # }
Expand All @@ -505,10 +510,10 @@ impl WorkerRuntime {
/// async fn main() -> anyhow::Result<()> {
/// let args = Cli::parse();
/// let runtime = WorkerRuntime::from_config(&args.options, register()).await?;
///
///
/// let mut task_stream = runtime.get_task_receiver().await?;
/// while let Some((task, delivery)) = task_stream.next().await {
/// // ... handle task
/// // ... handle task
/// }
/// # Ok(())
/// }
Expand Down Expand Up @@ -599,7 +604,7 @@ impl WorkerRuntime {
/// # impl Operation for StringLength {
/// # type Input = String;
/// # type Output = usize;
/// #
/// #
/// # fn execute(&self, input: Self::Input) -> Result<Self::Output> {
/// # Ok(input.len())
/// # }
Expand Down
4 changes: 2 additions & 2 deletions paladin-core/src/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
//!
//! # Features:
//! - **Serializable Trait**: A shorthand trait that encapsulates common
//! serialization and deserialization behaviors.
//! It's designed to be used in asynchronous or threaded contexts.
//! serialization and deserialization behaviors. It's designed to be used in
//! asynchronous or threaded contexts.
//! - **Serializer Enum**: Provides a generic way to serialize and deserialize
//! binary data. It supports multiple serialization formats and can be easily
//! extended.
Expand Down
Loading