Merge pull request #801 from jelmer/publish
More work on implementing publish web data
jelmer authored Sep 21, 2024
2 parents 2eb030e + 398e84e commit edd5abd
Showing 24 changed files with 849 additions and 713 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml

@@ -82,7 +82,10 @@ protobuf-codegen = "3"
 protoc-rust = "^2.0"
 
 [workspace]
-members = [ "common-py", "differ", "mail-filter", "publish-py", "publish", "runner-py", "runner", "worker" ]
+members = [ "common-py", "differ", "mail-filter", "publish-py", "publish", "runner-py", "runner", "worker" , "bzr-store", "git-store"]
+
+[workspace.package]
+edition = "2021"
 
 [features]
 default = ["gcp", "gcs", "debian"]
8 changes: 8 additions & 0 deletions bzr-store/Cargo.toml

@@ -0,0 +1,8 @@
+[package]
+name = "bzr-store"
+version = "0.1.0"
+edition.workspace = true
+
+[lib]
+
+[dependencies]

Empty file added bzr-store/src/lib.rs
2 changes: 1 addition & 1 deletion common-py/Cargo.toml

@@ -3,7 +3,7 @@ name = "common-py"
 version = "0.0.0"
 authors = ["Jelmer Vernooij <[email protected]>"]
 publish = false
-edition = "2021"
+edition.workspace = true
 description = "Common bindings for the janitor - python"
 license = "GPL-3.0+"
 repository = "https://github.com/jelmer/janitor.git"
2 changes: 1 addition & 1 deletion differ/Cargo.toml

@@ -2,7 +2,7 @@
 name = "janitor-differ"
 version = "0.0.0"
 authors = ["Jelmer Vernooij <[email protected]>"]
-edition = "2021"
+edition.workspace = true
 description = "Differ for the janitor"
 license = "GPL-3.0+"
 repository = "https://github.com/jelmer/janitor.git"
8 changes: 8 additions & 0 deletions git-store/Cargo.toml

@@ -0,0 +1,8 @@
+[package]
+name = "janitor-git-store"
+version = "0.1.0"
+edition.workspace = true
+
+[lib]
+
+[dependencies]

Empty file added git-store/src/lib.rs

3 changes: 3 additions & 0 deletions git-store/src/main.rs

@@ -0,0 +1,3 @@
+fn main() {
+    println!("Hello, world!");
+}
2 changes: 1 addition & 1 deletion mail-filter/Cargo.toml

@@ -2,7 +2,7 @@
 name = "janitor-mail-filter"
 version = "0.0.0"
 authors = ["Jelmer Vernooij <[email protected]>"]
-edition = "2018"
+edition.workspace = true
 description = "Mail filter for the janitor"
 license = "Apache-2.0"
 repository = "https://github.com/jelmer/janitor.git"
2 changes: 1 addition & 1 deletion publish-py/Cargo.toml

@@ -2,7 +2,7 @@
 name = "publish-py"
 version = "0.0.0"
 authors = ["Jelmer Vernooij <[email protected]>"]
-edition = "2021"
+edition.workspace = true
 description = "Publisher for the janitor - python bindings"
 publish = false
 license = "GPL-3.0+"
2 changes: 1 addition & 1 deletion publish/Cargo.toml

@@ -2,7 +2,7 @@
 name = "janitor-publish"
 version = "0.0.0"
 authors = ["Jelmer Vernooij <[email protected]>"]
-edition = "2021"
+edition.workspace = true
 description = "Publisher for the janitor"
 license = "GPL-3.0+"
 repository = "https://github.com/jelmer/janitor.git"
45 changes: 20 additions & 25 deletions publish/src/bin/janitor-publish.rs

@@ -52,7 +52,7 @@ struct Args {
 
     /// Limit number of pushes per cycle.
    #[clap(long)]
-    push_limit: Option<i32>,
+    push_limit: Option<usize>,
 
     /// Require a binary diff when publishing merge requests.
    #[clap(long)]
@@ -95,16 +95,16 @@ async fn main() -> Result<(), i32> {
 
     let config: &'static _ = Box::leak(config);
 
-    let bucket_rate_limiter: std::sync::Arc<Mutex<Box<dyn RateLimiter>>> =
-        std::sync::Arc::new(std::sync::Mutex::new(if args.slowstart {
-            Box::new(SlowStartRateLimiter::new(args.max_mps_per_bucket))
+    let bucket_rate_limiter: Mutex<Box<dyn RateLimiter>> =
+        std::sync::Mutex::new(if args.slowstart {
+            Box::new(SlowStartRateLimiter::new(args.max_mps_per_bucket)) as Box<dyn RateLimiter>
         } else if let Some(max_mps_per_bucket) = args.max_mps_per_bucket {
-            Box::new(FixedRateLimiter::new(max_mps_per_bucket))
+            Box::new(FixedRateLimiter::new(max_mps_per_bucket)) as Box<dyn RateLimiter>
         } else {
-            Box::new(NonRateLimiter)
-        }));
+            Box::new(NonRateLimiter) as Box<dyn RateLimiter>
+        });
 
-    let forge_rate_limiter = Arc::new(Mutex::new(HashMap::new()));
+    let forge_rate_limiter = Mutex::new(HashMap::new());
 
     let vcs_managers = Box::new(janitor::vcs::get_vcs_managers_from_config(config));
     let vcs_managers: &'static _ = Box::leak(vcs_managers);
@@ -147,15 +147,20 @@ async fn main() -> Result<(), i32> {
             .await,
     ));
 
+    let state = Arc::new(janitor_publish::AppState {
+        conn: db.clone(),
+        bucket_rate_limiter,
+        forge_rate_limiter,
+        push_limit: args.push_limit,
+    });
+
     if args.once {
         janitor_publish::publish_pending_ready(
-            db.clone(),
+            state,
             redis_async_connection.clone(),
             config,
             publish_worker.clone(),
-            bucket_rate_limiter.clone(),
             vcs_managers,
-            args.push_limit,
             args.require_binary_diff,
         )
         .await
@@ -176,44 +181,34 @@ async fn main() -> Result<(), i32> {
         }
     } else {
         tokio::spawn(janitor_publish::process_queue_loop(
-            db.clone(),
+            state.clone(),
             redis_async_connection.clone(),
             config,
             publish_worker.clone(),
-            bucket_rate_limiter.clone(),
-            forge_rate_limiter.clone(),
             vcs_managers,
             chrono::Duration::seconds(args.interval),
             !args.no_auto_publish,
-            args.push_limit,
             args.modify_mp_limit,
             args.require_binary_diff,
         ));
 
-        tokio::spawn(janitor_publish::refresh_bucket_mp_counts(
-            db.clone(),
-            bucket_rate_limiter.clone(),
-        ));
+        tokio::spawn(janitor_publish::refresh_bucket_mp_counts(state.clone()));
 
         tokio::spawn(janitor_publish::listen_to_runner(
-            db.clone(),
+            state.clone(),
             redis_async_connection.clone(),
             config,
             publish_worker.clone(),
-            bucket_rate_limiter.clone(),
             vcs_managers,
             args.require_binary_diff,
         ));
 
         let app = janitor_publish::web::app(
+            state.clone(),
             publish_worker.clone(),
-            bucket_rate_limiter.clone(),
-            forge_rate_limiter.clone(),
             vcs_managers,
-            db.clone(),
             args.require_binary_diff,
             args.modify_mp_limit,
-            args.push_limit,
             redis_async_connection.clone(),
             config,
         );
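A side note on the rate-limiter construction above: the Arc wrapper is gone, and each branch of the if/else now ends in `as Box<dyn RateLimiter>`. When the branches of a conditional produce different concrete types, casting each one to a common trait-object type lets the whole expression type-check. A minimal standalone sketch of that coercion pattern, using an illustrative Limiter trait rather than the crate's real RateLimiter:

trait Limiter {
    fn allow(&self) -> bool;
}

struct Unlimited;
impl Limiter for Unlimited {
    fn allow(&self) -> bool {
        true
    }
}

struct Fixed(usize);
impl Limiter for Fixed {
    fn allow(&self) -> bool {
        self.0 > 0
    }
}

fn main() {
    let use_fixed = true;
    // Each branch yields a different concrete type; the casts coerce both to
    // the same trait-object type so the if/else has a single result type.
    let limiter = if use_fixed {
        Box::new(Fixed(10)) as Box<dyn Limiter>
    } else {
        Box::new(Unlimited) as Box<dyn Limiter>
    };
    println!("allowed: {}", limiter.allow());
}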
29 changes: 14 additions & 15 deletions publish/src/lib.rs

@@ -574,39 +574,31 @@ fn get_merged_by_user_url(url: &url::Url, user: &str) -> Result<Option<url::Url>
 }
 
 pub async fn process_queue_loop(
-    db: sqlx::PgPool,
+    state: Arc<AppState>,
     redis: Option<redis::aio::ConnectionManager>,
     config: &janitor::config::Config,
     publish_worker: Arc<Mutex<PublishWorker>>,
-    bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
-    forge_rate_limiter: Arc<Mutex<HashMap<Forge, chrono::DateTime<Utc>>>>,
     vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
     interval: chrono::Duration,
     auto_publish: bool,
-    push_limit: Option<i32>,
     modify_mp_limit: Option<i32>,
     require_binary_diff: bool,
 ) {
     todo!();
 }
 
 pub async fn publish_pending_ready(
-    db: sqlx::PgPool,
+    state: Arc<AppState>,
     redis: Option<redis::aio::ConnectionManager>,
     config: &janitor::config::Config,
     publish_worker: Arc<Mutex<PublishWorker>>,
-    bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
     vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
-    push_limit: Option<i32>,
     require_binary_diff: bool,
 ) -> Result<(), PublishError> {
     todo!();
 }
 
-pub async fn refresh_bucket_mp_counts(
-    db: sqlx::PgPool,
-    bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
-) -> Result<(), sqlx::Error> {
+pub async fn refresh_bucket_mp_counts(state: Arc<AppState>) -> Result<(), sqlx::Error> {
     let mut per_bucket: HashMap<janitor::publish::MergeProposalStatus, HashMap<String, usize>> =
         HashMap::new();
 
@@ -620,7 +612,7 @@ pub async fn refresh_bucket_mp_counts(
     GROUP BY 1, 2
     "#,
     )
-    .fetch_all(&db)
+    .fetch_all(&state.conn)
     .await?;
 
     for row in rows {
@@ -629,19 +621,19 @@ pub async fn refresh_bucket_mp_counts(
             .or_default()
             .insert(row.0, row.2 as usize);
     }
-    bucket_rate_limiter
+    state
+        .bucket_rate_limiter
         .lock()
         .unwrap()
         .set_mps_per_bucket(&per_bucket);
     Ok(())
 }
 
 pub async fn listen_to_runner(
-    db: sqlx::PgPool,
+    state: Arc<AppState>,
     redis: Option<redis::aio::ConnectionManager>,
     config: &janitor::config::Config,
     publish_worker: Arc<Mutex<PublishWorker>>,
-    bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
     vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
     require_binary_diff: bool,
 ) {
@@ -678,3 +670,10 @@ mod tests {
         assert_eq!(finish_time + chrono::Duration::days(7), next_try_time);
     }
 }
+
+pub struct AppState {
+    pub conn: sqlx::PgPool,
+    pub bucket_rate_limiter: Mutex<Box<dyn rate_limiter::RateLimiter>>,
+    pub forge_rate_limiter: Mutex<HashMap<Forge, chrono::DateTime<Utc>>>,
+    pub push_limit: Option<usize>,
+}
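The AppState added at the end of lib.rs is what the binary above now builds once and hands to every task: the database pool, both rate limiters, and the push limit travel together behind a single Arc instead of being threaded through long parameter lists. A minimal sketch of that sharing pattern, with placeholder fields in place of the janitor's real types and assuming the Tokio runtime the crate already uses:

use std::sync::{Arc, Mutex};

// Placeholder state; the real AppState carries a PgPool and the rate limiters.
struct AppState {
    counter: Mutex<usize>,
    push_limit: Option<usize>,
}

#[tokio::main]
async fn main() {
    let state = Arc::new(AppState {
        counter: Mutex::new(0),
        push_limit: Some(10),
    });

    // Each task gets its own cheap Arc clone; the data behind it is shared.
    let worker = {
        let state = state.clone();
        tokio::spawn(async move {
            *state.counter.lock().unwrap() += 1;
        })
    };
    worker.await.unwrap();

    println!(
        "count = {}, push_limit = {:?}",
        state.counter.lock().unwrap(),
        state.push_limit
    );
}

The std::sync::Mutex works here because the guard is dropped before any .await, which matches how refresh_bucket_mp_counts locks bucket_rate_limiter only around the final set_mps_per_bucket call.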
4 changes: 4 additions & 0 deletions publish/src/rate_limiter.rs

@@ -16,6 +16,10 @@ pub trait RateLimiter: Send + Sync {
     fn inc(&mut self, bucket: &str);
 
     fn get_stats(&self) -> Option<RateLimitStats>;
+
+    fn get_max_open(&self, bucket: &str) -> Option<usize> {
+        None
+    }
 }
 
 pub struct NonRateLimiter;
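Because get_max_open ships with a default body returning None, existing RateLimiter implementations such as NonRateLimiter keep compiling unchanged; only a limiter that actually caps open merge proposals needs to override it. A standalone sketch of that default-method pattern (the Limits trait and Cap type are illustrative, not part of the crate):

trait Limits {
    // Default: no cap on open merge proposals for any bucket.
    fn get_max_open(&self, _bucket: &str) -> Option<usize> {
        None
    }
}

struct NoCap;
impl Limits for NoCap {} // inherits the default

struct Cap(usize);
impl Limits for Cap {
    fn get_max_open(&self, _bucket: &str) -> Option<usize> {
        Some(self.0)
    }
}

fn main() {
    let limiters: Vec<Box<dyn Limits>> = vec![Box::new(NoCap), Box::new(Cap(5))];
    for limiter in &limiters {
        println!("{:?}", limiter.get_max_open("lintian-fixes"));
    }
}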
43 changes: 43 additions & 0 deletions publish/src/state.rs

@@ -284,3 +284,46 @@ LIMIT 1
     .fetch_optional(&*conn)
     .await
 }
+
+pub async fn get_publish_attempt_count(
+    conn: &PgPool,
+    revision: &RevisionId,
+    transient_result_codes: &[&str],
+) -> Result<usize, sqlx::Error> {
+    Ok(sqlx::query_scalar::<_, i64>(
+        "select count(*) from publish where revision = $1 and result_code != ALL($2::text[])",
+    )
+    .bind(revision)
+    .bind(transient_result_codes)
+    .fetch_one(&*conn)
+    .await? as usize)
+}
+
+pub async fn get_previous_mp_status(
+    conn: &PgPool,
+    codebase: &str,
+    campaign: &str,
+) -> Result<Vec<(String, String)>, sqlx::Error> {
+    sqlx::query_as(
+        r#"""
+WITH per_run_mps AS (
+    SELECT run.id AS run_id, run.finish_time,
+        merge_proposal.url AS mp_url, merge_proposal.status AS mp_status
+    FROM run
+    LEFT JOIN merge_proposal ON run.revision = merge_proposal.revision
+    WHERE run.codebase = $1
+      AND run.suite = $2
+      AND run.result_code = 'success'
+      AND merge_proposal.status NOT IN ('open', 'abandoned')
+    GROUP BY run.id, merge_proposal.url
+)
+SELECT mp_url, mp_status FROM per_run_mps
+WHERE run_id = (
+    SELECT run_id FROM per_run_mps ORDER BY finish_time DESC LIMIT 1)
+"""#,
+    )
+    .bind(codebase)
+    .bind(campaign)
+    .fetch_all(&*conn)
+    .await
+}
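get_publish_attempt_count above works by binding a Rust slice as a Postgres text[] and excluding transient failures with result_code != ALL($2::text[]). A minimal sketch of the same sqlx pattern against an inline VALUES list, so it can be run without the janitor schema (the result codes are illustrative; a reachable Postgres via DATABASE_URL is assumed):

use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(1)
        .connect(&std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"))
        .await?;

    // Codes treated as transient and therefore not counted (illustrative list).
    let transient: &[&str] = &["differ-unreachable", "worker-timeout"];

    // The VALUES list stands in for the publish table; the WHERE clause mirrors
    // the result_code != ALL(...) filter used by get_publish_attempt_count.
    let count = sqlx::query_scalar::<_, i64>(
        r#"
        SELECT count(*) FROM (VALUES
            ('success'), ('worker-timeout'), ('merge-conflict')
        ) AS t(result_code)
        WHERE result_code != ALL($1::text[])
        "#,
    )
    .bind(transient)
    .fetch_one(&pool)
    .await?;

    println!("non-transient attempts: {}", count); // prints 2
    Ok(())
}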