Skip to content

Commit

Permalink
feat(conductor)!: implement chain ID checks (#1482)
Browse files Browse the repository at this point in the history
## Summary
Implemented chain ID validation for both the sequencer and Celestia
readers in Conductor.

## Background
This is to safeguard against connecting to an incorrect network.

This change was previously blocking on a switch to solely using gRPC for
communication with sequencer, but after offline discussion it was
decided to implement the chain ID check anyways using the HTTP client.

## Changes
- Added sequencer and Celestia chain ID environment variables.
- Added chain ID checks to the initialization of both the sequencer and
Celestia readers.
- Changed conductor `run_until_stopped()` to return with an error so
that it will not fail silently.

## Testing
Added 2 blackbox tests to ensure the conductor exits with the proper
error in the event of both a Celestia and sequencer chain ID mismatch.

## Breaking Changelist
- Added `ASTRIA_CONDUCTOR_SEQUENCER_CHAIN_ID` and
`ASTRIA_CONDUCTOR_CELESTIA_CHAIN_ID` environment variables, without
which conductor will not function.

## Related Issues
closes #986

---------

Co-authored-by: Richard Janis Goldschmidt <[email protected]>
  • Loading branch information
ethanoroshiba and SuperFluffy authored Oct 1, 2024
1 parent b79138d commit 8a4eafc
Show file tree
Hide file tree
Showing 22 changed files with 536 additions and 107 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions charts/evm-rollup/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.27.3
version: 0.27.4

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.14.1"
appVersion: "0.14.2"

maintainers:
- name: wafflesvonmaple
Expand Down
2 changes: 2 additions & 0 deletions charts/evm-rollup/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ data:
OTEL_SERVICE_NAME: "{{ tpl .Values.otel.serviceNamePrefix . }}-conductor"
{{- if not .Values.global.dev }}
{{- else }}
ASTRIA_CONDUCTOR_EXPECTED_SEQUENCER_CHAIN_ID: "{{ tpl .Values.config.conductor.sequencerChainId . }}"
ASTRIA_CONDUCTOR_EXPECTED_CELESTIA_CHAIN_ID: "{{ tpl .Values.config.conductor.celestiaChainId . }}"
{{- end }}
---
apiVersion: v1
Expand Down
4 changes: 3 additions & 1 deletion charts/evm-rollup/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ images:
conductor:
repo: ghcr.io/astriaorg/conductor
pullPolicy: IfNotPresent
tag: "0.20.1"
tag: "0.21.0"
devTag: latest


Expand Down Expand Up @@ -178,6 +178,8 @@ config:
sequencerGrpc: ""
# The maximum number of requests to make to the sequencer per second
sequencerRequestsPerSecond: 500
# The chain id of the celestia network the conductor communicates with
celestiaChainId: ""

celestia:
# if config.rollup.executionLevel is NOT 'SoftOnly' AND celestia-node is not enabled
Expand Down
6 changes: 3 additions & 3 deletions charts/evm-stack/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ dependencies:
version: 0.3.6
- name: evm-rollup
repository: file://../evm-rollup
version: 0.27.3
version: 0.27.4
- name: composer
repository: file://../composer
version: 0.1.4
Expand All @@ -20,5 +20,5 @@ dependencies:
- name: blockscout-stack
repository: https://blockscout.github.io/helm-charts
version: 1.6.2
digest: sha256:6e62801b5f401ba653f88a5ed9d33a6de38b8bba5ba942d01a2af68371c8bfd8
generated: "2024-09-19T12:52:41.503045-07:00"
digest: sha256:b086adf099e986e3a5c1f7f25481aaf42ebf597029a70ee0bd3ff6711e6bdccf
generated: "2024-09-25T14:31:21.35488-05:00"
4 changes: 2 additions & 2 deletions charts/evm-stack/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.6.0
version: 0.6.1

dependencies:
- name: celestia-node
version: 0.3.6
repository: "file://../celestia-node"
condition: celestia-node.enabled
- name: evm-rollup
version: 0.27.3
version: 0.27.4
repository: "file://../evm-rollup"
- name: composer
version: 0.1.4
Expand Down
2 changes: 2 additions & 0 deletions charts/evm-stack/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ global:
rollupName: ""
evmChainId: ""
sequencerChainId: ""
celestiaChainId: ""
otel:
endpoint: ""
tracesEndpoint: ""
Expand All @@ -29,6 +30,7 @@ evm-rollup:
config:
conductor:
sequencerChainId: "{{ .Values.global.sequencerChainId }}"
celestiaChainId: "{{ .Values.global.celestiaChainId }}"
sequencerRpc: "{{ .Values.global.sequencerRpc }}"
sequencerGrpc: "{{ .Values.global.sequencerGrpc }}"
otel:
Expand Down
1 change: 1 addition & 0 deletions crates/astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ tower = { version = "0.4.13", features = ["limit"] }
# when updating.
celestia-rpc = "0.1.1"
celestia-types = { workspace = true }
celestia-tendermint = { workspace = true }
jsonrpsee = { version = "0.20", features = ["client-core", "macros"] }

[dev-dependencies]
Expand Down
6 changes: 6 additions & 0 deletions crates/astria-conductor/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ ASTRIA_CONDUCTOR_SEQUENCER_BLOCK_TIME_MS=2000
# CometBFT node.
ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND=500

# The chain ID of the sequencer network the conductor should be communicating with.
ASTRIA_CONDUCTOR_EXPECTED_SEQUENCER_CHAIN_ID="test-sequencer-1000"

# The chain ID of the Celestia network the conductor should be communicating with.
ASTRIA_CONDUCTOR_EXPECTED_CELESTIA_CHAIN_ID="test-celestia-1000"

# Set to true to enable prometheus metrics.
ASTRIA_CONDUCTOR_NO_METRICS=true

Expand Down
6 changes: 6 additions & 0 deletions crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub(crate) struct Builder {
pub(crate) executor: executor::Handle,
pub(crate) sequencer_cometbft_client: SequencerClient,
pub(crate) sequencer_requests_per_second: u32,
pub(crate) expected_celestia_chain_id: String,
pub(crate) expected_sequencer_chain_id: String,
pub(crate) shutdown: CancellationToken,
pub(crate) metrics: &'static Metrics,
}
Expand All @@ -37,6 +39,8 @@ impl Builder {
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
} = self;
Expand All @@ -50,6 +54,8 @@ impl Builder {
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
})
Expand Down
74 changes: 67 additions & 7 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use astria_core::{
use astria_eyre::eyre::{
self,
bail,
ensure,
WrapErr as _,
};
use bytes::Bytes;
use celestia_rpc::HeaderClient as _;
use celestia_types::nmt::Namespace;
use futures::{
future::{
Expand Down Expand Up @@ -139,6 +141,12 @@ pub(crate) struct Reader {
/// (usually to verify block data retrieved from Celestia blobs).
sequencer_requests_per_second: u32,

/// The chain ID of the Celestia network the reader should be communicating with.
expected_celestia_chain_id: String,

/// The chain ID of the Sequencer the reader should be communicating with.
expected_sequencer_chain_id: String,

/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,

Expand All @@ -147,7 +155,7 @@ pub(crate) struct Reader {

impl Reader {
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let (executor, sequencer_chain_id) = select!(
let ((), executor, sequencer_chain_id) = select!(
() = self.shutdown.clone().cancelled_owned() => {
info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(||
info!("received shutdown signal while waiting for Celestia reader task to initialize")
Expand All @@ -169,24 +177,76 @@ impl Reader {
#[instrument(skip_all, err)]
async fn initialize(
&mut self,
) -> eyre::Result<(executor::Handle<StateIsInit>, tendermint::chain::Id)> {
) -> eyre::Result<((), executor::Handle<StateIsInit>, tendermint::chain::Id)> {
let validate_celestia_chain_id = async {
let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client)
.await
.wrap_err("failed to fetch Celestia chain ID")?;
let expected_celestia_chain_id = &self.expected_celestia_chain_id;
ensure!(
self.expected_celestia_chain_id == actual_celestia_chain_id.as_str(),
"expected Celestia chain id `{expected_celestia_chain_id}` does not match actual: \
`{actual_celestia_chain_id}`"
);
Ok(())
};

let wait_for_init_executor = async {
self.executor
.wait_for_init()
.await
.wrap_err("handle to executor failed while waiting for it being initialized")
};

let get_sequencer_chain_id = async {
get_sequencer_chain_id(self.sequencer_cometbft_client.clone())
.await
.wrap_err("failed to get sequencer chain ID")
let get_and_validate_sequencer_chain_id = async {
let actual_sequencer_chain_id =
get_sequencer_chain_id(self.sequencer_cometbft_client.clone())
.await
.wrap_err("failed to get sequencer chain ID")?;
let expected_sequencer_chain_id = &self.expected_sequencer_chain_id;
ensure!(
self.expected_sequencer_chain_id == actual_sequencer_chain_id.to_string(),
"expected Celestia chain id `{expected_sequencer_chain_id}` does not match \
actual: `{actual_sequencer_chain_id}`"
);
Ok(actual_sequencer_chain_id)
};

try_join!(wait_for_init_executor, get_sequencer_chain_id)
try_join!(
validate_celestia_chain_id,
wait_for_init_executor,
get_and_validate_sequencer_chain_id
)
}
}

#[instrument(skip_all, err)]
async fn get_celestia_chain_id(
celestia_client: &CelestiaClient,
) -> eyre::Result<celestia_tendermint::chain::Id> {
let retry_config = tryhard::RetryFutureConfig::new(u32::MAX)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(20))
.on_retry(
|attempt: u32, next_delay: Option<Duration>, error: &jsonrpsee::core::Error| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
attempt,
wait_duration,
error = error as &dyn std::error::Error,
"attempt to fetch celestia network header info; retrying after backoff",
);
futures::future::ready(())
},
);
let network_head = tryhard::retry_fn(|| celestia_client.header_network_head())
.with_config(retry_config)
.await?;
Ok(network_head.chain_id().clone())
}

struct RunningReader {
block_cache: BlockCache<ReconstructedBlock>,

Expand Down
26 changes: 19 additions & 7 deletions crates/astria-conductor/src/conductor/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use astria_eyre::eyre::{
self,
eyre,
Result,
WrapErr as _,
};
use itertools::Itertools as _;
Expand Down Expand Up @@ -47,20 +48,20 @@ enum ExitReason {
ShutdownSignal,
TaskFailed {
name: &'static str,
error: eyre::ErrReport,
error: eyre::Report,
},
}

pin_project! {
/// A handle returned by [`ConductorInner::spawn`].
pub(super) struct InnerHandle {
shutdown_token: CancellationToken,
task: Option<tokio::task::JoinHandle<RestartOrShutdown>>,
task: Option<tokio::task::JoinHandle<Result<RestartOrShutdown>>>,
}
}

impl Future for InnerHandle {
type Output = Result<RestartOrShutdown, tokio::task::JoinError>;
type Output = Result<Result<RestartOrShutdown>, tokio::task::JoinError>;

fn poll(
self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -132,6 +133,7 @@ impl ConductorInner {
sequencer_grpc_client,
sequencer_cometbft_client: sequencer_cometbft_client.clone(),
sequencer_block_time: Duration::from_millis(cfg.sequencer_block_time_ms),
expected_sequencer_chain_id: cfg.expected_sequencer_chain_id.clone(),
shutdown: shutdown_token.clone(),
executor: executor_handle.clone(),
}
Expand All @@ -153,6 +155,8 @@ impl ConductorInner {
executor: executor_handle.clone(),
sequencer_cometbft_client: sequencer_cometbft_client.clone(),
sequencer_requests_per_second: cfg.sequencer_requests_per_second,
expected_celestia_chain_id: cfg.expected_celestia_chain_id,
expected_sequencer_chain_id: cfg.expected_sequencer_chain_id,
shutdown: shutdown_token.clone(),
metrics,
}
Expand All @@ -172,7 +176,7 @@ impl ConductorInner {
///
/// # Panics
/// Panics if it could not install a signal handler.
async fn run_until_stopped(mut self) -> RestartOrShutdown {
async fn run_until_stopped(mut self) -> Result<RestartOrShutdown> {
info_span!("Conductor::run_until_stopped").in_scope(|| info!("conductor is running"));

let exit_reason = select! {
Expand Down Expand Up @@ -219,7 +223,7 @@ impl ConductorInner {
/// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds
/// to abort the remaining tasks.
#[instrument(skip_all)]
async fn shutdown(mut self, exit_reason: ExitReason) -> RestartOrShutdown {
async fn shutdown(mut self, exit_reason: ExitReason) -> Result<RestartOrShutdown> {
self.shutdown_token.cancel();
let mut restart_or_shutdown = RestartOrShutdown::Shutdown;

Expand Down Expand Up @@ -273,7 +277,15 @@ impl ConductorInner {
}
info!("shutting down");

restart_or_shutdown
if let ExitReason::TaskFailed {
error, ..
} = exit_reason
{
if matches!(restart_or_shutdown, RestartOrShutdown::Shutdown) {
return Err(error);
}
}
Ok(restart_or_shutdown)
}
}

Expand All @@ -289,7 +301,7 @@ fn report_exit(exit_reason: &ExitReason, message: &str) {
}

#[instrument(skip_all)]
fn check_for_restart(name: &str, err: &eyre::ErrReport) -> bool {
fn check_for_restart(name: &str, err: &eyre::Report) -> bool {
if name != ConductorInner::EXECUTOR {
return false;
}
Expand Down
Loading

0 comments on commit 8a4eafc

Please sign in to comment.