Skip to content

Commit

Permalink
Ensure proper threads for InterProcessMutex
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewWestberg committed Sep 23, 2024
1 parent ff7886e commit c68d4f7
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.newm.server.features.earnings.model.Earning
import io.newm.server.features.song.database.SongTable
import io.newm.server.features.song.repo.SongRepository
import io.newm.server.features.user.repo.UserRepository
import io.newm.server.ktx.withLock
import io.newm.server.typealiases.SongId
import io.newm.shared.koin.inject
import io.newm.shared.ktx.toDate
Expand Down Expand Up @@ -199,11 +200,8 @@ class EarningsRepositoryImpl(

override suspend fun createClaimOrder(claimOrderRequest: ClaimOrderRequest): ClaimOrder? {
// create any claim orders one at a time to ensure a flood attack can't create a bunch of dup claim orders
claimOrderMutex.acquire()
try {
return createClaimOrderInternal(claimOrderRequest)
} finally {
claimOrderMutex.release()
return claimOrderMutex.withLock(CLAIM_ORDER_INTER_PROCESS_MUTEX_PATH) {
createClaimOrderInternal(claimOrderRequest)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.newm.server.ktx

import com.github.benmanes.caffeine.cache.Caffeine
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.curator.framework.recipes.locks.InterProcessMutex

// InterProcessMutext does not play nice with coroutines if the block() resumes on a different thread
// This extension function ensures that the block() is executed on the same thread as the lock and
// releases the lock after the block is executed on the same thread.

@OptIn(ExperimentalCoroutinesApi::class)
private val singleThreadContextCache = Caffeine
.newBuilder()
.build<String, CoroutineContext> { name ->
@OptIn(DelicateCoroutinesApi::class)
newSingleThreadContext(name)
}

suspend fun <T> InterProcessMutex.withLock(
lockName: String,
block: suspend () -> T
): T =
withContext(singleThreadContextCache.get(lockName)) {
// acquire() with throw and exit early if we couldn't acquire the lock
acquire()
try {
block()
} finally {
release()
}
}
20 changes: 17 additions & 3 deletions newm-server/src/main/resources/openapi/documentation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -594,9 +594,7 @@ paths:
content:
'application/json':
schema:
type: "array"
items:
$ref: "#/components/schemas/Earning"
$ref: "#/components/schemas/GetEarningsResponse"
/v1/earnings:
post:
tags:
Expand Down Expand Up @@ -2312,6 +2310,22 @@ components:
- "stakeAddress"
- "memo"
- "createdAt"
GetEarningsResponse:
type: "object"
properties:
totalClaimed:
type: "integer"
format: "int64"
earnings:
type: "array"
items:
$ref: "#/components/schemas/Earning"
amountCborHex:
type: "string"
required:
- "totalClaimed"
- "earnings"
- "amountCborHex"
ClaimOrderRequest:
type: "object"
properties:
Expand Down

0 comments on commit c68d4f7

Please sign in to comment.