Skip to content

Commit

Permalink
Merge pull request #2739 from fermyon/integrate-mqtt
Browse files Browse the repository at this point in the history
[Factors] Integrate mqtt in trigger2
  • Loading branch information
lann authored Aug 21, 2024
2 parents cd64556 + 6ee514d commit 2491098
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 1 addition & 10 deletions crates/factor-outbound-mqtt/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,7 @@ use spin_factor_outbound_networking::OutboundAllowedHosts;
use spin_world::v2::mqtt::{self as v2, Connection, Error, Qos};
use tracing::{instrument, Level};

#[async_trait]
pub trait ClientCreator: Send + Sync {
fn create(
&self,
address: String,
username: String,
password: String,
keep_alive_interval: Duration,
) -> Result<Arc<dyn MqttClient>, Error>;
}
use crate::ClientCreator;

pub struct InstanceState {
allowed_hosts: OutboundAllowedHosts,
Expand Down
42 changes: 41 additions & 1 deletion crates/factor-outbound-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use spin_factors::{
use spin_world::v2::mqtt::{self as v2, Error, Qos};
use tokio::sync::Mutex;

pub use host::{ClientCreator, MqttClient};
pub use host::MqttClient;

pub struct OutboundMqttFactor {
create_client: Arc<dyn ClientCreator>,
Expand Down Expand Up @@ -73,6 +73,19 @@ pub struct NetworkedMqttClient {
const MQTT_CHANNEL_CAP: usize = 1000;

impl NetworkedMqttClient {
/// Create a [`ClientCreator`] that creates a [`NetworkedMqttClient`].
pub fn creator() -> Arc<dyn ClientCreator> {
Arc::new(|address, username, password, keep_alive_interval| {
Ok(Arc::new(NetworkedMqttClient::create(
address,
username,
password,
keep_alive_interval,
)?) as _)
})
}

/// Create a new [`NetworkedMqttClient`] with the given address, username, password, and keep alive interval.
pub fn create(
address: String,
username: String,
Expand Down Expand Up @@ -127,3 +140,30 @@ impl MqttClient for NetworkedMqttClient {
Ok(())
}
}

/// A trait for creating MQTT client.
#[async_trait]
pub trait ClientCreator: Send + Sync {
fn create(
&self,
address: String,
username: String,
password: String,
keep_alive_interval: Duration,
) -> Result<Arc<dyn MqttClient>, Error>;
}

impl<F> ClientCreator for F
where
F: Fn(String, String, String, Duration) -> Result<Arc<dyn MqttClient>, Error> + Send + Sync,
{
fn create(
&self,
address: String,
username: String,
password: String,
keep_alive_interval: Duration,
) -> Result<Arc<dyn MqttClient>, Error> {
self(address, username, password, keep_alive_interval)
}
}
1 change: 1 addition & 0 deletions crates/runtime-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ spin-factor-key-value-spin = { path = "../factor-key-value-spin" }
spin-factor-key-value-redis = { path = "../factor-key-value-redis" }
spin-factor-key-value-azure = { path = "../factor-key-value-azure" }
spin-factor-outbound-http = { path = "../factor-outbound-http" }
spin-factor-outbound-mqtt = { path = "../factor-outbound-mqtt" }
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
spin-factor-outbound-redis = { path = "../factor-outbound-redis" }
spin-factor-sqlite = { path = "../factor-sqlite" }
Expand Down
7 changes: 7 additions & 0 deletions crates/runtime-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Context as _;
use spin_factor_key_value::runtime_config::spin::{self as key_value, MakeKeyValueStore};
use spin_factor_key_value::{DefaultLabelResolver as _, KeyValueFactor};
use spin_factor_outbound_http::OutboundHttpFactor;
use spin_factor_outbound_mqtt::OutboundMqttFactor;
use spin_factor_outbound_networking::runtime_config::spin::SpinTlsRuntimeConfig;
use spin_factor_outbound_networking::OutboundNetworkingFactor;
use spin_factor_outbound_redis::OutboundRedisFactor;
Expand Down Expand Up @@ -176,6 +177,12 @@ impl FactorRuntimeConfigSource<OutboundHttpFactor> for TomlRuntimeConfigSource<'
}
}

impl FactorRuntimeConfigSource<OutboundMqttFactor> for TomlRuntimeConfigSource<'_> {
fn get_runtime_config(&mut self) -> anyhow::Result<Option<()>> {
Ok(None)
}
}

impl FactorRuntimeConfigSource<SqliteFactor> for TomlRuntimeConfigSource<'_> {
fn get_runtime_config(&mut self) -> anyhow::Result<Option<spin_factor_sqlite::RuntimeConfig>> {
self.sqlite.resolve_from_toml(self.table.as_ref())
Expand Down
7 changes: 4 additions & 3 deletions crates/trigger2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ spin-app = { path = "../app" }
spin-common = { path = "../common" }
spin-componentize = { path = "../componentize" }
spin-core = { path = "../core" }
spin-factor-key-value = { path = "../factor-key-value" }
spin-factor-outbound-http = { path = "../factor-outbound-http" }
spin-factor-outbound-mqtt = { path = "../factor-outbound-mqtt" }
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
spin-factor-outbound-redis = { path = "../factor-outbound-redis" }
spin-factor-sqlite = { path = "../factor-sqlite" }
spin-factor-variables = { path = "../factor-variables" }
spin-factor-wasi = { path = "../factor-wasi" }
spin-factor-key-value = { path = "../factor-key-value" }
spin-factor-sqlite = { path = "../factor-sqlite" }
spin-factor-outbound-redis = { path = "../factor-outbound-redis" }
spin-factors = { path = "../factors" }
spin-factors-executor = { path = "../factors-executor" }
spin-telemetry = { path = "../telemetry" }
Expand Down
3 changes: 3 additions & 0 deletions crates/trigger2/src/factors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::path::PathBuf;

use spin_factor_key_value::KeyValueFactor;
use spin_factor_outbound_http::OutboundHttpFactor;
use spin_factor_outbound_mqtt::{NetworkedMqttClient, OutboundMqttFactor};
use spin_factor_outbound_networking::OutboundNetworkingFactor;
use spin_factor_outbound_redis::OutboundRedisFactor;
use spin_factor_sqlite::SqliteFactor;
Expand All @@ -19,6 +20,7 @@ pub struct TriggerFactors {
pub outbound_http: OutboundHttpFactor,
pub sqlite: SqliteFactor,
pub redis: OutboundRedisFactor,
pub mqtt: OutboundMqttFactor,
}

impl TriggerFactors {
Expand All @@ -36,6 +38,7 @@ impl TriggerFactors {
outbound_http: OutboundHttpFactor::new(),
sqlite: SqliteFactor::new(default_sqlite_label_resolver),
redis: OutboundRedisFactor::new(),
mqtt: OutboundMqttFactor::new(NetworkedMqttClient::creator()),
}
}
}
Expand Down

0 comments on commit 2491098

Please sign in to comment.