Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iss message repository #168

Merged
merged 12 commits into from
Sep 12, 2024
1 change: 1 addition & 0 deletions crates/plugins/mediator-coordination/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod client;
pub mod plugin;


mod constant;
mod didcomm;
mod jose;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use didcomm::Message;
/// Resources to map in a database.
use mongodb::bson::oid::ObjectId;
use serde::{Deserialize, Serialize};
Expand All @@ -22,6 +23,14 @@ pub struct Connection {
/// Generated DID to route messages to client.
pub routing_did: String,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct RoutedMessage {
#[serde(rename = "_id")]
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub message: Message,
pub recipient_did: String,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Secrets {
Expand Down
9 changes: 6 additions & 3 deletions crates/plugins/mediator-coordination/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use mongodb::{ options::ClientOptions, Client, Database};
use std::sync::Arc;

use crate::{
repository::stateful::coord::{MongoConnectionRepository, MongoSecretsRepository},
repository::stateful::coord::{MongoConnectionRepository, MongoMessagesRepository, MongoSecretsRepository},
util,
web::{self, AppState, AppStateRepository},
};
Expand Down Expand Up @@ -95,8 +95,11 @@ impl Plugin for MediatorCoordinationPlugin {

// Load persistence layer
let repository = AppStateRepository {
connection_repository: Arc::new(MongoConnectionRepository::from_db(db)),
secret_repository: Arc::new(MongoSecretsRepository::from_db(db)),

connection_repository: Arc::new(MongoConnectionRepository::from_db(&db)),
secret_repository: Arc::new(MongoSecretsRepository::from_db(&db)),
message_repository: Arc::new(MongoMessagesRepository::from_db(&db))

};

// Compile state
Expand Down
170 changes: 169 additions & 1 deletion crates/plugins/mediator-coordination/src/repository/stateful/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use mongodb::{
};

use crate::{
model::stateful::entity::{Connection, Secrets},
model::stateful::entity::{Connection, RoutedMessage, Secrets},
repository::traits::{Entity, Repository, RepositoryError},
};

Expand Down Expand Up @@ -117,7 +117,86 @@ impl MongoSecretsRepository {
}
}
}
pub struct MongoMessagesRepository {
collection: Collection<RoutedMessage>,
}
impl MongoMessagesRepository {
pub fn from_db(db: &Database) -> Self {
Self {
collection: db.collection("messages"),
}
}
}
#[async_trait]
impl Repository<RoutedMessage> for MongoMessagesRepository {
async fn find_all(&self) -> Result<Vec<RoutedMessage>, RepositoryError> {
let mut messages: Vec<RoutedMessage> = vec![];

// Retrieve all messages from the database
let mut cursor = self.collection.find(None, None).await?;
while cursor.advance().await? {
messages.push(cursor.deserialize_current()?);
}

Ok(messages)
}
async fn find_one(&self, message_id: ObjectId) -> Result<Option<RoutedMessage>, RepositoryError> {
// Query the database for the specified message ID
self.find_one_by(doc! {"_id": message_id}).await
}
async fn find_one_by(&self, filter: BsonDocument) -> Result<Option<RoutedMessage>, RepositoryError> {
// Query the database for the specified message ID
Ok(self.collection.find_one(filter, None).await?)
}
async fn store(&self, message: RoutedMessage) -> Result<RoutedMessage, RepositoryError> {
// Insert the new message into the database
let metadata = self.collection.insert_one(message.clone(), None).await?;

// Return persisted message
Ok(match metadata.inserted_id {
Bson::ObjectId(oid) => RoutedMessage {
id: Some(oid),
..message
},
_ => unreachable!(),
})
}
async fn delete_one(&self, message_id: ObjectId) -> Result<(), RepositoryError> {
// Delete the connection from the database
let metadata = self
.collection
.delete_one(doc! {"_id": message_id}, None)
.await?;

if metadata.deleted_count > 0 {
Ok(())
} else {
Err(RepositoryError::TargetNotFound)
}
}

async fn update(&self, message: RoutedMessage) -> Result<RoutedMessage, RepositoryError> {
if message.id.is_none() {
return Err(RepositoryError::MissingIdentifier);
}

// Update the message in the database
let metadata = self
.collection
.update_one(
doc! {"_id": message.id.unwrap()},
doc! {"$set": bson::to_document(&message).map_err(|_| RepositoryError::BsonConversionError)?},
None,
)
.await?;

if metadata.matched_count > 0 {
Ok(message)
} else {
Err(RepositoryError::TargetNotFound)
}
}
}
#[async_trait]
impl Repository<Secrets> for MongoSecretsRepository {
async fn find_all(&self) -> Result<Vec<Secrets>, RepositoryError> {
Expand Down Expand Up @@ -315,6 +394,16 @@ pub mod tests {
}
}
}
pub struct MockMessagesRepository {
messages: RwLock<Vec<RoutedMessage>>,
}
impl MockMessagesRepository {
pub fn from(messages: Vec<RoutedMessage>) -> Self {
Self {
messages: RwLock::new(messages),
}
}
}

#[async_trait]
impl Repository<Secrets> for MockSecretsRepository {
Expand Down Expand Up @@ -391,4 +480,83 @@ pub mod tests {
}
}
}

#[async_trait]
impl Repository<RoutedMessage> for MockMessagesRepository {
async fn find_all(&self) -> Result<Vec<RoutedMessage>, RepositoryError> {
Ok(self.messages.read().unwrap().clone())
}

async fn find_one(
&self,
secrets_id: ObjectId,
) -> Result<Option<RoutedMessage>, RepositoryError> {
self.find_one_by(doc! {"_id": secrets_id}).await
}

async fn find_one_by(
&self,
filter: BsonDocument,
) -> Result<Option<RoutedMessage>, RepositoryError> {
let filter: HashMap<String, Bson> = filter.into_iter().collect();

Ok(self
.messages
.read()
.unwrap()
.iter()
.find(|s| {
if let Some(id) = filter.get("_id") {
if json!(s.id) != json!(id) {
return false;
}
}

true
})
.cloned())
}

async fn store(&self, messages: RoutedMessage) -> Result<RoutedMessage, RepositoryError> {
// Add new entity to collection
self.messages.write().unwrap().push(messages.clone());

// Return added entity
Ok(messages)
}

async fn update(&self, messages: RoutedMessage) -> Result<RoutedMessage, RepositoryError> {
// Find entity to update
let pos = self
.messages
.read()
.unwrap()
.iter()
.position(|c| c.id == messages.id);

if let Some(pos) = pos {
self.messages.write().unwrap()[pos] = messages.clone();
Ok(messages)
} else {
Err(RepositoryError::TargetNotFound)
}
}

async fn delete_one(&self, message_id: ObjectId) -> Result<(), RepositoryError> {
// Find entity to delete
let pos = self
.messages
.read()
.unwrap()
.iter()
.position(|s| s.id == Some(message_id));

if let Some(pos) = pos {
self.messages.write().unwrap().remove(pos);
Ok(())
} else {
Err(RepositoryError::TargetNotFound)
}
}
}
}
4 changes: 4 additions & 0 deletions crates/plugins/mediator-coordination/src/web/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ pub enum MediationError {
UnsupportedDidMethod,
#[error("unsupported operation")]
UnsupportedOperation,
#[error("Could not store Message")]
PersisenceError,
#[error("Could not deserialize Message")]
DeserializationError
}

impl MediationError {
Expand Down
5 changes: 4 additions & 1 deletion crates/plugins/mediator-coordination/src/web/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ pub mod tests {

use crate::{
didcomm::bridge::LocalSecretsResolver,
repository::stateful::coord::tests::{MockConnectionRepository, MockSecretsRepository},
repository::stateful::coord::tests::{
MockConnectionRepository, MockMessagesRepository, MockSecretsRepository,
},
util::{self, MockFileSystem},
web::{self, AppStateRepository},
};
Expand All @@ -102,6 +104,7 @@ pub mod tests {
let repository = AppStateRepository {
connection_repository: Arc::new(MockConnectionRepository::from(vec![])),
secret_repository: Arc::new(MockSecretsRepository::from(vec![])),
message_repository: Arc::new(MockMessagesRepository::from(vec![])),
};

let state = Arc::new(AppState::from(
Expand Down
14 changes: 7 additions & 7 deletions crates/plugins/mediator-coordination/src/web/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod coord;
mod error;
pub mod error;
mod handler;
mod midlw;

Expand All @@ -10,8 +10,7 @@ use std::sync::Arc;

use crate::{
didcomm::bridge::{LocalDIDResolver, LocalSecretsResolver},
model::stateful::entity::Connection,
model::stateful::entity::Secrets,
model::stateful::entity::{Connection, RoutedMessage, Secrets},
repository::traits::Repository,
util,
};
Expand All @@ -34,21 +33,22 @@ pub struct AppState {
public_domain: String,

// Crypto identity
diddoc: Document,
pub diddoc: Document,
assertion_jwk: (String, Jwk),

// DIDComm Resolvers
did_resolver: LocalDIDResolver,
secrets_resolver: LocalSecretsResolver,
pub did_resolver: LocalDIDResolver,
pub secrets_resolver: LocalSecretsResolver,

// Persistence layer
repository: Option<AppStateRepository>,
pub repository: Option<AppStateRepository>,
}

#[derive(Clone)]
pub struct AppStateRepository {
pub connection_repository: Arc<dyn Repository<Connection>>,
pub secret_repository: Arc<dyn Repository<Secrets>>,
pub message_repository: Arc<dyn Repository<RoutedMessage>>,
}

impl AppState {
Expand Down
Loading