Skip to content

Commit

Permalink
Merge pull request #168 from adorsys/iss-message-repository
Browse files Browse the repository at this point in the history
Iss message repository
  • Loading branch information
Christiantyemele authored Sep 12, 2024
2 parents c81ea9c + e5f4305 commit 4805e0b
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 12 deletions.
1 change: 1 addition & 0 deletions crates/plugins/mediator-coordination/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod client;
pub mod plugin;


mod constant;
mod didcomm;
mod jose;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use didcomm::Message;
/// Resources to map in a database.
use mongodb::bson::oid::ObjectId;
use serde::{Deserialize, Serialize};
Expand All @@ -22,6 +23,14 @@ pub struct Connection {
/// Generated DID to route messages to client.
pub routing_did: String,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct RoutedMessage {
#[serde(rename = "_id")]
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub message: Message,
pub recipient_did: String,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Secrets {
Expand Down
9 changes: 6 additions & 3 deletions crates/plugins/mediator-coordination/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use mongodb::{ options::ClientOptions, Client, Database};
use std::sync::Arc;

use crate::{
repository::stateful::coord::{MongoConnectionRepository, MongoSecretsRepository},
repository::stateful::coord::{MongoConnectionRepository, MongoMessagesRepository, MongoSecretsRepository},
util,
web::{self, AppState, AppStateRepository},
};
Expand Down Expand Up @@ -95,8 +95,11 @@ impl Plugin for MediatorCoordinationPlugin {

// Load persistence layer
let repository = AppStateRepository {
connection_repository: Arc::new(MongoConnectionRepository::from_db(db)),
secret_repository: Arc::new(MongoSecretsRepository::from_db(db)),

connection_repository: Arc::new(MongoConnectionRepository::from_db(&db)),
secret_repository: Arc::new(MongoSecretsRepository::from_db(&db)),
message_repository: Arc::new(MongoMessagesRepository::from_db(&db))

};

// Compile state
Expand Down
170 changes: 169 additions & 1 deletion crates/plugins/mediator-coordination/src/repository/stateful/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use mongodb::{
};

use crate::{
model::stateful::entity::{Connection, Secrets},
model::stateful::entity::{Connection, RoutedMessage, Secrets},
repository::traits::{Entity, Repository, RepositoryError},
};

Expand Down Expand Up @@ -117,7 +117,86 @@ impl MongoSecretsRepository {
}
}
}
pub struct MongoMessagesRepository {
collection: Collection<RoutedMessage>,
}
impl MongoMessagesRepository {
pub fn from_db(db: &Database) -> Self {
Self {
collection: db.collection("messages"),
}
}
}
#[async_trait]
impl Repository<RoutedMessage> for MongoMessagesRepository {
async fn find_all(&self) -> Result<Vec<RoutedMessage>, RepositoryError> {
let mut messages: Vec<RoutedMessage> = vec![];

// Retrieve all messages from the database
let mut cursor = self.collection.find(None, None).await?;
while cursor.advance().await? {
messages.push(cursor.deserialize_current()?);
}

Ok(messages)
}
async fn find_one(&self, message_id: ObjectId) -> Result<Option<RoutedMessage>, RepositoryError> {
// Query the database for the specified message ID
self.find_one_by(doc! {"_id": message_id}).await
}
async fn find_one_by(&self, filter: BsonDocument) -> Result<Option<RoutedMessage>, RepositoryError> {
// Query the database for the specified message ID
Ok(self.collection.find_one(filter, None).await?)
}
async fn store(&self, message: RoutedMessage) -> Result<RoutedMessage, RepositoryError> {
// Insert the new message into the database
let metadata = self.collection.insert_one(message.clone(), None).await?;

// Return persisted message
Ok(match metadata.inserted_id {
Bson::ObjectId(oid) => RoutedMessage {
id: Some(oid),
..message
},
_ => unreachable!(),
})
}
async fn delete_one(&self, message_id: ObjectId) -> Result<(), RepositoryError> {
// Delete the connection from the database
let metadata = self
.collection
.delete_one(doc! {"_id": message_id}, None)
.await?;

if metadata.deleted_count > 0 {
Ok(())
} else {
Err(RepositoryError::TargetNotFound)
}
}

async fn update(&self, message: RoutedMessage) -> Result<RoutedMessage, RepositoryError> {
if message.id.is_none() {
return Err(RepositoryError::MissingIdentifier);
}

// Update the message in the database
let metadata = self
.collection
.update_one(
doc! {"_id": message.id.unwrap()},
doc! {"$set": bson::to_document(&message).map_err(|_| RepositoryError::BsonConversionError)?},
None,
)
.await?;

if metadata.matched_count > 0 {
Ok(message)
} else {
Err(RepositoryError::TargetNotFound)
}
}
}
#[async_trait]
impl Repository<Secrets> for MongoSecretsRepository {
async fn find_all(&self) -> Result<Vec<Secrets>, RepositoryError> {
Expand Down Expand Up @@ -315,6 +394,16 @@ pub mod tests {
}
}
}
pub struct MockMessagesRepository {
messages: RwLock<Vec<RoutedMessage>>,
}
impl MockMessagesRepository {
pub fn from(messages: Vec<RoutedMessage>) -> Self {
Self {
messages: RwLock::new(messages),
}
}
}

#[async_trait]
impl Repository<Secrets> for MockSecretsRepository {
Expand Down Expand Up @@ -391,4 +480,83 @@ pub mod tests {
}
}
}

#[async_trait]
impl Repository<RoutedMessage> for MockMessagesRepository {
async fn find_all(&self) -> Result<Vec<RoutedMessage>, RepositoryError> {
Ok(self.messages.read().unwrap().clone())
}

async fn find_one(
&self,
secrets_id: ObjectId,
) -> Result<Option<RoutedMessage>, RepositoryError> {
self.find_one_by(doc! {"_id": secrets_id}).await
}

async fn find_one_by(
&self,
filter: BsonDocument,
) -> Result<Option<RoutedMessage>, RepositoryError> {
let filter: HashMap<String, Bson> = filter.into_iter().collect();

Ok(self
.messages
.read()
.unwrap()
.iter()
.find(|s| {
if let Some(id) = filter.get("_id") {
if json!(s.id) != json!(id) {
return false;
}
}

true
})
.cloned())
}

async fn store(&self, messages: RoutedMessage) -> Result<RoutedMessage, RepositoryError> {
// Add new entity to collection
self.messages.write().unwrap().push(messages.clone());

// Return added entity
Ok(messages)
}

async fn update(&self, messages: RoutedMessage) -> Result<RoutedMessage, RepositoryError> {
// Find entity to update
let pos = self
.messages
.read()
.unwrap()
.iter()
.position(|c| c.id == messages.id);

if let Some(pos) = pos {
self.messages.write().unwrap()[pos] = messages.clone();
Ok(messages)
} else {
Err(RepositoryError::TargetNotFound)
}
}

async fn delete_one(&self, message_id: ObjectId) -> Result<(), RepositoryError> {
// Find entity to delete
let pos = self
.messages
.read()
.unwrap()
.iter()
.position(|s| s.id == Some(message_id));

if let Some(pos) = pos {
self.messages.write().unwrap().remove(pos);
Ok(())
} else {
Err(RepositoryError::TargetNotFound)
}
}
}
}
4 changes: 4 additions & 0 deletions crates/plugins/mediator-coordination/src/web/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ pub enum MediationError {
UnsupportedDidMethod,
#[error("unsupported operation")]
UnsupportedOperation,
#[error("Could not store Message")]
PersisenceError,
#[error("Could not deserialize Message")]
DeserializationError
}

impl MediationError {
Expand Down
5 changes: 4 additions & 1 deletion crates/plugins/mediator-coordination/src/web/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ pub mod tests {

use crate::{
didcomm::bridge::LocalSecretsResolver,
repository::stateful::coord::tests::{MockConnectionRepository, MockSecretsRepository},
repository::stateful::coord::tests::{
MockConnectionRepository, MockMessagesRepository, MockSecretsRepository,
},
util::{self, MockFileSystem},
web::{self, AppStateRepository},
};
Expand All @@ -102,6 +104,7 @@ pub mod tests {
let repository = AppStateRepository {
connection_repository: Arc::new(MockConnectionRepository::from(vec![])),
secret_repository: Arc::new(MockSecretsRepository::from(vec![])),
message_repository: Arc::new(MockMessagesRepository::from(vec![])),
};

let state = Arc::new(AppState::from(
Expand Down
14 changes: 7 additions & 7 deletions crates/plugins/mediator-coordination/src/web/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod coord;
mod error;
pub mod error;
mod handler;
mod midlw;

Expand All @@ -10,8 +10,7 @@ use std::sync::Arc;

use crate::{
didcomm::bridge::{LocalDIDResolver, LocalSecretsResolver},
model::stateful::entity::Connection,
model::stateful::entity::Secrets,
model::stateful::entity::{Connection, RoutedMessage, Secrets},
repository::traits::Repository,
util,
};
Expand All @@ -34,21 +33,22 @@ pub struct AppState {
public_domain: String,

// Crypto identity
diddoc: Document,
pub diddoc: Document,
assertion_jwk: (String, Jwk),

// DIDComm Resolvers
did_resolver: LocalDIDResolver,
secrets_resolver: LocalSecretsResolver,
pub did_resolver: LocalDIDResolver,
pub secrets_resolver: LocalSecretsResolver,

// Persistence layer
repository: Option<AppStateRepository>,
pub repository: Option<AppStateRepository>,
}

#[derive(Clone)]
pub struct AppStateRepository {
pub connection_repository: Arc<dyn Repository<Connection>>,
pub secret_repository: Arc<dyn Repository<Secrets>>,
pub message_repository: Arc<dyn Repository<RoutedMessage>>,
}

impl AppState {
Expand Down

0 comments on commit 4805e0b

Please sign in to comment.