Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow oracles composed of different feeds #535

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions oracle/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub struct PriceConfig<Currency> {
#[serde(default)]
pub value: Option<f64>,
// Feeds to consume to calculate this exchange rate.
#[serde(default = "BTreeMap::new")]
pub feeds: BTreeMap<FeedName, Vec<CurrencyPair<Currency>>>,
#[serde(default = "Vec::new")]
pub feeds: Vec<Vec<(FeedName, CurrencyPair<Currency>)>>,
}

impl<Currency> PriceConfig<Currency>
Expand All @@ -47,12 +47,15 @@ where
{
// TODO: validate currencies exist
pub fn validate(&self) -> Result<(), PriceConfigError<Currency>> {
for (name, path) in &self.feeds {
for path in self
.feeds
.iter()
.map(|x| x.iter().map(|(_name, pair)| pair.clone()).collect::<Vec<_>>())
{
let end = &match &path.first() {
Some(currency_pair) if currency_pair.contains(&self.pair.base) => Ok(self.pair.quote.clone()),
Some(currency_pair) if currency_pair.contains(&self.pair.quote) => Ok(self.pair.base.clone()),
_ => Err(PriceConfigError {
feed: name.clone(),
pair: self.pair.clone(),
error: ConfigError::NoStart,
}),
Expand All @@ -61,7 +64,6 @@ where
match &path.last() {
Some(currency_pair) if currency_pair.contains(end) => Ok(()),
_ => Err(PriceConfigError {
feed: name.clone(),
pair: self.pair.clone(),
error: ConfigError::NoEnd,
}),
Expand All @@ -70,7 +72,6 @@ where
for [left, right] in path.windows(2).flat_map(<&[CurrencyPair<Currency>; 2]>::try_from) {
if !left.has_shared(right) {
return Err(PriceConfigError {
feed: name.clone(),
pair: self.pair.clone(),
error: ConfigError::NoPath(left.clone(), right.clone()),
});
Expand All @@ -91,7 +92,7 @@ mod tests {
PriceConfig {
pair: $pair,
value: None,
feeds: vec![(FeedName::Kraken, vec![$($path),*])].into_iter().collect()
feeds: vec![vec![$((FeedName::Kraken, $path)),*]]
}
.validate().expect("Config is valid")
}};
Expand All @@ -102,14 +103,13 @@ mod tests {
let result = PriceConfig {
pair: $pair,
value: None,
feeds: vec![(FeedName::Kraken, vec![$($path),*])].into_iter().collect()
feeds: vec![vec![$((FeedName::Kraken, $path)),*]]
}
.validate();
assert!(
matches!(
result,
Err(PriceConfigError{
feed: FeedName::Kraken,
pair: _,
error: $err
})
Expand Down
3 changes: 1 addition & 2 deletions oracle/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ pub enum ConfigError<Currency> {
}

#[derive(Error, Debug)]
#[error("{feed}: {pair} => {error}")]
#[error("{pair} => {error}")]
pub struct PriceConfigError<Currency> {
pub(crate) feed: FeedName,
pub(crate) pair: CurrencyPair<Currency>,
pub(crate) error: ConfigError<Currency>,
}
Expand Down
19 changes: 11 additions & 8 deletions oracle/src/feeds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,25 @@ impl PriceFeeds {
price_config
.feeds
.into_iter()
.map(|(name, route)| {
self.feeds
.get(&name)
.map(|feed| (name.clone(), route, feed))
.ok_or(Error::NotConfigured(name))
.map(|x| {
x.into_iter()
.map(|(name, pair)| {
self.feeds
.get(&name)
.map(|feed| (name.clone(), pair, feed))
.ok_or(Error::NotConfigured(name))
})
.collect::<Result<Vec<_>, Error>>()
})
.collect::<Result<Vec<_>, Error>>()?
.into_iter()
.map(|(name, route, feed)| {
.map(|route| {
let currency_pair = currency_pair.clone();
async move {
let mut currency_pair_and_price = if let Some(currency_pair_and_price) = join_all(
route
.into_iter()
.map(|currency_pair| feed.get_price(currency_pair, currency_store)),
.map(|(name, currency_pair, feed)| feed.get_price(currency_pair, currency_store)),
)
.await
.into_iter()
Expand All @@ -149,7 +153,6 @@ impl PriceFeeds {
currency_pair_and_price = currency_pair_and_price.invert()
}

log::trace!("Using {:?}: {}", name, currency_pair_and_price);
Ok(Some(currency_pair_and_price))
}
}),
Expand Down
2 changes: 1 addition & 1 deletion oracle/src/feeds/dia_fair_price.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::single_char_pattern)]
use super::{get_http, PriceFeed};
use crate::{config::CurrencyStore, currency::*, Error};
use crate::{config::CurrencyStore, currency::*, feeds::DiaApi, Error};
use async_trait::async_trait;
use clap::Parser;
use reqwest::Url;
Expand Down
9 changes: 6 additions & 3 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ mod feeds;

use backoff::{future::retry_notify, ExponentialBackoff};
use clap::Parser;
use config::{CurrencyStore, OracleConfig};
use config::{CurrencyStore, OracleConfig, PriceConfig};
use currency::*;
use error::Error;
use feeds::CoinGeckoApi;
use futures::{future::join_all, stream::StreamExt};
use git_version::git_version;
use runtime::{
Expand All @@ -16,7 +17,7 @@ use runtime::{
};
use signal_hook::consts::*;
use signal_hook_tokio::Signals;
use std::{path::PathBuf, time::Duration};
use std::{collections::BTreeMap, path::PathBuf, time::Duration};
use tokio::{join, time::sleep};

const VERSION: &str = git_version!(args = ["--tags"]);
Expand Down Expand Up @@ -179,7 +180,7 @@ async fn _main() -> Result<(), Error> {

let currency_store = &oracle_config.currencies;
let mut price_feeds = feeds::PriceFeeds::new(currency_store.clone());
price_feeds.maybe_add_coingecko(opts.coingecko);
price_feeds.maybe_add_coingecko(opts.coingecko.clone());
price_feeds.maybe_add_dia(opts.dia);
price_feeds.maybe_add_dia_fair_price(opts.dia_fair_price);
price_feeds.maybe_add_gateio(opts.gateio);
Expand All @@ -206,6 +207,8 @@ async fn _main() -> Result<(), Error> {
.into_iter()
.collect::<Result<Vec<_>, _>>()?;

log::debug!("Collected prices: {:?}", prices);

// get prices above first to prevent websocket timeout
let shutdown_tx = ShutdownSender::new();
let parachain_rpc = InterBtcParachain::from_url_with_retry(
Expand Down