Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prism-agent): Metrics for connection flow job #611

Merged
merged 9 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ lazy val D = new {

lazy val D_Shared = new {
lazy val dependencies: Seq[ModuleID] =
Seq(D.typesafeConfig, D.scalaPbGrpc, D.testcontainersPostgres, D.testcontainersVault)
Seq(D.typesafeConfig, D.scalaPbGrpc, D.testcontainersPostgres, D.testcontainersVault, D.zio)
}

lazy val D_Connect = new {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.iohk.atala.mercury.model.DidId
import io.iohk.atala.mercury.protocol.connection.*
import io.iohk.atala.mercury.protocol.invitation.v2.Invitation
import io.iohk.atala.shared.utils.Base64Utils
import io.iohk.atala.shared.utils.aspects.CustomMetricsAspect
import zio.*

import java.rmi.UnexpectedException
Expand Down Expand Up @@ -136,10 +137,12 @@ private class ConnectionServiceImpl(
record <- getRecordWithState(recordId, ProtocolState.InvitationReceived)
request = ConnectionRequest
.makeFromInvitation(record.invitation, pairwiseDid)
.copy(thid = Some(record.invitation.id)) // This logic shound be move to the SQL when fetching the record
.copy(thid = Some(record.invitation.id)) // This logic should be moved to the SQL when fetching the record
count <- connectionRepository
.updateWithConnectionRequest(recordId, request, ProtocolState.ConnectionRequestPending, maxRetries)
.mapError(RepositoryError.apply)
.mapError(RepositoryError.apply) @@ CustomMetricsAspect.startRecordingTime(
s"${record.id}_invitee_pending_to_req_sent"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a metrics which is named invitee_pending_to_req_sent with a label of record.id instead of many different metrics? It won't be possible to compare / sum by record.id if this is a full metric on it's own

)
_ <- count match
case 1 => ZIO.succeed(())
case n => ZIO.fail(RecordIdNotFound(recordId))
Expand Down Expand Up @@ -198,7 +201,9 @@ private class ConnectionServiceImpl(
// response = createDidCommConnectionResponse(record)
count <- connectionRepository
.updateWithConnectionResponse(recordId, response, ProtocolState.ConnectionResponsePending, maxRetries)
.mapError(RepositoryError.apply)
.mapError(RepositoryError.apply) @@ CustomMetricsAspect.startRecordingTime(
s"${record.id}_inviter_pending_to_res_sent"
)
_ <- count match
case 1 => ZIO.succeed(())
case n => ZIO.fail(RecordIdNotFound(recordId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import io.iohk.atala.mercury.*
import io.iohk.atala.mercury.model.*
import io.iohk.atala.mercury.model.error.*
import io.iohk.atala.resolvers.DIDResolver
import io.iohk.atala.shared.utils.aspects.CustomMetricsAspect
import io.iohk.atala.shared.utils.DurationOps.toMetricsSeconds
import zio.*
import zio.metrics.*
object ConnectBackgroundJobs {

val didCommExchanges = {
Expand All @@ -37,6 +40,43 @@ object ConnectBackgroundJobs {
): URIO[DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService, Unit] = {
import ProtocolState.*
import Role.*

def counterMetric(key: String) = Metric
.counterInt(key)
.fromConst(1)
.tagged("connectionId", record.id.toString)

val InviteeConnectionRequestMsgFailed = counterMetric(
"connection_flow_invitee_connection_request_msg_failed_counter"
)
val InviteeConnectionRequestMsgSuccess = counterMetric(
"connection_flow_invitee_connection_request_msg_success_counter"
)
val InviterConnectionResponseMsgFailed = counterMetric(
"connection_flow_inviter_connection_response_msg_failed_counter"
)
val InviterConnectionResponseMsgSuccess = counterMetric(
"connection_flow_inviter_connection_response_msg_success_counter"
)
val InviteeProcessConnectionRecordPendingSuccess = counterMetric(
"connection_flow_invitee_process_connection_record_success_counter"
)
val InviteeProcessConnectionRecordPendingFailed = counterMetric(
"connection_flow_invitee_process_connection_record_failed_counter"
)
val InviteeProcessConnectionRecordPendingTotal = counterMetric(
"connection_flow_invitee_process_connection_record_total_counter"
)
val InviterProcessConnectionRecordPendingSuccess = counterMetric(
"connection_flow_inviter_process_connection_record_success_counter"
)
val InviterProcessConnectionRecordPendingFailed = counterMetric(
"connection_flow_inviter_process_connection_record_failed_counter"
)
val InviterProcessConnectionRecordPendingTotal = counterMetric(
"connection_flow_inviter_process_connection_record_total_counter"
)

val exchange = record match {
case ConnectionRecord(
id,
Expand All @@ -53,19 +93,42 @@ object ConnectBackgroundJobs {
_,
_
) if metaRetries > 0 =>
val aux = for {
val inviteeProcessFlow = for {

didCommAgent <- buildDIDCommAgent(request.from)
resp <- MessagingService.send(request.makeMessage).provideSomeLayer(didCommAgent)
resp <- MessagingService.send(request.makeMessage).provideSomeLayer(didCommAgent) @@ Metric
.gauge("connection_flow_invitee_send_connection_request_ms_gauge")
.tagged("connectionId", record.id.toString)
.trackDurationWith(_.toMetricsSeconds)
connectionService <- ZIO.service[ConnectionService]
_ <- {
if (resp.status >= 200 && resp.status < 300) connectionService.markConnectionRequestSent(id)
else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp))
if (resp.status >= 200 && resp.status < 300)
connectionService.markConnectionRequestSent(id)
@@ InviteeConnectionRequestMsgSuccess
@@ CustomMetricsAspect.endRecordingTime(
s"${record.id}_invitee_pending_to_req_sent",
"connection_flow_invitee_pending_to_req_sent_ms_gauge",
Set(
MetricLabel(
"connectionId",
record.id.toString
)
)
)
else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ InviteeConnectionRequestMsgFailed
}
} yield ()

// aux // TODO decrete metaRetries if it has a error
aux
// inviteeProcessFlow // TODO decrease metaRetries if it has a error

inviteeProcessFlow
@@ InviteeProcessConnectionRecordPendingSuccess.trackSuccess
@@ InviteeProcessConnectionRecordPendingFailed.trackError
@@ InviteeProcessConnectionRecordPendingTotal
@@ Metric
.gauge("connection_flow_invitee_process_connection_record_ms_gauge")
.tagged("connectionId", record.id.toString)
.trackDurationWith(_.toMetricsSeconds)

case ConnectionRecord(
id,
Expand All @@ -82,16 +145,40 @@ object ConnectBackgroundJobs {
_,
_
) if metaRetries > 0 =>
for {
val inviterProcessFlow = for {
didCommAgent <- buildDIDCommAgent(response.from)
resp <- MessagingService.send(response.makeMessage).provideSomeLayer(didCommAgent)
resp <- MessagingService.send(response.makeMessage).provideSomeLayer(didCommAgent) @@ Metric
.gauge("connection_flow_inviter_send_connection_response_ms_gauge")
.tagged("connectionId", record.id.toString)
.trackDurationWith(_.toMetricsSeconds)
connectionService <- ZIO.service[ConnectionService]
_ <- {
if (resp.status >= 200 && resp.status < 300) connectionService.markConnectionResponseSent(id)
else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp))
if (resp.status >= 200 && resp.status < 300)
connectionService.markConnectionResponseSent(id)
@@ InviterConnectionResponseMsgSuccess
@@ CustomMetricsAspect.endRecordingTime(
s"${record.id}_inviter_pending_to_res_sent",
"connection_flow_inviter_pending_to_res_sent_ms_gauge",
Set(
MetricLabel(
"connectionId",
record.id.toString
)
)
)
else ZIO.fail(ErrorResponseReceivedFromPeerAgent(resp)) @@ InviterConnectionResponseMsgFailed
}
} yield ()

inviterProcessFlow
@@ InviterProcessConnectionRecordPendingSuccess.trackSuccess
@@ InviterProcessConnectionRecordPendingFailed.trackError
@@ InviterProcessConnectionRecordPendingTotal
@@ Metric
.gauge("connection_flow_inviter_process_connection_record_ms_gauge")
.tagged("connectionId", record.id.toString)
.trackDurationWith(_.toMetricsSeconds)

case _ => ZIO.unit
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.iohk.atala.shared.utils

import java.time.Duration

object DurationOps {

extension (d: Duration) def toMetricsSeconds: Double = d.toMillis.toDouble / 1000.0

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.iohk.atala.shared.utils.aspects

import zio.*
import scala.collection.mutable.{Map => MutMap}
import zio.metrics.*
import java.time.{Instant, Clock, Duration}
import io.iohk.atala.shared.utils.DurationOps.toMetricsSeconds

object CustomMetricsAspect {
private val checkpoints: MutMap[String, Instant] = MutMap.empty
private val clock = Clock.systemUTC()
private def now = ZIO.succeed(clock.instant)

def startRecordingTime(key: String): ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] =
new ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] {
override def apply[R, E, A](
zio: ZIO[R, E, A]
)(implicit trace: Trace): ZIO[R, E, A] =
for {
res <- zio
timeAfter <- now
_ = checkpoints.update(key, timeAfter)
} yield res
}

def endRecordingTime(
key: String,
metricsKey: String,
tags: Set[MetricLabel] = Set.empty
): ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] =
new ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] {
override def apply[R, E, A](
zio: ZIO[R, E, A]
)(implicit trace: Trace): ZIO[R, E, A] = {
for {
res <- zio
end <- now
maybeStart = checkpoints.get(key)
metricsZio = maybeStart.map(start => Duration.between(start, end)).fold(ZIO.unit) { duration =>
ZIO.succeed(duration.toMetricsSeconds) @@ Metric.gauge(metricsKey).tagged(tags)
}
_ <- metricsZio
} yield res
}
}

}
Loading