Skip to content

Commit

Permalink
feat(publisher): Publish logs (#220)
Browse files Browse the repository at this point in the history
* refactor(core): rename transaction subject's height to block_height for clarity

* feat(core): add logs subject and types

* feat(publisher): publish logs

* ci(repo): fix lint

* fix(core): fix outputs' doc test

---------

Co-authored-by: Pedro Nauck <[email protected]>
  • Loading branch information
Jurshsmith and pedronauck committed Sep 24, 2024
1 parent 8548f06 commit 5895a65
Show file tree
Hide file tree
Showing 17 changed files with 388 additions and 23 deletions.
2 changes: 1 addition & 1 deletion benches/nats-publisher/src/utils/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl TxHelper {
let mut subject: TransactionsSubject = tx.into();
subject = subject
.with_tx_index(Some(index))
.with_height(Some(BlockHeight::from(self.get_height(block))))
.with_block_height(Some(BlockHeight::from(self.get_height(block))))
.with_status(self.get_status(tx).map(Into::into));
subject
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-streams-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fuel-streams-macros = { workspace = true }
futures = { workspace = true }
pretty_assertions = { workspace = true, optional = true }
rand = { workspace = true }
serde = "1"
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod blocks;
pub mod inputs;
pub mod logs;
pub mod nats;
pub mod outputs;
pub mod receipts;
Expand Down
13 changes: 13 additions & 0 deletions crates/fuel-streams-core/src/logs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
pub mod subjects;
pub mod types;

pub use subjects::*;
use types::*;

use crate::prelude::*;

impl StreamEncoder for Log {}
impl Streamable for Log {
const NAME: &'static str = "logs";
const WILDCARD_LIST: &'static [&'static str] = &[LogsSubject::WILDCARD];
}
81 changes: 81 additions & 0 deletions crates/fuel-streams-core/src/logs/subjects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use fuel_streams_macros::subject::{IntoSubject, Subject};

use crate::types::*;

/// Represents a subject for logs related to transactions in the Fuel network.
///
/// This subject format allows for efficient querying and filtering of logs
/// based on the block height, transaction ID, the index of the receipt within the transaction,
/// and the unique log ID.
///
/// # Examples
///
/// Creating a subject for a specific log:
///
/// ```
/// # use fuel_streams_core::logs::subjects::LogsSubject;
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = LogsSubject {
/// block_height: Some(1000.into()),
/// tx_id: Some(Bytes32::from([1u8; 32])),
/// receipt_index: Some(0),
/// log_id: Some(Bytes32::from([2u8; 32])),
/// };
/// assert_eq!(
/// subject.parse(),
/// "logs.1000.0x0101010101010101010101010101010101010101010101010101010101010101.0.0x0202020202020202020202020202020202020202020202020202020202020202"
/// );
/// ```
///
/// Wildcard for querying all logs:
///
/// ```
/// # use fuel_streams_core::logs::subjects::LogsSubject;
/// # use fuel_streams_macros::subject::*;
/// assert_eq!(LogsSubject::WILDCARD, "logs.>");
/// ```
///
/// Creating a subject query using the `wildcard` method:
///
/// ```
/// # use fuel_streams_core::logs::subjects::LogsSubject;
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let wildcard = LogsSubject::wildcard(
/// Some(1000.into()),
/// Some(Bytes32::from([1u8; 32])),
/// None,
/// None
/// );
/// assert_eq!(
/// wildcard,
/// "logs.1000.0x0101010101010101010101010101010101010101010101010101010101010101.*.*"
/// );
/// ```
///
/// Using the builder pattern:
///
/// ```
/// # use fuel_streams_core::logs::subjects::LogsSubject;
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = LogsSubject::new()
/// .with_block_height(Some(2310.into()))
/// .with_tx_id(Some(Bytes32::from([1u8; 32])))
/// .with_receipt_index(Some(0))
/// .with_log_id(Some(Bytes32::from([2u8; 32])));
/// assert_eq!(
/// subject.parse(),
/// "logs.2310.0x0101010101010101010101010101010101010101010101010101010101010101.0.0x0202020202020202020202020202020202020202020202020202020202020202"
/// );
/// ```
#[derive(Subject, Debug, Clone, Default)]
#[subject_wildcard = "logs.>"]
#[subject_format = "logs.{block_height}.{tx_id}.{receipt_index}.{log_id}"]
pub struct LogsSubject {
pub block_height: Option<BlockHeight>,
pub tx_id: Option<Bytes32>,
pub receipt_index: Option<usize>,
pub log_id: Option<Bytes32>,
}
140 changes: 140 additions & 0 deletions crates/fuel-streams-core/src/logs/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use fuel_core_types::fuel_tx::{Bytes32, ContractId, Receipt, Word};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

/// A convenient aggregate type to represent a Fuel logs to allow users
/// think about them agnostic of receipts.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum Log {
WithoutData {
id: ContractId,
ra: Word,
rb: Word,
rc: Word,
rd: Word,
pc: Word,
is: Word,
},
WithData {
id: ContractId,
ra: Word,
rb: Word,
ptr: Word,
len: Word,
digest: Bytes32,
pc: Word,
is: Word,
data: Option<Vec<u8>>,
},
}

impl From<Receipt> for Log {
fn from(value: Receipt) -> Self {
match value {
Receipt::Log {
id,
ra,
rb,
rc,
rd,
pc,
is,
} => Log::WithoutData {
id,
ra,
rb,
rc,
rd,
pc,
is,
},
Receipt::LogData {
id,
ra,
rb,
ptr,
len,
digest,
pc,
is,
data,
} => Log::WithData {
id,
ra,
rb,
ptr,
len,
digest,
pc,
is,
data,
},
_ => panic!("Invalid receipt type"),
}
}
}

/// Introduced majorly allow delegating serialization and deserialization to `fuel-core`'s Receipt
impl From<Log> for Receipt {
fn from(log: Log) -> Receipt {
match log {
Log::WithoutData {
id,
ra,
rb,
rc,
rd,
pc,
is,
} => Receipt::Log {
id,
ra,
rb,
rc,
rd,
pc,
is,
},
Log::WithData {
id,
ra,
rb,
ptr,
len,
digest,
pc,
is,
data,
} => Receipt::LogData {
id,
ra,
rb,
ptr,
len,
digest,
pc,
is,
data,
},
}
}
}

impl Serialize for Log {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let receipt: Receipt = self.clone().into();
receipt.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for Log {
fn deserialize<D>(deserializer: D) -> Result<Log, D::Error>
where
D: Deserializer<'de>,
{
let receipt = Receipt::deserialize(deserializer)?;
Ok(Log::from(receipt))
}
}
2 changes: 1 addition & 1 deletion crates/fuel-streams-core/src/outputs/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub struct OutputsChangeSubject {
/// .with_tx_id(Some(Bytes32::zeroed()))
/// .with_index(Some(0))
/// .with_to(Some(Address::zeroed()))
/// .with_asset_id(Some(Bytes32::from([1u8; 32])));
/// .with_asset_id(Some(AssetId::from([1u8; 32])));
/// assert_eq!(
/// subject.to_string(),
/// "outputs.variable.0x0000000000000000000000000000000000000000000000000000000000000000.0.0x0000000000000000000000000000000000000000000000000000000000000000.0x0101010101010101010101010101010101010101010101010101010101010101"
Expand Down
10 changes: 5 additions & 5 deletions crates/fuel-streams-core/src/transactions/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{blocks::types::BlockHeight, types::*};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::IntoSubject;
/// let subject = TransactionsSubject {
/// height: Some(23.into()),
/// block_height: Some(23.into()),
/// tx_index: Some(1),
/// tx_id: Some(Bytes32::zeroed()),
/// status: Some(TransactionStatus::Success),
Expand Down Expand Up @@ -52,7 +52,7 @@ use crate::{blocks::types::BlockHeight, types::*};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = TransactionsSubject::new()
/// .with_height(Some(23.into()))
/// .with_block_height(Some(23.into()))
/// .with_tx_index(Some(1))
/// .with_tx_id(Some(Bytes32::zeroed()))
/// .with_status(Some(TransactionStatus::Success))
Expand All @@ -61,9 +61,9 @@ use crate::{blocks::types::BlockHeight, types::*};
/// ```
#[derive(Subject, Debug, Clone, Default)]
#[subject_wildcard = "transactions.>"]
#[subject_format = "transactions.{height}.{tx_index}.{tx_id}.{status}.{kind}"]
#[subject_format = "transactions.{block_height}.{tx_index}.{tx_id}.{status}.{kind}"]
pub struct TransactionsSubject {
pub height: Option<BlockHeight>,
pub block_height: Option<BlockHeight>,
pub tx_index: Option<usize>,
pub tx_id: Option<Bytes32>,
pub status: Option<TransactionStatus>,
Expand Down Expand Up @@ -146,7 +146,7 @@ mod test {
fn transactions_subjects_from_transaction() {
let mock_tx = MockTransaction::build();
let subject = TransactionsSubject::from(&mock_tx);
assert!(subject.height.is_none());
assert!(subject.block_height.is_none());
assert!(subject.tx_index.is_none());
assert!(subject.status.is_none());
assert!(subject.kind.is_some());
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use fuel_core_types::{
pub use crate::{
blocks::types::*,
inputs::types::*,
logs::types::*,
nats::types::*,
transactions::types::*,
};
Expand Down
10 changes: 6 additions & 4 deletions crates/fuel-streams-publisher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
mod blocks;
mod inputs;
pub mod metrics;
mod logs;
mod outputs;
mod publisher;
mod receipts;
mod transactions;

mod fuel_core;

pub mod metrics;
pub mod server;
pub mod shutdown;
pub mod state;
pub mod system;
mod transactions;

mod fuel_core;

pub use fuel_core::{FuelCore, FuelCoreLike};
pub use publisher::{Publisher, Streams};
57 changes: 57 additions & 0 deletions crates/fuel-streams-publisher/src/logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::sync::Arc;

use fuel_core_types::fuel_tx::Receipt;
use fuel_streams_core::{
logs::LogsSubject,
prelude::*,
types::{Transaction, UniqueIdentifier},
Stream,
};
use tracing::info;

use crate::{metrics::PublisherMetrics, publish_with_metrics, FuelCoreLike};

pub async fn publish(
metrics: &Arc<PublisherMetrics>,
fuel_core: &dyn FuelCoreLike,
logs_stream: &Stream<Log>,
transactions: &[Transaction],
block_producer: &Address,
block_height: BlockHeight,
) -> anyhow::Result<()> {
let chain_id = fuel_core.chain_id();

for transaction in transactions.iter() {
let tx_id = transaction.id(chain_id);
let receipts = fuel_core.get_receipts(&tx_id)?;

if let Some(receipts) = receipts {
for (index, receipt) in receipts.iter().enumerate() {
match receipt {
receipt @ (Receipt::Log { id, .. }
| Receipt::LogData { id, .. }) => {
let subject = LogsSubject::new()
.with_block_height(Some(block_height.clone()))
.with_tx_id(Some(tx_id.into()))
.with_receipt_index(Some(index))
.with_log_id(Some((*id).into()));
let subject_wildcard = LogsSubject::WILDCARD;

info!("NATS Publisher: Publishing Logs for 0x#{tx_id}");
publish_with_metrics!(
logs_stream
.publish(&subject, &(receipt.clone()).into()),
metrics,
chain_id,
block_producer,
subject_wildcard
);
}
_non_log_receipt => {}
}
}
}
}

Ok(())
}
Loading

0 comments on commit 5895a65

Please sign in to comment.