Skip to content

Commit

Permalink
feat: Use 2 queues instead of 1 (#1396)
Browse files Browse the repository at this point in the history
* queue per verif method

* Standardize queuename

* Remove fmt

* Tweaks
  • Loading branch information
amaury1093 authored Dec 10, 2023
1 parent 5aa6026 commit af44f6c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
42 changes: 36 additions & 6 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

tracing_subscriber::fmt::init();

// Make sure the worker is well configured.
let addr = env::var("RCH_AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672".into());
let backend_name = env::var("RCH_BACKEND_NAME").expect("RCH_BACKEND_NAME is not set");
let verif_method: VerifMethod = env::var("RCH_VERIF_METHOD")
.expect("RCH_VERIF_METHODS is not set")
.as_str()
.into();

let options = ConnectionProperties::default()
// Use tokio executor and reactor.
// At the moment the reactor is only available for unix.
Expand All @@ -46,15 +52,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

// Receive channel
let channel = conn.create_channel().await?;
info!(state=?conn.status().state(), "Connected to AMQP broker");
info!(backend=?backend_name,state=?conn.status().state(), "Connected to AMQP broker");

// Create queue "check_email" with priority.
// Create queue "check_email.{Smtp,Headless}" with priority.
let queue_name = format!("check_email.{:?}", verif_method);
let mut queue_args = FieldTable::default();
queue_args.insert("x-max-priority".into(), 5.into()); // https://www.rabbitmq.com/priority.html

let queue = channel
channel
.queue_declare(
"check_email",
&queue_name,
QueueDeclareOptions {
durable: true,
..Default::default()
Expand All @@ -63,10 +70,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
)
.await?;

info!(backend=?backend_name,queue=?queue.name().as_str(), "Worker will start consuming messages");
info!(queue=?queue_name, "Worker will start consuming messages");
let mut consumer = channel
.basic_consume(
queue.name().as_str(),
&queue_name,
&backend_name,
BasicConsumeOptions::default(),
FieldTable::default(),
Expand All @@ -86,3 +93,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

Ok(())
}

// Verification method used by the worker.
#[derive(Debug, Clone, Copy)]
enum VerifMethod {
// This worker will use a headless browser to verify emails.
// Oftentimes, this also means that the worker doesn't have port 25 open.
Headless,
// This worker will use a SMTP server to verify emails.
Smtp,
}

impl From<&str> for VerifMethod {
fn from(s: &str) -> Self {
match s {
"Headless" => Self::Headless,
"Smtp" => Self::Smtp,
_ => panic!(
"Unknown verification method {}, must be one of Headless, Smtp",
s
),
}
}
}
1 change: 1 addition & 0 deletions worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub async fn process_check_email(
let output = check_email(payload.input).await;
debug!(email=output.input,output=?output, "Done check-if-email-exists");

// Check if we have a webhook to send the output to.
if let Some(webhook) = payload.webhook {
let webhook_output = WebhookOutput {
output,
Expand Down

0 comments on commit af44f6c

Please sign in to comment.