Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update_agent: remove unused Results #610

Merged
merged 2 commits into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/rpm_ostree/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ impl Handler<QueryLocalDeployments> for RpmOstreeClient {
pub struct RegisterAsDriver {}

impl Message for RegisterAsDriver {
type Result = Result<()>;
type Result = ();
}

impl Handler<RegisterAsDriver> for RpmOstreeClient {
type Result = Result<()>;
type Result = ();

fn handle(&mut self, _msg: RegisterAsDriver, _ctx: &mut Self::Context) -> Self::Result {
trace!("request to register as rpm-ostree update driver");
Expand Down
6 changes: 2 additions & 4 deletions src/rpm_ostree/cli_deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn deploy_locked(release: Release, allow_downgrade: bool) -> Result<Release>
/// Register as the update driver.
/// Keep attempting to register as driver for rpm-ostree, with exponential backoff
/// capped at 256 seconds.
pub fn deploy_register_driver() -> Result<()> {
pub fn deploy_register_driver() {
let mut register_attempt = invoke_cli_register();
let mut retry_secs = Duration::from_secs(1);
while let Err(attempt) = register_attempt {
Expand All @@ -51,8 +51,6 @@ pub fn deploy_register_driver() -> Result<()> {
retry_secs *= 2;
}
}

Ok(())
}

/// CLI executor for registering driver.
Expand Down Expand Up @@ -162,7 +160,7 @@ mod tests {
let now = SystemTime::now();
// expect to take 1 + 2 + 4 = 7 seconds
// to register as driver due to `fail_point`s
deploy_register_driver().unwrap();
deploy_register_driver();
let elapsed = now.elapsed().unwrap().as_secs();
// `fail_point`s are set to succeed on 4th try
assert!(REGISTER_DRIVER_FAILURES.get() == 3);
Expand Down
126 changes: 51 additions & 75 deletions src/update_agent/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{UpdateAgent, UpdateAgentInfo, UpdateAgentState};
use crate::rpm_ostree::{self, Release};
use crate::utils;
use actix::prelude::*;
use anyhow::Error;
use anyhow::{Error, Result};
use futures::prelude::*;
use log::trace;
use prometheus::{IntCounter, IntCounterVec, IntGauge};
Expand Down Expand Up @@ -89,7 +89,7 @@ impl Handler<RefreshTick> for UpdateAgent {
trace!("update agent tick, current state: {:?}", *agent_state_guard);
let prev_state = (*agent_state_guard).clone();

let modify_state_outcome = match &prev_state {
match &prev_state {
UpdateAgentState::StartState => {
update_agent_info
.tick_initialize(&mut *agent_state_guard)
Expand Down Expand Up @@ -128,13 +128,9 @@ impl Handler<RefreshTick> for UpdateAgent {
.tick_end(&mut *agent_state_guard, update)
.await
}
UpdateAgentState::EndState => Ok(()),
UpdateAgentState::EndState => (),
lucab marked this conversation as resolved.
Show resolved Hide resolved
};

if modify_state_outcome.is_err() {
log::error!("failed to update agent state");
}

Self::refresh_delay(
update_agent_info.steady_interval,
&prev_state,
Expand Down Expand Up @@ -261,14 +257,15 @@ impl UpdateAgent {

impl UpdateAgentInfo {
/// Initialize the update agent.
async fn tick_initialize(&self, state: &mut UpdateAgentState) -> Result<(), ()> {
async fn tick_initialize(&self, state: &mut UpdateAgentState) {
trace!("update agent in start state");
if self.enabled {
self.register_as_driver().await?;
self.register_as_driver().await;
}
let local_depls = self.local_deployments().await;
if let Ok(depls) = local_depls {
UpdateAgent::log_excluded_depls(&depls, &self);
match local_depls {
Ok(depls) => UpdateAgent::log_excluded_depls(&depls, &self),
Err(e) => log::error!("failed to query local deployments: {}", e),
}
let status;
if self.enabled {
Expand All @@ -284,12 +281,10 @@ impl UpdateAgentInfo {

utils::notify_ready();
utils::update_unit_status(status);

Ok(())
}

/// Try to report steady state.
async fn tick_report_steady(&self, state: &mut UpdateAgentState) -> Result<(), ()> {
async fn tick_report_steady(&self, state: &mut UpdateAgentState) {
trace!("trying to report steady state");

let is_steady = self.strategy.report_steady().await;
Expand All @@ -298,12 +293,10 @@ impl UpdateAgentInfo {
utils::update_unit_status("periodically polling for updates");
state.reported_steady();
}

Ok(())
}

/// Try to check for updates.
async fn tick_check_updates(&self, state: &mut UpdateAgentState) -> Result<(), ()> {
async fn tick_check_updates(&self, state: &mut UpdateAgentState) {
trace!("trying to check for udpates");

let local_depls = self.local_deployments().await;
Expand All @@ -320,7 +313,10 @@ impl UpdateAgentInfo {
.fetch_update_hint(&self.identity, depls, allow_downgrade)
.await
}
_ => None,
Err(e) => {
log::error!("failed to query local deployments: {}", e);
None
}
};

match release {
Expand All @@ -332,29 +328,24 @@ impl UpdateAgentInfo {
state.no_new_update();
}
}

Ok(())
}

/// Try to stage an update.
async fn tick_stage_update(
&self,
mut state: &mut UpdateAgentState,
release: Release,
) -> Result<(), ()> {
async fn tick_stage_update(&self, mut state: &mut UpdateAgentState, release: Release) {
trace!("trying to stage an update");

let target = release.clone();
let deploy_outcome = self.attempt_deploy(target).await;

match deploy_outcome {
Ok(_) => {
Ok(release) => {
let msg = format!("update staged: {}", release.version);
utils::update_unit_status(&msg);
log::info!("{}", msg);
state.update_staged(release);
}
Err(_) => {
Err(e) => {
log::error!("failed to stage deployment: {}", e);
let release_ver = release.version.clone();
let fail_count = UpdateAgentInfo::deploy_attempt_failed(release, &mut state);
let msg = format!(
Expand All @@ -367,67 +358,59 @@ impl UpdateAgentInfo {
log::trace!("{}", msg);
}
};

Ok(())
}

/// Try to finalize an update.
async fn tick_finalize_update(
&self,
state: &mut UpdateAgentState,
release: Release,
) -> Result<(), ()> {
async fn tick_finalize_update(&self, state: &mut UpdateAgentState, release: Release) {
trace!("trying to finalize an update");
FINALIZATION_ATTEMPTS.inc();

let strategy_can_finalize = self.strategy.can_finalize().await;
let state_change = if !strategy_can_finalize {
if !strategy_can_finalize {
utils::update_unit_status(&format!(
"update staged: {}; reboot pending due to update strategy",
&release.version
));
// Reset number of postponements to `MAX_FINALIZE_POSTPONEMENTS`
// if strategy does not allow finalization.
state.update_staged(release);
Err(())
lucab marked this conversation as resolved.
Show resolved Hide resolved
} else {
let usersessions_can_finalize = state.usersessions_can_finalize();
if !usersessions_can_finalize {
FINALIZATION_BLOCKED
.with_label_values(&[ACTIVE_USERSESSIONS_LABEL])
.inc();
utils::update_unit_status(&format!(
"update staged: {}; reboot delayed due to active user sessions",
release.version
));
// Record postponement and postpone finalization.
state.record_postponement();
Err(())
} else {
self.finalize_deployment(release).await
}
};
return;
}

let release = state_change?;
FINALIZATION_SUCCESS.inc();
utils::update_unit_status(&format!("update finalized: {}", release.version));
state.update_finalized(release);
let usersessions_can_finalize = state.usersessions_can_finalize();
if !usersessions_can_finalize {
FINALIZATION_BLOCKED
.with_label_values(&[ACTIVE_USERSESSIONS_LABEL])
.inc();
utils::update_unit_status(&format!(
"update staged: {}; reboot delayed due to active user sessions",
release.version
));
// Record postponement and postpone finalization.
state.record_postponement();
return;
}

Ok(())
match self.finalize_deployment(release).await {
Ok(release) => {
FINALIZATION_SUCCESS.inc();
utils::update_unit_status(&format!("update finalized: {}", release.version));
state.update_finalized(release);
}
Err(e) => log::error!("failed to finalize deployment: {}", e),
}
}

/// Actor job is done.
async fn tick_end(&self, state: &mut UpdateAgentState, release: Release) -> Result<(), ()> {
async fn tick_end(&self, state: &mut UpdateAgentState, release: Release) {
let status = format!("update applied, waiting for reboot: {}", release.version);
log::info!("{}", status);
state.end();
utils::update_unit_status(&status);

Ok(())
}

/// Fetch and stage an update, in finalization-locked mode.
async fn attempt_deploy(&self, release: Release) -> Result<Release, ()> {
async fn attempt_deploy(&self, release: Release) -> Result<Release> {
log::info!(
"target release '{}' selected, proceeding to stage it",
release.version
Expand All @@ -440,7 +423,6 @@ impl UpdateAgentInfo {
.rpm_ostree_actor
.send(msg)
.unwrap_or_else(|e| Err(e.into()))
.map_err(|e| log::error!("failed to stage deployment: {}", e))
.await;

upgrade
Expand All @@ -463,13 +445,12 @@ impl UpdateAgentInfo {
///
/// This ignores deployments that have been only staged but not finalized in the
/// past, as they are acceptable as future update target.
async fn local_deployments(&self) -> Result<BTreeSet<Release>, ()> {
async fn local_deployments(&self) -> Result<BTreeSet<Release>> {
let msg = rpm_ostree::QueryLocalDeployments { omit_staged: true };
let depls = self
.rpm_ostree_actor
.send(msg)
.unwrap_or_else(|e| Err(e.into()))
.map_err(|e| log::error!("failed to query local deployments: {}", e))
.map_ok(move |depls| {
log::trace!("found {} local deployments", depls.len());
depls
Expand All @@ -480,7 +461,7 @@ impl UpdateAgentInfo {
}

/// Finalize a deployment (unlock and reboot).
async fn finalize_deployment(&self, release: Release) -> Result<Release, ()> {
async fn finalize_deployment(&self, release: Release) -> Result<Release> {
log::info!(
"staged deployment '{}' available, proceeding to finalize it",
release.version
Expand All @@ -491,25 +472,20 @@ impl UpdateAgentInfo {
.rpm_ostree_actor
.send(msg)
.unwrap_or_else(|e| Err(e.into()))
.map_err(|e| log::error!("failed to finalize deployment: {}", e))
.await;

upgrade
}

/// Attempt to register as the update driver for rpm-ostree.
async fn register_as_driver(&self) -> Result<(), ()> {
async fn register_as_driver(&self) {
log::info!("registering as the update driver for rpm-ostree");

let msg = rpm_ostree::RegisterAsDriver {};
let result = self
.rpm_ostree_actor
self.rpm_ostree_actor
.send(msg)
.unwrap_or_else(|e| Err(e.into()))
.map_err(|e| log::error!("failed to register as driver: {}", e))
.await;

result
.unwrap_or_else(|e| log::error!("failed to register as driver: {}", e))
.await
}
}

Expand Down