diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/repository/ConnectionRepository.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/repository/ConnectionRepository.scala index cf68e8cf25..44fc1300d3 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/repository/ConnectionRepository.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/repository/ConnectionRepository.scala @@ -5,7 +5,7 @@ import io.iohk.atala.connect.core.model.ConnectionRecord.ProtocolState import io.iohk.atala.mercury.protocol.connection.* import io.iohk.atala.shared.models.WalletAccessContext import zio.RIO - +import zio.Task import java.util.UUID trait ConnectionRepository { @@ -19,6 +19,12 @@ trait ConnectionRepository { states: ConnectionRecord.ProtocolState* ): RIO[WalletAccessContext, Seq[ConnectionRecord]] + def getConnectionRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): Task[Seq[ConnectionRecord]] + def getConnectionRecord(recordId: UUID): RIO[WalletAccessContext, Option[ConnectionRecord]] def deleteConnectionRecord(recordId: UUID): RIO[WalletAccessContext, Int] @@ -50,4 +56,5 @@ trait ConnectionRepository { recordId: UUID, failReason: Option[String], ): RIO[WalletAccessContext, Int] + } diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/repository/ConnectionRepositoryInMemory.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/repository/ConnectionRepositoryInMemory.scala index 2e91de7ebf..2495f8a070 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/repository/ConnectionRepositoryInMemory.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/repository/ConnectionRepositoryInMemory.scala @@ -155,6 +155,30 @@ class ConnectionRepositoryInMemory(walletRefs: Ref[Map[WalletId, Ref[Map[UUID, C .getOrElse(ZIO.succeed(0)) } yield count + def updateAfterFailForAllWallets( + recordId: UUID, + failReason: Option[String], + ): Task[Int] = walletRefs.get.flatMap { wallets => + ZIO.foldLeft(wallets.values)(0) { (acc, walletRef) => + for { + records <- walletRef.get + count <- records.get(recordId) match { + case Some(record) => + walletRef.update { r => + r.updated( + recordId, + record.copy( + metaRetries = record.metaRetries - 1, + metaLastFailure = failReason + ) + ) + } *> ZIO.succeed(1) // Record updated, count as 1 update + case None => ZIO.succeed(0) // No record updated + } + } yield acc + count + } + } + override def getConnectionRecordByThreadId(thid: String): RIO[WalletAccessContext, Option[ConnectionRecord]] = { for { storeRef <- walletStoreRef @@ -183,6 +207,27 @@ class ConnectionRepositoryInMemory(walletRefs: Ref[Map[WalletId, Ref[Map[UUID, C .toSeq } + override def getConnectionRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): Task[Seq[ConnectionRecord]] = { + + for { + refs <- walletRefs.get + stores <- ZIO.foreach(refs.values.toList)(_.get) + } yield { + stores + .flatMap(_.values) + .filter { rec => + (!ignoreWithZeroRetries || rec.metaRetries > 0) && + states.contains(rec.protocolState) + } + .take(limit) + .toSeq + } + } + override def createConnectionRecord(record: ConnectionRecord): RIO[WalletAccessContext, Int] = { for { _ <- for { diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionService.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionService.scala index be4d8b240e..de7e9e387b 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionService.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionService.scala @@ -53,6 +53,12 @@ trait ConnectionService { states: ConnectionRecord.ProtocolState* ): ZIO[WalletAccessContext, ConnectionServiceError, Seq[ConnectionRecord]] + def getConnectionRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): IO[ConnectionServiceError, Seq[ConnectionRecord]] + def getConnectionRecord(recordId: UUID): ZIO[WalletAccessContext, ConnectionServiceError, Option[ConnectionRecord]] def getConnectionRecordByThreadId( @@ -65,5 +71,4 @@ trait ConnectionService { recordId: UUID, failReason: Option[String] ): ZIO[WalletAccessContext, ConnectionServiceError, Unit] - } diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceImpl.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceImpl.scala index 4b7af223a7..0cf609b5b9 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceImpl.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceImpl.scala @@ -74,6 +74,18 @@ private class ConnectionServiceImpl( } yield records } + override def getConnectionRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ProtocolState* + ): IO[ConnectionServiceError, Seq[ConnectionRecord]] = { + for { + records <- connectionRepository + .getConnectionRecordsByStatesForAllWallets(ignoreWithZeroRetries, limit, states: _*) + .mapError(RepositoryError.apply) + } yield records + } + override def getConnectionRecord( recordId: UUID ): ZIO[WalletAccessContext, ConnectionServiceError, Option[ConnectionRecord]] = { diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala index dce8f7989d..a6e26df880 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/ConnectionServiceNotifier.scala @@ -7,7 +7,7 @@ import io.iohk.atala.mercury.model.DidId import io.iohk.atala.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse} import io.iohk.atala.shared.models.WalletAccessContext import zio.{URLayer, ZIO, ZLayer} - +import zio.IO import java.time.Duration import java.util.UUID @@ -109,6 +109,13 @@ class ConnectionServiceNotifier( states: ConnectionRecord.ProtocolState* ): ZIO[WalletAccessContext, ConnectionServiceError, Seq[ConnectionRecord]] = svc.getConnectionRecordsByStates(ignoreWithZeroRetries, limit, states: _*) + + override def getConnectionRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): IO[ConnectionServiceError, Seq[ConnectionRecord]] = + svc.getConnectionRecordsByStatesForAllWallets(ignoreWithZeroRetries, limit, states: _*) } object ConnectionServiceNotifier { diff --git a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala index 2ba4ae7658..0403629c39 100644 --- a/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala +++ b/connect/lib/core/src/main/scala/io/iohk/atala/connect/core/service/MockConnectionService.scala @@ -73,6 +73,12 @@ object MockConnectionService extends Mock[ConnectionService] { states: ConnectionRecord.ProtocolState* ): IO[ConnectionServiceError, Seq[ConnectionRecord]] = ??? + override def getConnectionRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): IO[ConnectionServiceError, Seq[ConnectionRecord]] = ??? + override def getConnectionRecord(recordId: UUID): IO[ConnectionServiceError, Option[ConnectionRecord]] = ??? override def getConnectionRecordByThreadId(thid: String): IO[ConnectionServiceError, Option[ConnectionRecord]] = diff --git a/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/repository/ConnectionRepositorySpecSuite.scala b/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/repository/ConnectionRepositorySpecSuite.scala index 6cbf087338..0049d0e990 100644 --- a/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/repository/ConnectionRepositorySpecSuite.scala +++ b/connect/lib/core/src/test/scala/io/iohk/atala/connect/core/repository/ConnectionRepositorySpecSuite.scala @@ -395,6 +395,46 @@ object ConnectionRepositorySpecSuite { wallet1Record <- repo.getConnectionRecord(record.id).provide(wac1) wallet2Record <- repo.getConnectionRecord(record.id).provide(wac2) } yield assertTrue(wallet1Record.isDefined) && assertTrue(wallet2Record.isEmpty) - } + }, + test("getConnectionRecordsByStatesForAllWallets returns correct records for all wallets") { + val walletId1 = WalletId.random + val walletId2 = WalletId.random + for { + repo <- ZIO.service[ConnectionRepository] + + wac1 = ZLayer.succeed(WalletAccessContext(walletId1)) + wac2 = ZLayer.succeed(WalletAccessContext(walletId2)) + aRecordWallet1 = connectionRecord + bRecordWallet2 = connectionRecord + _ <- repo.createConnectionRecord(aRecordWallet1).provide(wac1) + _ <- repo.createConnectionRecord(bRecordWallet2).provide(wac2) + _ <- repo + .updateConnectionProtocolState( + aRecordWallet1.id, + ProtocolState.InvitationGenerated, + ProtocolState.ConnectionRequestReceived, + 1 + ) + .provide(wac1) + _ <- repo + .updateConnectionProtocolState( + bRecordWallet2.id, + ProtocolState.InvitationGenerated, + ProtocolState.ConnectionResponsePending, + 1 + ) + .provide(wac2) + allWalletRecords <- repo.getConnectionRecordsByStatesForAllWallets( + ignoreWithZeroRetries = true, + limit = 10, + ProtocolState.ConnectionRequestReceived, + ProtocolState.ConnectionResponsePending + ) + } yield { + assertTrue(allWalletRecords.size == 2) && + assertTrue(allWalletRecords.exists(_.id == aRecordWallet1.id)) && + assertTrue(allWalletRecords.exists(_.id == bRecordWallet2.id)) + } + }, ) } diff --git a/connect/lib/sql-doobie/src/main/scala/io/iohk/atala/connect/sql/repository/JdbcConnectionRepository.scala b/connect/lib/sql-doobie/src/main/scala/io/iohk/atala/connect/sql/repository/JdbcConnectionRepository.scala index 8c3247bc8e..e8249e0490 100644 --- a/connect/lib/sql-doobie/src/main/scala/io/iohk/atala/connect/sql/repository/JdbcConnectionRepository.scala +++ b/connect/lib/sql-doobie/src/main/scala/io/iohk/atala/connect/sql/repository/JdbcConnectionRepository.scala @@ -18,11 +18,12 @@ import io.iohk.atala.shared.db.Implicits.* import io.iohk.atala.shared.models.WalletAccessContext import org.postgresql.util.PSQLException import zio.* - +import zio.interop.catz.* import java.time.Instant import java.util.UUID +import doobie.free.connection -class JdbcConnectionRepository(xa: Transactor[ContextAwareTask]) extends ConnectionRepository { +class JdbcConnectionRepository(xa: Transactor[ContextAwareTask], xb: Transactor[Task]) extends ConnectionRepository { // given logHandler: LogHandler = LogHandler.jdkLogHandler @@ -114,9 +115,25 @@ class JdbcConnectionRepository(xa: Transactor[ContextAwareTask]) extends Connect limit: Int, states: ConnectionRecord.ProtocolState* ): RIO[WalletAccessContext, Seq[ConnectionRecord]] = { + getRecordsByStates(ignoreWithZeroRetries, limit, states: _*).transactWallet(xa) + } + + override def getConnectionRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): Task[Seq[ConnectionRecord]] = { + getRecordsByStates(ignoreWithZeroRetries, limit, states: _*).transact(xb) + } + + private def getRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ConnectionRecord.ProtocolState* + ): ConnectionIO[Seq[ConnectionRecord]] = { states match case Nil => - ZIO.succeed(Nil) + connection.pure(Nil) case head +: tail => val nel = NonEmptyList.of(head, tail: _*) val inClauseFragment = Fragments.in(fr"protocol_state", nel) @@ -148,7 +165,6 @@ class JdbcConnectionRepository(xa: Transactor[ContextAwareTask]) extends Connect .to[Seq] cxnIO - .transactWallet(xa) } override def getConnectionRecord(recordId: UUID): RIO[WalletAccessContext, Option[ConnectionRecord]] = { @@ -298,10 +314,8 @@ class JdbcConnectionRepository(xa: Transactor[ContextAwareTask]) extends Connect """.stripMargin.update cxnIO.run.transactWallet(xa) } - } - object JdbcConnectionRepository { - val layer: URLayer[Transactor[ContextAwareTask], ConnectionRepository] = - ZLayer.fromFunction(new JdbcConnectionRepository(_)) + val layer: URLayer[Transactor[ContextAwareTask] & Transactor[Task], ConnectionRepository] = + ZLayer.fromFunction(new JdbcConnectionRepository(_, _)) } diff --git a/connect/lib/sql-doobie/src/test/scala/io/iohk/atala/connect/sql/repository/JdbcConnectionRepositorySpec.scala b/connect/lib/sql-doobie/src/test/scala/io/iohk/atala/connect/sql/repository/JdbcConnectionRepositorySpec.scala index fd1430befd..d4864bf975 100644 --- a/connect/lib/sql-doobie/src/test/scala/io/iohk/atala/connect/sql/repository/JdbcConnectionRepositorySpec.scala +++ b/connect/lib/sql-doobie/src/test/scala/io/iohk/atala/connect/sql/repository/JdbcConnectionRepositorySpec.scala @@ -20,7 +20,8 @@ object JdbcConnectionRepositorySpec extends ZIOSpecDefault, PostgresTestContaine Migrations.layer, dbConfig, pgContainerLayer, - contextAwareTransactorLayer + contextAwareTransactorLayer, + systemTransactorLayer ) override def spec: Spec[TestEnvironment with Scope, Any] = diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepository.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepository.scala index 7dadaf30c9..593f2e7725 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepository.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepository.scala @@ -21,6 +21,12 @@ trait CredentialRepository { states: IssueCredentialRecord.ProtocolState* ): RIO[WalletAccessContext, Seq[IssueCredentialRecord]] + def getIssueCredentialRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): Task[Seq[IssueCredentialRecord]] + def getIssueCredentialRecordByThreadId( thid: DidCommID, ignoreWithZeroRetries: Boolean, diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepositoryInMemory.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepositoryInMemory.scala index 399c51c0b3..f69e3b94e4 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepositoryInMemory.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/CredentialRepositoryInMemory.scala @@ -195,6 +195,26 @@ class CredentialRepositoryInMemory( .toSeq } + override def getIssueCredentialRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ProtocolState* + ): Task[Seq[IssueCredentialRecord]] = { + for { + refs <- walletRefs.get + stores <- ZIO.foreach(refs.values.toList)(_.get) + } yield { + stores + .flatMap(_.values) + .filter { rec => + (!ignoreWithZeroRetries || rec.metaRetries > 0) && + states.contains(rec.protocolState) + } + .take(limit) + .toSeq + } + } + override def getIssueCredentialRecordByThreadId( thid: DidCommID, ignoreWithZeroRetries: Boolean, diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/PresentationRepository.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/PresentationRepository.scala index 544e600978..506a5b6e66 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/PresentationRepository.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/PresentationRepository.scala @@ -15,6 +15,13 @@ trait PresentationRepository { limit: Int, states: PresentationRecord.ProtocolState* ): RIO[WalletAccessContext, Seq[PresentationRecord]] + + def getPresentationRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: PresentationRecord.ProtocolState* + ): Task[Seq[PresentationRecord]] + def getPresentationRecordByThreadId(thid: DidCommID): RIO[WalletAccessContext, Option[PresentationRecord]] def updatePresentationRecordProtocolState( diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/PresentationRepositoryInMemory.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/PresentationRepositoryInMemory.scala index 43b967e1e9..d4a663d218 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/PresentationRepositoryInMemory.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/repository/PresentationRepositoryInMemory.scala @@ -157,6 +157,26 @@ class PresentationRepositoryInMemory( .toSeq } + override def getPresentationRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: ProtocolState* + ): Task[Seq[PresentationRecord]] = { + for { + refs <- walletRefs.get + stores <- ZIO.foreach(refs.values.toList)(_.get) + } yield { + stores + .flatMap(_.values) + .filter { rec => + (!ignoreWithZeroRetries || rec.metaRetries > 0) && + states.contains(rec.protocolState) + } + .take(limit) + .toSeq + } + } + override def getPresentationRecordByThreadId( thid: DidCommID, ): RIO[WalletAccessContext, Option[PresentationRecord]] = { diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialService.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialService.scala index 05387e4deb..d6a81adcbf 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialService.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialService.scala @@ -51,6 +51,12 @@ trait CredentialService { states: IssueCredentialRecord.ProtocolState* ): ZIO[WalletAccessContext, CredentialServiceError, Seq[IssueCredentialRecord]] + def getIssueCredentialRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): IO[CredentialServiceError, Seq[IssueCredentialRecord]] + def getIssueCredentialRecord( recordId: DidCommID ): ZIO[WalletAccessContext, CredentialServiceError, Option[IssueCredentialRecord]] diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceImpl.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceImpl.scala index fa220b85e4..2550e62047 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceImpl.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceImpl.scala @@ -230,6 +230,18 @@ private class CredentialServiceImpl( } yield records } + override def getIssueCredentialRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): IO[CredentialServiceError, Seq[IssueCredentialRecord]] = { + for { + records <- credentialRepository + .getIssueCredentialRecordsByStatesForAllWallets(ignoreWithZeroRetries, limit, states: _*) + .mapError(RepositoryError.apply) + } yield records + } + override def receiveCredentialOffer( offer: OfferCredential ): ZIO[WalletAccessContext, CredentialServiceError, IssueCredentialRecord] = { diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifier.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifier.scala index 4773933051..2474208493 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifier.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/CredentialServiceNotifier.scala @@ -8,7 +8,7 @@ import io.iohk.atala.mercury.protocol.issuecredential.{IssueCredential, OfferCre import io.iohk.atala.pollux.core.model.error.CredentialServiceError import io.iohk.atala.pollux.core.model.{DidCommID, IssueCredentialRecord} import io.iohk.atala.shared.models.WalletAccessContext -import zio.{URLayer, ZIO, ZLayer} +import zio.{URLayer, ZIO, ZLayer, IO} import java.util.UUID @@ -171,6 +171,13 @@ class CredentialServiceNotifier( states: IssueCredentialRecord.ProtocolState* ): ZIO[WalletAccessContext, CredentialServiceError, Seq[IssueCredentialRecord]] = svc.getIssueCredentialRecordsByStates(ignoreWithZeroRetries, limit, states: _*) + + override def getIssueCredentialRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): IO[CredentialServiceError, Seq[IssueCredentialRecord]] = + svc.getIssueCredentialRecordsByStatesForAllWallets(ignoreWithZeroRetries, limit, states: _*) } object CredentialServiceNotifier { diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockCredentialService.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockCredentialService.scala index 897ac2ac9b..36b5fcec3a 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockCredentialService.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockCredentialService.scala @@ -190,6 +190,13 @@ object MockCredentialService extends Mock[CredentialService] { ): IO[CredentialServiceError, Seq[IssueCredentialRecord]] = ??? + override def getIssueCredentialRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): IO[CredentialServiceError, Seq[IssueCredentialRecord]] = + ??? + override def getIssueCredentialRecord( recordId: DidCommID ): IO[CredentialServiceError, Option[IssueCredentialRecord]] = diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockPresentationService.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockPresentationService.scala index a990968139..8376f80d90 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockPresentationService.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/MockPresentationService.scala @@ -134,6 +134,12 @@ object MockPresentationService extends Mock[PresentationService] { state: PresentationRecord.ProtocolState* ): IO[PresentationError, Seq[PresentationRecord]] = ??? + override def getPresentationRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + state: PresentationRecord.ProtocolState* + ): IO[PresentationError, Seq[PresentationRecord]] = ??? + override def getPresentationRecord(recordId: DidCommID): IO[PresentationError, Option[PresentationRecord]] = ??? override def getPresentationRecordByThreadId(thid: DidCommID): IO[PresentationError, Option[PresentationRecord]] = diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationService.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationService.scala index 193ece102a..5b314426bd 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationService.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationService.scala @@ -43,6 +43,12 @@ trait PresentationService { state: PresentationRecord.ProtocolState* ): ZIO[WalletAccessContext, PresentationError, Seq[PresentationRecord]] + def getPresentationRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + state: PresentationRecord.ProtocolState* + ): IO[PresentationError, Seq[PresentationRecord]] + def getPresentationRecord( recordId: DidCommID ): ZIO[WalletAccessContext, PresentationError, Option[PresentationRecord]] diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceImpl.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceImpl.scala index d3fe5f00ce..afbe637bd6 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceImpl.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceImpl.scala @@ -207,6 +207,18 @@ private class PresentationServiceImpl( } yield records } + override def getPresentationRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: PresentationRecord.ProtocolState* + ): IO[PresentationError, Seq[PresentationRecord]] = { + for { + records <- presentationRepository + .getPresentationRecordsByStatesForAllWallets(ignoreWithZeroRetries, limit, states: _*) + .mapError(RepositoryError.apply) + } yield records + } + override def receiveRequestPresentation( connectionId: Option[String], request: RequestPresentation diff --git a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifier.scala b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifier.scala index 9bc6de060d..287b1b2ae5 100644 --- a/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifier.scala +++ b/pollux/lib/core/src/main/scala/io/iohk/atala/pollux/core/service/PresentationServiceNotifier.scala @@ -8,7 +8,7 @@ import io.iohk.atala.pollux.core.model.presentation.Options import io.iohk.atala.pollux.core.model.{DidCommID, PresentationRecord} import io.iohk.atala.pollux.vc.jwt.{Issuer, PresentationPayload, W3cCredentialPayload} import io.iohk.atala.shared.models.WalletAccessContext -import zio.{URLayer, ZIO, ZLayer} +import zio.{URLayer, ZIO, ZLayer, IO} import java.time.Instant import java.util.UUID @@ -142,6 +142,13 @@ class PresentationServiceNotifier( ): ZIO[WalletAccessContext, PresentationError, Seq[PresentationRecord]] = svc.getPresentationRecordsByStates(ignoreWithZeroRetries, limit, state: _*) + override def getPresentationRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + state: PresentationRecord.ProtocolState* + ): IO[PresentationError, Seq[PresentationRecord]] = + svc.getPresentationRecordsByStatesForAllWallets(ignoreWithZeroRetries, limit, state: _*) + override def getPresentationRecord( recordId: DidCommID ): ZIO[WalletAccessContext, PresentationError, Option[PresentationRecord]] = diff --git a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/repository/CredentialRepositorySpecSuite.scala b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/repository/CredentialRepositorySpecSuite.scala index f4084173f9..21219a6366 100644 --- a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/repository/CredentialRepositorySpecSuite.scala +++ b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/repository/CredentialRepositorySpecSuite.scala @@ -490,6 +490,39 @@ object CredentialRepositorySpecSuite { count1 <- repo.createIssueCredentialRecord(record1).provide(wallet1) delete1 <- repo.deleteIssueCredentialRecord(record1.id).provide(wallet2) } yield assert(count1)(equalTo(1)) && assert(delete1)(isZero) - } + }, + test("getIssueCredentialRecordsByStatesForAllWallets should return all the records") { + val walletId1 = WalletId.random + val walletId2 = WalletId.random + val wallet1 = ZLayer.succeed(WalletAccessContext(walletId1)) + val wallet2 = ZLayer.succeed(WalletAccessContext(walletId2)) + for { + repo <- ZIO.service[CredentialRepository] + record1 = issueCredentialRecord + record2 = issueCredentialRecord + count1 <- repo.createIssueCredentialRecord(record1).provide(wallet1) + count2 <- repo.createIssueCredentialRecord(record2).provide(wallet2) + _ <- repo + .updateCredentialRecordProtocolState(record1.id, ProtocolState.OfferPending, ProtocolState.OfferSent) + .provide(wallet1) + _ <- repo + .updateCredentialRecordProtocolState( + record2.id, + ProtocolState.OfferPending, + ProtocolState.CredentialGenerated + ) + .provide(wallet2) + allRecords <- repo.getIssueCredentialRecordsByStatesForAllWallets( + ignoreWithZeroRetries = true, + limit = 10, + ProtocolState.OfferSent, + ProtocolState.CredentialGenerated + ) + } yield assert(count1)(equalTo(1)) && + assert(count2)(equalTo(1)) && + assertTrue(allRecords.size == 2) && + assertTrue(allRecords.exists(_.id == record1.id)) && + assertTrue(allRecords.exists(_.id == record2.id)) + }, ) } diff --git a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/repository/PresentationRepositorySpecSuite.scala b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/repository/PresentationRepositorySpecSuite.scala index 384d0fa0ba..f81e509345 100644 --- a/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/repository/PresentationRepositorySpecSuite.scala +++ b/pollux/lib/core/src/test/scala/io/iohk/atala/pollux/core/repository/PresentationRepositorySpecSuite.scala @@ -395,6 +395,43 @@ object PresentationRepositorySpecSuite { assert(update1)(isZero) && assert(update2)(isZero) && assert(update3)(isZero) - } + }, + test("getPresentationRecordsByStatesForAllWallets should return all the records") { + val walletId1 = WalletId.random + val walletId2 = WalletId.random + val wallet1 = ZLayer.succeed(WalletAccessContext(walletId1)) + val wallet2 = ZLayer.succeed(WalletAccessContext(walletId2)) + for { + repo <- ZIO.service[PresentationRepository] + record1 = presentationRecord + record2 = presentationRecord + count1 <- repo.createPresentationRecord(record1).provide(wallet1) + count2 <- repo.createPresentationRecord(record2).provide(wallet2) + _ <- repo + .updatePresentationRecordProtocolState( + record1.id, + ProtocolState.RequestPending, + ProtocolState.RequestSent + ) + .provide(wallet1) + _ <- repo + .updatePresentationRecordProtocolState( + record2.id, + ProtocolState.RequestPending, + ProtocolState.PresentationReceived + ) + .provide(wallet2) + allRecords <- repo.getPresentationRecordsByStatesForAllWallets( + ignoreWithZeroRetries = true, + limit = 10, + ProtocolState.RequestSent, + ProtocolState.PresentationReceived + ) + } yield assert(count1)(equalTo(1)) && + assert(count2)(equalTo(1)) && + assertTrue(allRecords.size == 2) && + assertTrue(allRecords.exists(_.id == record1.id)) && + assertTrue(allRecords.exists(_.id == record2.id)) + }, ) } diff --git a/pollux/lib/sql-doobie/src/main/scala/io/iohk/atala/pollux/sql/repository/JdbcCredentialRepository.scala b/pollux/lib/sql-doobie/src/main/scala/io/iohk/atala/pollux/sql/repository/JdbcCredentialRepository.scala index 62e273ac97..24cbdb5113 100644 --- a/pollux/lib/sql-doobie/src/main/scala/io/iohk/atala/pollux/sql/repository/JdbcCredentialRepository.scala +++ b/pollux/lib/sql-doobie/src/main/scala/io/iohk/atala/pollux/sql/repository/JdbcCredentialRepository.scala @@ -21,9 +21,12 @@ import org.postgresql.util.PSQLException import zio.* import zio.json.* +import doobie.free.connection import java.time.Instant +import zio.interop.catz.* -class JdbcCredentialRepository(xa: Transactor[ContextAwareTask], maxRetries: Int) extends CredentialRepository { +class JdbcCredentialRepository(xa: Transactor[ContextAwareTask], xb: Transactor[Task], maxRetries: Int) + extends CredentialRepository { // Uncomment to have Doobie LogHandler in scope and automatically output SQL statements in logs // given logHandler: LogHandler = LogHandler.jdkLogHandler @@ -179,14 +182,14 @@ class JdbcCredentialRepository(xa: Transactor[ContextAwareTask], maxRetries: Int effect.transactWallet(xa) } - override def getIssueCredentialRecordsByStates( + private def getRecordsByStates( ignoreWithZeroRetries: Boolean, limit: Int, states: IssueCredentialRecord.ProtocolState* - ): RIO[WalletAccessContext, Seq[IssueCredentialRecord]] = { + ): ConnectionIO[Seq[IssueCredentialRecord]] = { states match case Nil => - ZIO.succeed(Nil) + connection.pure(Nil) case head +: tail => val nel = NonEmptyList.of(head, tail: _*) val inClauseFragment = Fragments.in(fr"protocol_state", nel) @@ -224,11 +227,23 @@ class JdbcCredentialRepository(xa: Transactor[ContextAwareTask], maxRetries: Int """.stripMargin .query[IssueCredentialRecord] .to[Seq] - cxnIO - .transactWallet(xa) + } + override def getIssueCredentialRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): RIO[WalletAccessContext, Seq[IssueCredentialRecord]] = { + getRecordsByStates(ignoreWithZeroRetries, limit, states: _*).transactWallet(xa) } + override def getIssueCredentialRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: IssueCredentialRecord.ProtocolState* + ): Task[Seq[IssueCredentialRecord]] = { + getRecordsByStates(ignoreWithZeroRetries, limit, states: _*).transact(xb) + } override def getIssueCredentialRecord( recordId: DidCommID ): RIO[WalletAccessContext, Option[IssueCredentialRecord]] = { @@ -483,6 +498,6 @@ class JdbcCredentialRepository(xa: Transactor[ContextAwareTask], maxRetries: Int object JdbcCredentialRepository { val maxRetries = 5 // TODO Move to config - val layer: URLayer[Transactor[ContextAwareTask], CredentialRepository] = - ZLayer.fromFunction(new JdbcCredentialRepository(_, maxRetries)) + val layer: URLayer[Transactor[ContextAwareTask] & Transactor[Task], CredentialRepository] = + ZLayer.fromFunction(new JdbcCredentialRepository(_, _, maxRetries)) } diff --git a/pollux/lib/sql-doobie/src/main/scala/io/iohk/atala/pollux/sql/repository/JdbcPresentationRepository.scala b/pollux/lib/sql-doobie/src/main/scala/io/iohk/atala/pollux/sql/repository/JdbcPresentationRepository.scala index e390119797..ac5cc921b9 100644 --- a/pollux/lib/sql-doobie/src/main/scala/io/iohk/atala/pollux/sql/repository/JdbcPresentationRepository.scala +++ b/pollux/lib/sql-doobie/src/main/scala/io/iohk/atala/pollux/sql/repository/JdbcPresentationRepository.scala @@ -18,12 +18,13 @@ import io.iohk.atala.shared.db.Implicits.* import io.iohk.atala.shared.models.WalletAccessContext import io.iohk.atala.shared.utils.BytesOps import zio.* - +import doobie.free.connection import java.time.Instant - +import zio.interop.catz.* // TODO: replace with actual implementation class JdbcPresentationRepository( xa: Transactor[ContextAwareTask], + xb: Transactor[Task], maxRetries: Int ) extends PresentationRepository { // serializes into hex string @@ -173,14 +174,14 @@ class JdbcPresentationRepository( .transactWallet(xa) } - override def getPresentationRecordsByStates( + private def getRecordsByStates( ignoreWithZeroRetries: Boolean, limit: Int, states: PresentationRecord.ProtocolState* - ): RIO[WalletAccessContext, Seq[PresentationRecord]] = { + ): ConnectionIO[Seq[PresentationRecord]] = { states match case Nil => - ZIO.succeed(Nil) + connection.pure(Nil) case head +: tail => val nel = NonEmptyList.of(head, tail: _*) val inClauseFragment = Fragments.in(fr"protocol_state", nel) @@ -214,9 +215,21 @@ class JdbcPresentationRepository( """.stripMargin .query[PresentationRecord] .to[Seq] - cxnIO - .transactWallet(xa) + } + override def getPresentationRecordsByStates( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: PresentationRecord.ProtocolState* + ): RIO[WalletAccessContext, Seq[PresentationRecord]] = { + getRecordsByStates(ignoreWithZeroRetries, limit, states: _*).transactWallet(xa) + } + override def getPresentationRecordsByStatesForAllWallets( + ignoreWithZeroRetries: Boolean, + limit: Int, + states: PresentationRecord.ProtocolState* + ): Task[Seq[PresentationRecord]] = { + getRecordsByStates(ignoreWithZeroRetries, limit, states: _*).transact(xb) } override def getPresentationRecord(recordId: DidCommID): RIO[WalletAccessContext, Option[PresentationRecord]] = { @@ -389,6 +402,6 @@ class JdbcPresentationRepository( object JdbcPresentationRepository { val maxRetries = 5 // TODO Move to config - val layer: URLayer[Transactor[ContextAwareTask], PresentationRepository] = - ZLayer.fromFunction(new JdbcPresentationRepository(_, maxRetries)) + val layer: URLayer[Transactor[ContextAwareTask] & Transactor[Task], PresentationRepository] = + ZLayer.fromFunction(new JdbcPresentationRepository(_, _, maxRetries)) } diff --git a/pollux/lib/sql-doobie/src/test/scala/io/iohk/atala/pollux/sql/repository/JdbcCredentialRepositorySpec.scala b/pollux/lib/sql-doobie/src/test/scala/io/iohk/atala/pollux/sql/repository/JdbcCredentialRepositorySpec.scala index 10adfebd89..5d012cf38e 100644 --- a/pollux/lib/sql-doobie/src/test/scala/io/iohk/atala/pollux/sql/repository/JdbcCredentialRepositorySpec.scala +++ b/pollux/lib/sql-doobie/src/test/scala/io/iohk/atala/pollux/sql/repository/JdbcCredentialRepositorySpec.scala @@ -20,7 +20,8 @@ object JdbcCredentialRepositorySpec extends ZIOSpecDefault, PostgresTestContaine Migrations.layer, dbConfig, pgContainerLayer, - contextAwareTransactorLayer + contextAwareTransactorLayer, + systemTransactorLayer ) override def spec = diff --git a/pollux/lib/sql-doobie/src/test/scala/io/iohk/atala/pollux/sql/repository/JdbcPresentationRepositorySpec.scala b/pollux/lib/sql-doobie/src/test/scala/io/iohk/atala/pollux/sql/repository/JdbcPresentationRepositorySpec.scala index c70405979c..a50d9deef0 100644 --- a/pollux/lib/sql-doobie/src/test/scala/io/iohk/atala/pollux/sql/repository/JdbcPresentationRepositorySpec.scala +++ b/pollux/lib/sql-doobie/src/test/scala/io/iohk/atala/pollux/sql/repository/JdbcPresentationRepositorySpec.scala @@ -20,7 +20,8 @@ object JdbcPresentationRepositorySpec extends ZIOSpecDefault, PostgresTestContai Migrations.layer, dbConfig, pgContainerLayer, - contextAwareTransactorLayer + contextAwareTransactorLayer, + systemTransactorLayer ) override def spec = diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala index 6e5d8930b5..b6890c9e1e 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/Main.scala @@ -174,11 +174,11 @@ object MainApp extends ZIOAppDefault { RepoModule.allSecretStorageLayer, RepoModule.agentTransactorLayer >>> JdbcEntityRepository.layer, RepoModule.agentTransactorLayer >>> JdbcAuthenticationRepository.layer, - RepoModule.connectContextAwareTransactorLayer >>> JdbcConnectionRepository.layer, - RepoModule.polluxContextAwareTransactorLayer >>> JdbcCredentialRepository.layer, + RepoModule.connectContextAwareTransactorLayer ++ RepoModule.connectTransactorLayer >>> JdbcConnectionRepository.layer, + RepoModule.polluxContextAwareTransactorLayer ++ RepoModule.polluxTransactorLayer >>> JdbcCredentialRepository.layer, RepoModule.polluxContextAwareTransactorLayer ++ RepoModule.polluxTransactorLayer >>> JdbcCredentialSchemaRepository.layer, RepoModule.polluxContextAwareTransactorLayer ++ RepoModule.polluxTransactorLayer >>> JdbcCredentialDefinitionRepository.layer, - RepoModule.polluxContextAwareTransactorLayer >>> JdbcPresentationRepository.layer, + RepoModule.polluxContextAwareTransactorLayer ++ RepoModule.polluxTransactorLayer >>> JdbcPresentationRepository.layer, RepoModule.polluxContextAwareTransactorLayer >>> JdbcVerificationPolicyRepository.layer, // event notification service ZLayer.succeed(500) >>> EventNotificationServiceImpl.layer, diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/PrismAgentApp.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/PrismAgentApp.scala index cb5c06d129..269edb1c5b 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/PrismAgentApp.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/PrismAgentApp.scala @@ -33,6 +33,7 @@ import io.iohk.atala.system.controller.SystemServerEndpoints import zio.* import zio.metrics.* import io.iohk.atala.shared.utils.DurationOps.toMetricsSeconds +import io.iohk.atala.agent.walletapi.storage.DIDNonSecretStorage object PrismAgentApp { @@ -50,68 +51,45 @@ object PrismAgentApp { } yield () private val issueCredentialDidCommExchangesJob: RIO[ - AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & CredentialService & DIDService & - ManagedDIDService & PresentationService & WalletManagementService, + AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & CredentialService & DIDNonSecretStorage & + DIDService & ManagedDIDService & PresentationService & WalletManagementService, Unit ] = for { config <- ZIO.service[AppConfig] - _ <- ZIO - .serviceWithZIO[WalletManagementService](_.listWallets().map(_._1)) - .mapError(_.toThrowable) - .flatMap { wallets => - ZIO.foreach(wallets) { wallet => - IssueBackgroundJobs.issueCredentialDidCommExchanges - .provideSomeLayer(ZLayer.succeed(WalletAccessContext(wallet.id))) @@ Metric - .gauge("issuance_flow_did_com_exchange_job_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - } - } + _ <- IssueBackgroundJobs.issueCredentialDidCommExchanges .repeat(Schedule.spaced(config.pollux.issueBgJobRecurrenceDelay)) - .unit + .unit @@ Metric + .gauge("issuance_flow_did_com_exchange_job_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) } yield () private val presentProofExchangeJob: RIO[ AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & PresentationService & CredentialService & - DIDService & ManagedDIDService & WalletManagementService, + DIDNonSecretStorage & DIDService & ManagedDIDService & WalletManagementService, Unit ] = for { config <- ZIO.service[AppConfig] - _ <- ZIO - .serviceWithZIO[WalletManagementService](_.listWallets().map(_._1)) - .mapError(_.toThrowable) - .flatMap { wallets => - ZIO.foreach(wallets) { wallet => - PresentBackgroundJobs.presentProofExchanges - .provideSomeLayer(ZLayer.succeed(WalletAccessContext(wallet.id))) @@ Metric - .gauge("present_proof_flow_did_com_exchange_job_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - } - } + _ <- PresentBackgroundJobs.presentProofExchanges .repeat(Schedule.spaced(config.pollux.presentationBgJobRecurrenceDelay)) - .unit + .unit @@ Metric + .gauge("present_proof_flow_did_com_exchange_job_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) } yield () private val connectDidCommExchangesJob: RIO[ - AppConfig & DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService & WalletManagementService, + AppConfig & DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService & DIDNonSecretStorage & + WalletManagementService, Unit ] = for { config <- ZIO.service[AppConfig] - _ <- ZIO - .serviceWithZIO[WalletManagementService](_.listWallets().map(_._1)) - .mapError(_.toThrowable) - .flatMap { wallets => - ZIO.foreach(wallets) { wallet => - ConnectBackgroundJobs.didCommExchanges - .provideSomeLayer(ZLayer.succeed(WalletAccessContext(wallet.id))) @@ Metric - .gauge("connection_flow_did_com_exchange_job_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - } - } + _ <- ConnectBackgroundJobs.didCommExchanges .repeat(Schedule.spaced(config.connect.connectBgJobRecurrenceDelay)) - .unit + .unit @@ Metric + .gauge("connection_flow_did_com_exchange_job_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) } yield () private val syncDIDPublicationStateFromDltJob: URIO[ManagedDIDService & WalletManagementService, Unit] = diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobsHelper.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobsHelper.scala index 3da73ba17c..37d1a3e563 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobsHelper.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/BackgroundJobsHelper.scala @@ -10,6 +10,8 @@ import io.iohk.atala.mercury.{AgentPeerService, DidAgent} import io.iohk.atala.pollux.vc.jwt.{ES256KSigner, Issuer as JwtIssuer} import io.iohk.atala.shared.models.WalletAccessContext import zio.{ZIO, ZLayer} +import io.iohk.atala.agent.walletapi.storage.DIDNonSecretStorage +import io.iohk.atala.agent.walletapi.model.error.DIDSecretStorageError.WalletNotFoundError trait BackgroundJobsHelper { @@ -72,4 +74,16 @@ trait BackgroundJobsHelper { } yield agent } + def buildWalletAccessContextLayer( + myDid: DidId + ): ZIO[DIDNonSecretStorage, WalletNotFoundError, WalletAccessContext] = { + for { + nonSecretStorage <- ZIO.service[DIDNonSecretStorage] + maybePeerDIDRecord <- nonSecretStorage.getPeerDIDRecord(myDid).orDie + peerDIDRecord <- ZIO.fromOption(maybePeerDIDRecord).mapError(_ => WalletNotFoundError(myDid)) + _ <- ZIO.logInfo(s"PeerDID record successfully loaded in DIDComm receiver endpoint: $peerDIDRecord") + walletAccessContext = WalletAccessContext(peerDIDRecord.walletId) + } yield walletAccessContext + } + } diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala index dae4e74bb4..819706de04 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/ConnectBackgroundJobs.scala @@ -18,6 +18,8 @@ import io.iohk.atala.shared.utils.DurationOps.toMetricsSeconds import io.iohk.atala.shared.utils.aspects.CustomMetricsAspect import zio.* import zio.metrics.* +import io.iohk.atala.agent.walletapi.storage.DIDNonSecretStorage +import io.iohk.atala.agent.walletapi.model.error.DIDSecretStorageError.WalletNotFoundError object ConnectBackgroundJobs extends BackgroundJobsHelper { @@ -26,7 +28,7 @@ object ConnectBackgroundJobs extends BackgroundJobsHelper { connectionService <- ZIO.service[ConnectionService] config <- ZIO.service[AppConfig] records <- connectionService - .getConnectionRecordsByStates( + .getConnectionRecordsByStatesForAllWallets( ignoreWithZeroRetries = true, limit = config.connect.connectBgJobRecordsLimit, ConnectionRecord.ProtocolState.ConnectionRequestPending, @@ -40,7 +42,7 @@ object ConnectBackgroundJobs extends BackgroundJobsHelper { private[this] def performExchange( record: ConnectionRecord ): URIO[ - DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService & WalletAccessContext & AppConfig, + DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService & DIDNonSecretStorage & AppConfig, Unit ] = { import ProtocolState.* @@ -98,23 +100,25 @@ object ConnectBackgroundJobs extends BackgroundJobsHelper { _ ) if metaRetries > 0 => val inviteeProcessFlow = for { - - didCommAgent <- buildDIDCommAgent(request.from) - resp <- MessagingService.send(request.makeMessage).provideSomeLayer(didCommAgent) @@ Metric - .gauge("connection_flow_invitee_send_connection_request_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - connectionService <- ZIO.service[ConnectionService] - _ <- { - if (resp.status >= 200 && resp.status < 300) - connectionService.markConnectionRequestSent(id) - @@ InviteeConnectionRequestMsgSuccess - @@ CustomMetricsAspect.endRecordingTime( - s"${record.id}_invitee_pending_to_req_sent", - "connection_flow_invitee_pending_to_req_sent_ms_gauge" - ) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ InviteeConnectionRequestMsgFailed - } - } yield () + walletAccessContext <- buildWalletAccessContextLayer(request.from) + result <- (for { + didCommAgent <- buildDIDCommAgent(request.from).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + resp <- MessagingService.send(request.makeMessage).provideSomeLayer(didCommAgent) @@ Metric + .gauge("connection_flow_invitee_send_connection_request_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + connectionService <- ZIO.service[ConnectionService] + _ <- { + if (resp.status >= 200 && resp.status < 300) + connectionService.markConnectionRequestSent(id).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + @@ InviteeConnectionRequestMsgSuccess + @@ CustomMetricsAspect.endRecordingTime( + s"${record.id}_invitee_pending_to_req_sent", + "connection_flow_invitee_pending_to_req_sent_ms_gauge" + ) + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ InviteeConnectionRequestMsgFailed + } + } yield ()).mapError(e => (walletAccessContext, e)) + } yield result // inviteeProcessFlow // TODO decrease metaRetries if it has a error @@ -142,22 +146,26 @@ object ConnectBackgroundJobs extends BackgroundJobsHelper { _ ) if metaRetries > 0 => val inviterProcessFlow = for { - didCommAgent <- buildDIDCommAgent(response.from) - resp <- MessagingService.send(response.makeMessage).provideSomeLayer(didCommAgent) @@ Metric - .gauge("connection_flow_inviter_send_connection_response_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - connectionService <- ZIO.service[ConnectionService] - _ <- { - if (resp.status >= 200 && resp.status < 300) - connectionService.markConnectionResponseSent(id) - @@ InviterConnectionResponseMsgSuccess - @@ CustomMetricsAspect.endRecordingTime( - s"${record.id}_inviter_pending_to_res_sent", - "connection_flow_inviter_pending_to_res_sent_ms_gauge" - ) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ InviterConnectionResponseMsgFailed - } - } yield () + walletAccessContext <- buildWalletAccessContextLayer(response.from) + result <- (for { + didCommAgent <- buildDIDCommAgent(response.from).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + resp <- MessagingService.send(response.makeMessage).provideSomeLayer(didCommAgent) @@ Metric + .gauge("connection_flow_inviter_send_connection_response_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + connectionService <- ZIO.service[ConnectionService] + _ <- { + if (resp.status >= 200 && resp.status < 300) + connectionService.markConnectionResponseSent(id).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + @@ InviterConnectionResponseMsgSuccess + @@ CustomMetricsAspect.endRecordingTime( + s"${record.id}_inviter_pending_to_res_sent", + "connection_flow_inviter_pending_to_res_sent_ms_gauge" + ) + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ InviterConnectionResponseMsgFailed + } + } yield ()).mapError(e => (walletAccessContext, e)) + } yield result + inviterProcessFlow @@ InviterProcessConnectionRecordPendingSuccess.trackSuccess @@ InviterProcessConnectionRecordPendingFailed.trackError @@ -169,19 +177,26 @@ object ConnectBackgroundJobs extends BackgroundJobsHelper { } exchange - .tapError(e => - for { - connectService <- ZIO.service[ConnectionService] - _ <- connectService - .reportProcessingFailure(record.id, Some(e.toString)) - .tapError(err => - ZIO.logErrorCause( - s"Connect - failed to report processing failure: ${record.id}", - Cause.fail(err) + .tapError({ + case walletNotFound: WalletNotFoundError => + ZIO.logErrorCause( + s"Connect - Error processing record: ${record.id}", + Cause.fail(walletNotFound) + ) + case ((walletAccessContext, e)) => + for { + connectService <- ZIO.service[ConnectionService] + _ <- connectService + .reportProcessingFailure(record.id, Some(e.toString)) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) + .tapError(err => + ZIO.logErrorCause( + s"Connect - failed to report processing failure: ${record.id}", + Cause.fail(err) + ) ) - ) - } yield () - ) + } yield () + }) .catchAll(e => ZIO.logErrorCause(s"Connect - Error processing record: ${record.id} ", Cause.fail(e))) .catchAllDefect(d => ZIO.logErrorCause(s"Connect - Defect processing record: ${record.id}", Cause.fail(d))) } diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/IssueBackgroundJobs.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/IssueBackgroundJobs.scala index b9127b4483..e61ce931c1 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/IssueBackgroundJobs.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/IssueBackgroundJobs.scala @@ -12,6 +12,7 @@ import io.iohk.atala.shared.utils.DurationOps.toMetricsSeconds import io.iohk.atala.shared.utils.aspects.CustomMetricsAspect import zio.* import zio.metrics.* +import io.iohk.atala.agent.walletapi.model.error.DIDSecretStorageError.WalletNotFoundError object IssueBackgroundJobs extends BackgroundJobsHelper { @@ -20,7 +21,7 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { credentialService <- ZIO.service[CredentialService] config <- ZIO.service[AppConfig] records <- credentialService - .getIssueCredentialRecordsByStates( + .getIssueCredentialRecordsByStatesForAllWallets( ignoreWithZeroRetries = true, limit = config.pollux.issueBgJobRecordsLimit, IssueCredentialRecord.ProtocolState.OfferPending, @@ -164,24 +165,27 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { ) => val sendOfferFlow = for { _ <- ZIO.log(s"IssueCredentialRecord: OfferPending (START)") - didCommAgent <- buildDIDCommAgent(offer.from) - resp <- MessagingService - .send(offer.makeMessage) - .provideSomeLayer(didCommAgent) @@ Metric - .gauge("issuance_flow_issuer_send_offer_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - credentialService <- ZIO.service[CredentialService] - _ <- { - if (resp.status >= 200 && resp.status < 300) - credentialService.markOfferSent(id) @@ - IssuerSendOfferMsgSucceed @@ - CustomMetricsAspect.endRecordingTime( - s"${record.id}_issuer_offer_pending_to_sent_ms_gauge", - "issuance_flow_issuer_offer_pending_to_sent_ms_gauge" - ) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ IssuerSendOfferMsgFailed - } - } yield () + walletAccessContext <- buildWalletAccessContextLayer(offer.from) + result <- (for { + didCommAgent <- buildDIDCommAgent(offer.from).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + resp <- MessagingService + .send(offer.makeMessage) + .provideSomeLayer(didCommAgent) @@ Metric + .gauge("issuance_flow_issuer_send_offer_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + credentialService <- ZIO.service[CredentialService] + _ <- { + if (resp.status >= 200 && resp.status < 300) + credentialService.markOfferSent(id).provideSomeLayer(ZLayer.succeed(walletAccessContext)) @@ + IssuerSendOfferMsgSucceed @@ + CustomMetricsAspect.endRecordingTime( + s"${record.id}_issuer_offer_pending_to_sent_ms_gauge", + "issuance_flow_issuer_offer_pending_to_sent_ms_gauge" + ) + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ IssuerSendOfferMsgFailed + } + } yield ()).mapError(e => (walletAccessContext, e)) + } yield result sendOfferFlow @@ IssuerSendOfferSucceed.trackSuccess @@ IssuerSendOfferFailed.trackError @@ -204,7 +208,7 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _, _, RequestPending, - Some(_), + Some(offer), None, _, _, @@ -215,9 +219,14 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _ ) => val holderPendingToGeneratedFlow = for { - credentialService <- ZIO.service[CredentialService] - _ <- credentialService.generateJWTCredentialRequest(id) - } yield () + walletAccessContext <- buildWalletAccessContextLayer(offer.to) + result <- (for { + credentialService <- ZIO.service[CredentialService] + _ <- credentialService + .generateJWTCredentialRequest(id) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) + } yield ()).mapError(e => (walletAccessContext, handleCredentialErrors(e))) + } yield result holderPendingToGeneratedFlow @@ HolderPendingToGeneratedSuccess.trackSuccess @@ HolderPendingToGeneratedFailed.trackError @@ -239,7 +248,7 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _, _, RequestPending, - Some(_), + Some(offer), None, _, _, @@ -250,9 +259,14 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _ ) => val holderPendingToGeneratedFlow = for { - credentialService <- ZIO.service[CredentialService] - _ <- credentialService.generateAnonCredsCredentialRequest(id) - } yield () + walletAccessContext <- buildWalletAccessContextLayer(offer.to) + result <- (for { + credentialService <- ZIO.service[CredentialService] + _ <- credentialService + .generateAnonCredsCredentialRequest(id) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) + } yield ()).mapError(e => (walletAccessContext, handleCredentialErrors(e))) + } yield result holderPendingToGeneratedFlow @@ HolderPendingToGeneratedSuccess.trackSuccess @@ HolderPendingToGeneratedFailed.trackError @@ -286,23 +300,28 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _ ) => val holderGeneratedToSentFlow = for { - didCommAgent <- buildDIDCommAgent(request.from) - resp <- MessagingService - .send(request.makeMessage) - .provideSomeLayer(didCommAgent) @@ Metric - .gauge("issuance_flow_holder_send_request_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - credentialService <- ZIO.service[CredentialService] - _ <- { - if (resp.status >= 200 && resp.status < 300) - credentialService.markRequestSent(id) @@ HolderSendReqSucceed - @@ CustomMetricsAspect.endRecordingTime( - s"${record.id}_issuance_flow_holder_req_generated_to_sent", - "issuance_flow_holder_req_generated_to_sent_ms_gauge" - ) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ HolderSendReqFailed - } - } yield () + walletAccessContext <- buildWalletAccessContextLayer(request.from) + result <- (for { + didCommAgent <- buildDIDCommAgent(request.from).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + resp <- MessagingService + .send(request.makeMessage) + .provideSomeLayer(didCommAgent) @@ Metric + .gauge("issuance_flow_holder_send_request_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + credentialService <- ZIO.service[CredentialService] + _ <- { + if (resp.status >= 200 && resp.status < 300) + credentialService + .markRequestSent(id) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) @@ HolderSendReqSucceed + @@ CustomMetricsAspect.endRecordingTime( + s"${record.id}_issuance_flow_holder_req_generated_to_sent", + "issuance_flow_holder_req_generated_to_sent_ms_gauge" + ) + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ HolderSendReqFailed + } + } yield ()).mapError(e => (walletAccessContext, e)) + } yield result holderGeneratedToSentFlow @@ HolderGeneratedToSentSucceed.trackSuccess @@ HolderGeneratedToSentFailed.trackError @@ -326,7 +345,7 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { Some(true), RequestReceived, _, - _, + Some(request), _, _, _, @@ -336,9 +355,12 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _, ) => val issuerReceivedToPendingFlow = for { - credentialService <- ZIO.service[CredentialService] - _ <- credentialService.acceptCredentialRequest(id) - } yield () + walletAccessContext <- buildWalletAccessContextLayer(request.to) + result <- (for { + credentialService <- ZIO.service[CredentialService] + _ <- credentialService.acceptCredentialRequest(id).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + } yield ()).mapError(e => (walletAccessContext, e)) + } yield result issuerReceivedToPendingFlow @@ IssuerReceivedToPendingSuccess.trackSuccess @@ IssuerReceivedToPendingFailed.trackError @@ -375,9 +397,12 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { // Set ProtocolState to CredentialGenerated // TODO Move all logic to service val issuerPendingToGeneratedFlow = for { - credentialService <- ZIO.service[CredentialService] - _ <- credentialService.generateJWTCredential(id) - } yield () + walletAccessContext <- buildWalletAccessContextLayer(issue.from) + result <- (for { + credentialService <- ZIO.service[CredentialService] + _ <- credentialService.generateJWTCredential(id).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + } yield ()).mapError(e => (walletAccessContext, e)) + } yield result issuerPendingToGeneratedFlow @@ IssuerPendingToGeneratedSuccess.trackSuccess @@ IssuerPendingToGeneratedFailed.trackError @@ -410,9 +435,14 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _, ) => val issuerPendingToGeneratedFlow = for { - credentialService <- ZIO.service[CredentialService] - _ <- credentialService.generateAnonCredsCredential(id) - } yield () + walletAccessContext <- buildWalletAccessContextLayer(issue.from) + result <- (for { + credentialService <- ZIO.service[CredentialService] + _ <- credentialService + .generateAnonCredsCredential(id) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) + } yield ()).mapError(e => (walletAccessContext, e)) + } yield result issuerPendingToGeneratedFlow @@ IssuerPendingToGeneratedSuccess.trackSuccess @@ IssuerPendingToGeneratedFailed.trackError @@ -446,19 +476,24 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { _, ) => val sendCredentialManualIssuanceFlow = for { - didCommAgent <- buildDIDCommAgent(issue.from) - resp <- MessagingService - .send(issue.makeMessage) - .provideSomeLayer(didCommAgent) @@ Metric - .gauge("issuance_flow_issuer_send_credential_msg_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - credentialService <- ZIO.service[CredentialService] - _ <- { - if (resp.status >= 200 && resp.status < 300) - credentialService.markCredentialSent(id) @@ IssuerSendCredentialMsgSuccess - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ IssuerSendCredentialMsgFailed - } - } yield () + walletAccessContext <- buildWalletAccessContextLayer(issue.from) + result <- (for { + didCommAgent <- buildDIDCommAgent(issue.from).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + resp <- MessagingService + .send(issue.makeMessage) + .provideSomeLayer(didCommAgent) @@ Metric + .gauge("issuance_flow_issuer_send_credential_msg_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + credentialService <- ZIO.service[CredentialService] + _ <- { + if (resp.status >= 200 && resp.status < 300) + credentialService + .markCredentialSent(id) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) @@ IssuerSendCredentialMsgSuccess + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ IssuerSendCredentialMsgFailed + } + } yield ()).mapError(e => (walletAccessContext, e)) + } yield result sendCredentialManualIssuanceFlow @@ IssuerSendCredentialSuccess.trackSuccess @@ IssuerSendCredentialFailed.trackError @@ -471,18 +506,28 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { } yield () aux - .tapError(e => - for { - credentialService <- ZIO.service[CredentialService] - _ <- credentialService - .reportProcessingFailure(record.id, Some(e.toString)) - .tapError(err => - ZIO.logErrorCause( - s"Issue Credential - failed to report processing failure: ${record.id}", - Cause.fail(err) - ) + .tapError( + { + case walletNotFound: WalletNotFoundError => + ZIO.logErrorCause( + s"Issue Credential- Error processing record: ${record.id}", + Cause.fail(walletNotFound) ) - } yield () + case ((walletAccessContext, e)) => + for { + credentialService <- ZIO.service[CredentialService] + _ <- credentialService + .reportProcessingFailure(record.id, Some(e.toString)) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) + .tapError(err => + ZIO.logErrorCause( + s"Issue Credential - failed to report processing failure: ${record.id}", + Cause.fail(err) + ) + ) + } yield () + + } ) .catchAll(e => ZIO.logErrorCause(s"Issue Credential - Error processing record: ${record.id} ", Cause.fail(e))) .catchAllDefect(d => @@ -491,4 +536,10 @@ object IssueBackgroundJobs extends BackgroundJobsHelper { } + private[this] def handleCredentialErrors + : PartialFunction[Throwable | CredentialServiceError, CredentialServiceError] = { + case e: CredentialServiceError => e + case t: Throwable => CredentialServiceError.UnexpectedError(t.getMessage()) + } + } diff --git a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/PresentBackgroundJobs.scala b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/PresentBackgroundJobs.scala index e9e9e825eb..b5e48c21db 100644 --- a/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/PresentBackgroundJobs.scala +++ b/prism-agent/service/server/src/main/scala/io/iohk/atala/agent/server/jobs/PresentBackgroundJobs.scala @@ -25,9 +25,12 @@ import zio.* import zio.metrics.* import zio.prelude.Validation import zio.prelude.ZValidation.* - +import io.iohk.atala.agent.walletapi.storage.DIDNonSecretStorage +import io.iohk.atala.agent.walletapi.model.error.DIDSecretStorageError.WalletNotFoundError +import io.iohk.atala.resolvers.DIDResolver import java.time.{Clock, Instant, ZoneId} - +import io.iohk.atala.castor.core.service.DIDService +import io.iohk.atala.agent.walletapi.service.ManagedDIDService object PresentBackgroundJobs extends BackgroundJobsHelper { val presentProofExchanges = { @@ -35,7 +38,7 @@ object PresentBackgroundJobs extends BackgroundJobsHelper { presentationService <- ZIO.service[PresentationService] config <- ZIO.service[AppConfig] records <- presentationService - .getPresentationRecordsByStates( + .getPresentationRecordsByStatesForAllWallets( ignoreWithZeroRetries = true, limit = config.pollux.presentationBgJobRecordsLimit, PresentationRecord.ProtocolState.RequestPending, @@ -90,7 +93,11 @@ object PresentBackgroundJobs extends BackgroundJobsHelper { jwtIssuer <- createJwtIssuer(longFormPrismDID, VerificationRelationship.Authentication) } yield jwtIssuer - private[this] def performPresentProofExchange(record: PresentationRecord) = { + private[this] def performPresentProofExchange(record: PresentationRecord): URIO[ + AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & PresentationService & CredentialService & + DIDNonSecretStorage & DIDService & ManagedDIDService, + Unit + ] = { import io.iohk.atala.pollux.core.model.PresentationRecord.ProtocolState.* val VerifierReqPendingToSentSuccess = counterMetric( @@ -168,23 +175,30 @@ object PresentBackgroundJobs extends BackgroundJobsHelper { case Some(record) => val verifierReqPendingToSentFlow = for { _ <- ZIO.log(s"PresentationRecord: RequestPending (Send Massage)") - didOps <- ZIO.service[DidOps] - didCommAgent <- buildDIDCommAgent(record.from) - resp <- MessagingService.send(record.makeMessage).provideSomeLayer(didCommAgent) @@ Metric - .gauge("present_proof_flow_verifier_send_presentation_request_msg_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - service <- ZIO.service[PresentationService] - _ <- { - if (resp.status >= 200 && resp.status < 300) - service.markRequestPresentationSent( - id - ) @@ VerifierSendPresentationRequestMsgSuccess @@ CustomMetricsAspect.endRecordingTime( - s"${record.id}_present_proof_flow_verifier_req_pending_to_sent_ms_gauge", - "present_proof_flow_verifier_req_pending_to_sent_ms_gauge" - ) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ VerifierSendPresentationRequestMsgFailed - } - } yield () + walletAccessContext <- buildWalletAccessContextLayer(record.from) + result <- (for { + didOps <- ZIO.service[DidOps] + didCommAgent <- buildDIDCommAgent(record.from).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + resp <- MessagingService.send(record.makeMessage).provideSomeLayer(didCommAgent) @@ Metric + .gauge("present_proof_flow_verifier_send_presentation_request_msg_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + service <- ZIO.service[PresentationService] + _ <- { + if (resp.status >= 200 && resp.status < 300) + service + .markRequestPresentationSent( + id + ) + .provideSomeLayer( + ZLayer.succeed(walletAccessContext) + ) @@ VerifierSendPresentationRequestMsgSuccess @@ CustomMetricsAspect.endRecordingTime( + s"${record.id}_present_proof_flow_verifier_req_pending_to_sent_ms_gauge", + "present_proof_flow_verifier_req_pending_to_sent_ms_gauge" + ) + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ VerifierSendPresentationRequestMsgFailed + } + } yield ()).mapError(e => (walletAccessContext, handlePresentationErrors(e))) + } yield result verifierReqPendingToSentFlow @@ VerifierReqPendingToSentSuccess.trackSuccess @@ -225,52 +239,58 @@ object PresentBackgroundJobs extends BackgroundJobsHelper { _, _ ) => // Prover - val proverPresentationPendingToGeneratedFlow = for { - presentationService <- ZIO.service[PresentationService] - prover <- createPrismDIDIssuerFromPresentationCredentials(id, credentialsToUse.getOrElse(Nil)) - presentationPayload <- presentationService.createPresentationPayloadFromRecord( - id, - prover, - Instant.now() - ) - signedJwtPresentation = JwtPresentation.toEncodedJwt( - presentationPayload.toW3CPresentationPayload, - prover - ) - // signedJwtPresentation = JwtPresentation.toEncodedJwt(w3cPresentationPayload, prover) - presentation <- oRequestPresentation match - case None => ZIO.fail(InvalidState("PresentationRecord 'RequestPending' with no Record")) - case Some(requestPresentation) => { // TODO create build method in mercury for Presentation - ZIO.succeed( - Presentation( - body = Presentation.Body( - goal_code = requestPresentation.body.goal_code, - comment = requestPresentation.body.comment - ), - attachments = Seq( - AttachmentDescriptor - .buildBase64Attachment( - payload = signedJwtPresentation.value.getBytes(), - mediaType = Some("prism/jwt") - ) - ), - thid = requestPresentation.thid.orElse(Some(requestPresentation.id)), - from = requestPresentation.to, - to = requestPresentation.from + // signedJwtPresentation = JwtPresentation.toEncodedJwt(w3cPresentationPayload, prover) + oRequestPresentation match + case None => ZIO.fail(InvalidState("PresentationRecord 'RequestPending' with no Record")) + case Some(requestPresentation) => // TODO create build method in mercury for Presentation + val proverPresentationPendingToGeneratedFlow = for { + walletAccessContext <- buildWalletAccessContextLayer(requestPresentation.to) + result <- (for { + presentationService <- ZIO.service[PresentationService] + prover <- createPrismDIDIssuerFromPresentationCredentials(id, credentialsToUse.getOrElse(Nil)) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) + presentationPayload <- presentationService + .createPresentationPayloadFromRecord( + id, + prover, + Instant.now() + ) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) + signedJwtPresentation = JwtPresentation.toEncodedJwt( + presentationPayload.toW3CPresentationPayload, + prover ) - ) - } - _ <- presentationService.markPresentationGenerated(id, presentation) - - } yield () + presentation <- ZIO.succeed( + Presentation( + body = Presentation.Body( + goal_code = requestPresentation.body.goal_code, + comment = requestPresentation.body.comment + ), + attachments = Seq( + AttachmentDescriptor + .buildBase64Attachment( + payload = signedJwtPresentation.value.getBytes(), + mediaType = Some("prism/jwt") + ) + ), + thid = requestPresentation.thid.orElse(Some(requestPresentation.id)), + from = requestPresentation.to, + to = requestPresentation.from + ) + ) + _ <- presentationService + .markPresentationGenerated(id, presentation) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) + } yield ()).mapError(e => (walletAccessContext, handlePresentationErrors(e))) + } yield result - proverPresentationPendingToGeneratedFlow - @@ ProverPresentationPendingToGeneratedSuccess.trackSuccess - @@ ProverPresentationPendingToGeneratedFailed.trackError - @@ ProverPresentationPendingToGenerated - @@ Metric - .gauge("present_proof_flow_prover_presentation_pending_to_generated_flow_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) + proverPresentationPendingToGeneratedFlow + @@ ProverPresentationPendingToGeneratedSuccess.trackSuccess + @@ ProverPresentationPendingToGeneratedFailed.trackError + @@ ProverPresentationPendingToGenerated + @@ Metric + .gauge("present_proof_flow_prover_presentation_pending_to_generated_flow_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) case PresentationRecord( id, @@ -297,23 +317,31 @@ object PresentBackgroundJobs extends BackgroundJobsHelper { case Some(p) => val ProverPresentationGeneratedToSentFlow = for { _ <- ZIO.log(s"PresentationRecord: PresentationPending (Send Message)") - didCommAgent <- buildDIDCommAgent(p.from) - resp <- MessagingService - .send(p.makeMessage) - .provideSomeLayer(didCommAgent) @@ Metric - .gauge("present_proof_flow_prover_send_presentation_msg_ms_gauge") - .trackDurationWith(_.toMetricsSeconds) - service <- ZIO.service[PresentationService] - _ <- { - if (resp.status >= 200 && resp.status < 300) - service.markPresentationSent(id) @@ ProverSendPresentationMsgSuccess @@ CustomMetricsAspect - .endRecordingTime( - s"${record.id}_present_proof_flow_prover_presentation_generated_to_sent_ms_gauge", - "present_proof_flow_prover_presentation_generated_to_sent_ms_gauge" - ) - else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ ProverSendPresentationMsgFailed - } - } yield () + walletAccessContext <- buildWalletAccessContextLayer(p.from) + result <- (for { + didCommAgent <- buildDIDCommAgent(p.from).provideSomeLayer(ZLayer.succeed(walletAccessContext)) + resp <- MessagingService + .send(p.makeMessage) + .provideSomeLayer(didCommAgent) @@ Metric + .gauge("present_proof_flow_prover_send_presentation_msg_ms_gauge") + .trackDurationWith(_.toMetricsSeconds) + service <- ZIO.service[PresentationService] + _ <- { + if (resp.status >= 200 && resp.status < 300) + service + .markPresentationSent(id) + .provideSomeLayer( + ZLayer.succeed(walletAccessContext) + ) @@ ProverSendPresentationMsgSuccess @@ CustomMetricsAspect + .endRecordingTime( + s"${record.id}_present_proof_flow_prover_presentation_generated_to_sent_ms_gauge", + "present_proof_flow_prover_presentation_generated_to_sent_ms_gauge" + ) + else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ ProverSendPresentationMsgFailed + } + } yield ()).mapError(e => (walletAccessContext, handlePresentationErrors(e))) + + } yield result ProverPresentationGeneratedToSentFlow @@ ProverPresentationGeneratedToSentSuccess.trackSuccess @@ -351,82 +379,98 @@ object PresentBackgroundJobs extends BackgroundJobsHelper { case Some(p) => val verifierPresentationReceivedToProcessed = for { - didResolverService <- ZIO.service[JwtDidResolver] - credentialsValidationResult <- p.attachments.head.data match { - case Base64(data) => - val base64Decoded = new String(java.util.Base64.getDecoder().decode(data)) - val maybePresentationOptions - : Either[PresentationError, Option[io.iohk.atala.pollux.core.model.presentation.Options]] = - mayBeRequestPresentation - .map( - _.attachments.headOption - .map(attachment => - decode[io.iohk.atala.mercury.model.JsonData](attachment.data.asJson.noSpaces) - .flatMap(data => - io.iohk.atala.pollux.core.model.presentation.PresentationAttachment.given_Decoder_PresentationAttachment - .decodeJson(data.json.asJson) - .map(_.options) - .leftMap(err => - PresentationDecodingError( - new Throwable(s"PresentationAttachment decoding error: $err") + walletAccessContext <- buildWalletAccessContextLayer(p.to) + result <- (for { + didResolverService <- ZIO.service[JwtDidResolver] + credentialsValidationResult <- p.attachments.head.data match { + case Base64(data) => + val base64Decoded = new String(java.util.Base64.getDecoder().decode(data)) + val maybePresentationOptions + : Either[PresentationError, Option[io.iohk.atala.pollux.core.model.presentation.Options]] = + mayBeRequestPresentation + .map( + _.attachments.headOption + .map(attachment => + decode[io.iohk.atala.mercury.model.JsonData](attachment.data.asJson.noSpaces) + .flatMap(data => + io.iohk.atala.pollux.core.model.presentation.PresentationAttachment.given_Decoder_PresentationAttachment + .decodeJson(data.json.asJson) + .map(_.options) + .leftMap(err => + PresentationDecodingError( + new Throwable(s"PresentationAttachment decoding error: $err") + ) ) - ) - ) - .leftMap(err => - PresentationDecodingError(new Throwable(s"JsonData decoding error: $err")) - ) + ) + .leftMap(err => + PresentationDecodingError(new Throwable(s"JsonData decoding error: $err")) + ) + ) + .getOrElse(Right(None)) + ) + .getOrElse(Left(UnexpectedError("RequestPresentation NotFound"))) + for { + _ <- ZIO.fromEither(maybePresentationOptions.map { + case Some(options) => + JwtPresentation.validatePresentation( + JWT(base64Decoded), + options.domain, + options.challenge ) - .getOrElse(Right(None)) - ) - .getOrElse(Left(UnexpectedError("RequestPresentation NotFound"))) - for { - _ <- ZIO.fromEither(maybePresentationOptions.map { - case Some(options) => - JwtPresentation.validatePresentation(JWT(base64Decoded), options.domain, options.challenge) - case _ => Validation.unit - }) - verificationConfig <- ZIO.service[AppConfig].map(_.agent.verification) - _ <- ZIO.log(s"VerificationConfig: ${verificationConfig}") + case _ => Validation.unit + }) + verificationConfig <- ZIO.service[AppConfig].map(_.agent.verification) + _ <- ZIO.log(s"VerificationConfig: ${verificationConfig}") - // https://www.w3.org/TR/vc-data-model/#proofs-signatures-0 - // A proof is typically attached to a verifiable presentation for authentication purposes - // and to a verifiable credential as a method of assertion. - result <- JwtPresentation.verify( - JWT(base64Decoded), - verificationConfig.toPresentationVerificationOptions() - )(didResolverService)(clock) - } yield result + // https://www.w3.org/TR/vc-data-model/#proofs-signatures-0 + // A proof is typically attached to a verifiable presentation for authentication purposes + // and to a verifiable credential as a method of assertion. + result <- JwtPresentation + .verify( + JWT(base64Decoded), + verificationConfig.toPresentationVerificationOptions() + )(didResolverService)(clock) + .mapError(error => PresentationError.UnexpectedError(error.mkString)) + } yield result - case any => ZIO.fail(NotImplemented) - } - _ <- ZIO.log(s"CredentialsValidationResult: $credentialsValidationResult") - service <- ZIO.service[PresentationService] - presReceivedToProcessedAspect = CustomMetricsAspect.endRecordingTime( - s"${record.id}_present_proof_flow_verifier_presentation_received_to_verification_success_or_failure_ms_gauge", - "present_proof_flow_verifier_presentation_received_to_verification_success_or_failure_ms_gauge" - ) - _ <- credentialsValidationResult match { - case Success(log, value) => service.markPresentationVerified(id) @@ presReceivedToProcessedAspect - case Failure(log, error) => { - for { - _ <- service.markPresentationVerificationFailed(id) @@ presReceivedToProcessedAspect - didCommAgent <- buildDIDCommAgent(p.from) - reportproblem = ReportProblem.build( - fromDID = p.to, - toDID = p.from, - pthid = p.thid.getOrElse(p.id), - code = ProblemCode("e.p.presentation-verification-failed"), - comment = Some(error.mkString) - ) - resp <- MessagingService - .send(reportproblem.toMessage) - .provideSomeLayer(didCommAgent) - _ <- ZIO.log(s"CredentialsValidationResult: $error") - } yield () + case any => ZIO.fail(NotImplemented) + } + _ <- ZIO.log(s"CredentialsValidationResult: $credentialsValidationResult") + service <- ZIO.service[PresentationService] + presReceivedToProcessedAspect = CustomMetricsAspect.endRecordingTime( + s"${record.id}_present_proof_flow_verifier_presentation_received_to_verification_success_or_failure_ms_gauge", + "present_proof_flow_verifier_presentation_received_to_verification_success_or_failure_ms_gauge" + ) + _ <- credentialsValidationResult match { + case Success(log, value) => + service + .markPresentationVerified(id) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) @@ presReceivedToProcessedAspect + case Failure(log, error) => { + for { + _ <- service + .markPresentationVerificationFailed(id) + .provideSomeLayer(ZLayer.succeed(walletAccessContext)) @@ presReceivedToProcessedAspect + didCommAgent <- buildDIDCommAgent(p.from).provideSomeLayer( + ZLayer.succeed(walletAccessContext) + ) + reportproblem = ReportProblem.build( + fromDID = p.to, + toDID = p.from, + pthid = p.thid.getOrElse(p.id), + code = ProblemCode("e.p.presentation-verification-failed"), + comment = Some(error.mkString) + ) + resp <- MessagingService + .send(reportproblem.toMessage) + .provideSomeLayer(didCommAgent) + _ <- ZIO.log(s"CredentialsValidationResult: $error") + } yield () + } } - } - } yield () + } yield ()).mapError(e => (walletAccessContext, handlePresentationErrors(e))) + } yield result verifierPresentationReceivedToProcessed @@ VerifierPresentationReceivedToProcessedSuccess.trackSuccess @@ VerifierPresentationReceivedToProcessedFailed.trackError @@ -449,22 +493,51 @@ object PresentBackgroundJobs extends BackgroundJobsHelper { } yield () aux - .tapError(e => - for { - presentationService <- ZIO.service[PresentationService] - _ <- presentationService - .reportProcessingFailure(record.id, Some(e.toString)) - .tapError(err => - ZIO.logErrorCause( - s"Present Proof - failed to report processing failure: ${record.id}", - Cause.fail(err) - ) + .tapError( + { + case error: (WalletNotFoundError | BackgroundJobError) => + ZIO.logErrorCause( + s"Present Proof - Error processing record: ${record.id}", + Cause.fail(error) ) - } yield () + case ((walletAccessContext, e)) => + for { + presentationService <- ZIO.service[PresentationService] + _ <- presentationService + .reportProcessingFailure(record.id, Some(e.toString)) + .provideSomeLayer( + ZLayer.succeed(walletAccessContext) + ) + .tapError(err => + ZIO.logErrorCause( + s"Present Proof - failed to report processing failure: ${record.id}", + Cause.fail(err) + ) + ) + } yield () + + } ) .catchAll(e => ZIO.logErrorCause(s"Present Proof - Error processing record: ${record.id} ", Cause.fail(e))) .catchAllDefect(d => ZIO.logErrorCause(s"Present Proof - Defect processing record: ${record.id}", Cause.fail(d))) } + private[this] def handlePresentationErrors: PartialFunction[ + Throwable | CredentialServiceError | PresentationError | BackgroundJobError, + PresentationError | CredentialServiceError | BackgroundJobError + ] = { + case c: CredentialServiceError => c + case p: PresentationError => p + case b: BackgroundJobError => b + case t: Throwable => PresentationError.UnexpectedError(t.getMessage()) + } + + val syncDIDPublicationStateFromDlt = + for { + managedDidService <- ZIO.service[ManagedDIDService] + _ <- managedDidService.syncManagedDIDState + _ <- managedDidService.syncUnconfirmedUpdateOperations + } yield () + }