Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More work on implementing publish web data #801

Merged
merged 8 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ protobuf-codegen = "3"
protoc-rust = "^2.0"

[workspace]
members = [ "common-py", "differ", "mail-filter", "publish-py", "publish", "runner-py", "runner", "worker" ]
members = [ "common-py", "differ", "mail-filter", "publish-py", "publish", "runner-py", "runner", "worker" , "bzr-store", "git-store"]

[workspace.package]
edition = "2021"

[features]
default = ["gcp", "gcs", "debian"]
Expand Down
8 changes: 8 additions & 0 deletions bzr-store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "bzr-store"
version = "0.1.0"
edition.workspace = true

[lib]

[dependencies]
Empty file added bzr-store/src/lib.rs
Empty file.
2 changes: 1 addition & 1 deletion common-py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "common-py"
version = "0.0.0"
authors = ["Jelmer Vernooij <[email protected]>"]
publish = false
edition = "2021"
edition.workspace = true
description = "Common bindings for the janitor - python"
license = "GPL-3.0+"
repository = "https://github.com/jelmer/janitor.git"
Expand Down
2 changes: 1 addition & 1 deletion differ/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "janitor-differ"
version = "0.0.0"
authors = ["Jelmer Vernooij <[email protected]>"]
edition = "2021"
edition.workspace = true
description = "Differ for the janitor"
license = "GPL-3.0+"
repository = "https://github.com/jelmer/janitor.git"
Expand Down
8 changes: 8 additions & 0 deletions git-store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "janitor-git-store"
version = "0.1.0"
edition.workspace = true

[lib]

[dependencies]
Empty file added git-store/src/lib.rs
Empty file.
3 changes: 3 additions & 0 deletions git-store/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}
2 changes: 1 addition & 1 deletion mail-filter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "janitor-mail-filter"
version = "0.0.0"
authors = ["Jelmer Vernooij <[email protected]>"]
edition = "2018"
edition.workspace = true
description = "Mail filter for the janitor"
license = "Apache-2.0"
repository = "https://github.com/jelmer/janitor.git"
Expand Down
2 changes: 1 addition & 1 deletion publish-py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "publish-py"
version = "0.0.0"
authors = ["Jelmer Vernooij <[email protected]>"]
edition = "2021"
edition.workspace = true
description = "Publisher for the janitor - python bindings"
publish = false
license = "GPL-3.0+"
Expand Down
2 changes: 1 addition & 1 deletion publish/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "janitor-publish"
version = "0.0.0"
authors = ["Jelmer Vernooij <[email protected]>"]
edition = "2021"
edition.workspace = true
description = "Publisher for the janitor"
license = "GPL-3.0+"
repository = "https://github.com/jelmer/janitor.git"
Expand Down
45 changes: 20 additions & 25 deletions publish/src/bin/janitor-publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct Args {

/// Limit number of pushes per cycle.
#[clap(long)]
push_limit: Option<i32>,
push_limit: Option<usize>,

/// Require a binary diff when publishing merge requests.
#[clap(long)]
Expand Down Expand Up @@ -95,16 +95,16 @@ async fn main() -> Result<(), i32> {

let config: &'static _ = Box::leak(config);

let bucket_rate_limiter: std::sync::Arc<Mutex<Box<dyn RateLimiter>>> =
std::sync::Arc::new(std::sync::Mutex::new(if args.slowstart {
Box::new(SlowStartRateLimiter::new(args.max_mps_per_bucket))
let bucket_rate_limiter: Mutex<Box<dyn RateLimiter>> =
std::sync::Mutex::new(if args.slowstart {
Box::new(SlowStartRateLimiter::new(args.max_mps_per_bucket)) as Box<dyn RateLimiter>
} else if let Some(max_mps_per_bucket) = args.max_mps_per_bucket {
Box::new(FixedRateLimiter::new(max_mps_per_bucket))
Box::new(FixedRateLimiter::new(max_mps_per_bucket)) as Box<dyn RateLimiter>
} else {
Box::new(NonRateLimiter)
}));
Box::new(NonRateLimiter) as Box<dyn RateLimiter>
});

let forge_rate_limiter = Arc::new(Mutex::new(HashMap::new()));
let forge_rate_limiter = Mutex::new(HashMap::new());

let vcs_managers = Box::new(janitor::vcs::get_vcs_managers_from_config(config));
let vcs_managers: &'static _ = Box::leak(vcs_managers);
Expand Down Expand Up @@ -147,15 +147,20 @@ async fn main() -> Result<(), i32> {
.await,
));

let state = Arc::new(janitor_publish::AppState {
conn: db.clone(),
bucket_rate_limiter,
forge_rate_limiter,
push_limit: args.push_limit,
});

if args.once {
janitor_publish::publish_pending_ready(
db.clone(),
state,
redis_async_connection.clone(),
config,
publish_worker.clone(),
bucket_rate_limiter.clone(),
vcs_managers,
args.push_limit,
args.require_binary_diff,
)
.await
Expand All @@ -176,44 +181,34 @@ async fn main() -> Result<(), i32> {
}
} else {
tokio::spawn(janitor_publish::process_queue_loop(
db.clone(),
state.clone(),
redis_async_connection.clone(),
config,
publish_worker.clone(),
bucket_rate_limiter.clone(),
forge_rate_limiter.clone(),
vcs_managers,
chrono::Duration::seconds(args.interval),
!args.no_auto_publish,
args.push_limit,
args.modify_mp_limit,
args.require_binary_diff,
));

tokio::spawn(janitor_publish::refresh_bucket_mp_counts(
db.clone(),
bucket_rate_limiter.clone(),
));
tokio::spawn(janitor_publish::refresh_bucket_mp_counts(state.clone()));

tokio::spawn(janitor_publish::listen_to_runner(
db.clone(),
state.clone(),
redis_async_connection.clone(),
config,
publish_worker.clone(),
bucket_rate_limiter.clone(),
vcs_managers,
args.require_binary_diff,
));

let app = janitor_publish::web::app(
state.clone(),
publish_worker.clone(),
bucket_rate_limiter.clone(),
forge_rate_limiter.clone(),
vcs_managers,
db.clone(),
args.require_binary_diff,
args.modify_mp_limit,
args.push_limit,
redis_async_connection.clone(),
config,
);
Expand Down
29 changes: 14 additions & 15 deletions publish/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,39 +574,31 @@ fn get_merged_by_user_url(url: &url::Url, user: &str) -> Result<Option<url::Url>
}

pub async fn process_queue_loop(
db: sqlx::PgPool,
state: Arc<AppState>,
redis: Option<redis::aio::ConnectionManager>,
config: &janitor::config::Config,
publish_worker: Arc<Mutex<PublishWorker>>,
bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
forge_rate_limiter: Arc<Mutex<HashMap<Forge, chrono::DateTime<Utc>>>>,
vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
interval: chrono::Duration,
auto_publish: bool,
push_limit: Option<i32>,
modify_mp_limit: Option<i32>,
require_binary_diff: bool,
) {
todo!();
}

pub async fn publish_pending_ready(
db: sqlx::PgPool,
state: Arc<AppState>,
redis: Option<redis::aio::ConnectionManager>,
config: &janitor::config::Config,
publish_worker: Arc<Mutex<PublishWorker>>,
bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
push_limit: Option<i32>,
require_binary_diff: bool,
) -> Result<(), PublishError> {
todo!();
}

pub async fn refresh_bucket_mp_counts(
db: sqlx::PgPool,
bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
) -> Result<(), sqlx::Error> {
pub async fn refresh_bucket_mp_counts(state: Arc<AppState>) -> Result<(), sqlx::Error> {
let mut per_bucket: HashMap<janitor::publish::MergeProposalStatus, HashMap<String, usize>> =
HashMap::new();

Expand All @@ -620,7 +612,7 @@ pub async fn refresh_bucket_mp_counts(
GROUP BY 1, 2
"#,
)
.fetch_all(&db)
.fetch_all(&state.conn)
.await?;

for row in rows {
Expand All @@ -629,19 +621,19 @@ pub async fn refresh_bucket_mp_counts(
.or_default()
.insert(row.0, row.2 as usize);
}
bucket_rate_limiter
state
.bucket_rate_limiter
.lock()
.unwrap()
.set_mps_per_bucket(&per_bucket);
Ok(())
}

pub async fn listen_to_runner(
db: sqlx::PgPool,
state: Arc<AppState>,
redis: Option<redis::aio::ConnectionManager>,
config: &janitor::config::Config,
publish_worker: Arc<Mutex<PublishWorker>>,
bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
require_binary_diff: bool,
) {
Expand Down Expand Up @@ -678,3 +670,10 @@ mod tests {
assert_eq!(finish_time + chrono::Duration::days(7), next_try_time);
}
}

pub struct AppState {
pub conn: sqlx::PgPool,
pub bucket_rate_limiter: Mutex<Box<dyn rate_limiter::RateLimiter>>,
pub forge_rate_limiter: Mutex<HashMap<Forge, chrono::DateTime<Utc>>>,
pub push_limit: Option<usize>,
}
4 changes: 4 additions & 0 deletions publish/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ pub trait RateLimiter: Send + Sync {
fn inc(&mut self, bucket: &str);

fn get_stats(&self) -> Option<RateLimitStats>;

fn get_max_open(&self, bucket: &str) -> Option<usize> {
None
}
}

pub struct NonRateLimiter;
Expand Down
43 changes: 43 additions & 0 deletions publish/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,46 @@ LIMIT 1
.fetch_optional(&*conn)
.await
}

pub async fn get_publish_attempt_count(
conn: &PgPool,
revision: &RevisionId,
transient_result_codes: &[&str],
) -> Result<usize, sqlx::Error> {
Ok(sqlx::query_scalar::<_, i64>(
"select count(*) from publish where revision = $1 and result_code != ALL($2::text[])",
)
.bind(revision)
.bind(transient_result_codes)
.fetch_one(&*conn)
.await? as usize)
}

pub async fn get_previous_mp_status(
conn: &PgPool,
codebase: &str,
campaign: &str,
) -> Result<Vec<(String, String)>, sqlx::Error> {
sqlx::query_as(
r#"""
WITH per_run_mps AS (
SELECT run.id AS run_id, run.finish_time,
merge_proposal.url AS mp_url, merge_proposal.status AS mp_status
FROM run
LEFT JOIN merge_proposal ON run.revision = merge_proposal.revision
WHERE run.codebase = $1
AND run.suite = $2
AND run.result_code = 'success'
AND merge_proposal.status NOT IN ('open', 'abandoned')
GROUP BY run.id, merge_proposal.url
)
SELECT mp_url, mp_status FROM per_run_mps
WHERE run_id = (
SELECT run_id FROM per_run_mps ORDER BY finish_time DESC LIMIT 1)
"""#,
)
.bind(codebase)
.bind(campaign)
.fetch_all(&*conn)
.await
}
Loading
Loading