Skip to content

Commit

Permalink
fix pg client must be a shared instance
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrebruninmaif committed Dec 21, 2023
1 parent f203f13 commit ec4fbee
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 46 deletions.
6 changes: 3 additions & 3 deletions src/main/java/fr/maif/automate/MainVerticle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.reactivex.schedulers.Schedulers
import io.vertx.core.Handler
import io.vertx.core.Promise
import io.vertx.core.json.JsonObject
import io.vertx.ext.jdbc.JDBCClient
import io.vertx.kotlin.core.json.json
import io.vertx.kotlin.core.json.obj
import io.vertx.reactivex.core.AbstractVerticle
Expand Down Expand Up @@ -82,9 +83,8 @@ class MainVerticle : AbstractVerticle() {

val pgConfig = letsAutomateConfig.postgresConfig
initDb(pgConfig).subscribe({

val postgresClient =
SQLClient.newInstance(io.vertx.ext.jdbc.JDBCClient.create(vertx.delegate, pgConfigToJson(pgConfig)))
val jdbcClient = JDBCClient.createShared(vertx.delegate, pgConfigToJson(pgConfig))
val postgresClient = SQLClient.newInstance(jdbcClient)

val client = WebClient.create(vertx)
val dnsManager = OvhDnsManager(client, vertx.createDnsClient(), letsAutomateConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class CertificateRouter(certificates: Certificates) {
}

val applyCommand = Handler<RoutingContext> { req ->
val bodyAsJson = req.bodyAsJson
val bodyAsJson = req.body().asJsonObject()
LOGGER.info("Certificate command {}", bodyAsJson)

val command = CertificateCommand.fromJson(bodyAsJson)
Expand Down Expand Up @@ -87,7 +87,7 @@ class CertificateRouter(certificates: Certificates) {
response.headers().add("Connection", "keep-alive")
response.statusCode = 200
response.write("")
certificates.eventsView.eventsStream(lastId).subscribe({ (id, evt) ->
certificates.eventsView.eventsStream().subscribe({ (id, evt) ->
context.response().write("id: $id\ndata: ${evt.encode()}\n\n")
}, {
//LOGGER.error("Error during sse", e)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package fr.maif.automate.certificate.eventhandler

import arrow.core.Either
import arrow.core.None
import arrow.core.Some
import arrow.core.toOption
Expand All @@ -9,13 +8,12 @@ import fr.maif.automate.certificate.write.*
import fr.maif.automate.commons.eventsourcing.EventReader
import fr.maif.automate.commons.eventsourcing.EventStore
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.disposables.Disposable
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicReference

class EventToCommandAdapter(private val eventStore: EventStore, val certificates: Certificates, private val eventReader: EventReader<CertificateEvent>) {
class EventToCommandAdapter(private val eventStore: EventStore, private val certificates: Certificates, private val eventReader: EventReader<CertificateEvent>) {
companion object {
val LOGGER = LoggerFactory.getLogger(EventToCommandAdapter::class.java) as Logger
const val GROUP_ID = "EventToCommandAdapter"
Expand All @@ -24,15 +22,10 @@ class EventToCommandAdapter(private val eventStore: EventStore, val certificates

fun startAdapter() {
LOGGER.info("Starting event to command adapter")
val disposable = adaptaterStream().subscribe({

}, { e ->
val disposable = adaptaterStream()
.retry(3)
.subscribe({}, { e ->
LOGGER.error("Error consuming command stream, going to restart", e)
ref.get().toOption().forall{d ->
d.dispose()
true
}
ref.set(adaptaterStream().subscribe())
})
ref.set(disposable)
}
Expand Down Expand Up @@ -70,21 +63,13 @@ class EventToCommandAdapter(private val eventStore: EventStore, val certificates
is Some -> {
certificates.onCommand(mayBeCommand.t)
.flatMap {
//when (r) {
LOGGER.info("Command success, commiting from group id $GROUP_ID and sequence_num $sequence")
eventStore.commit(GROUP_ID, sequence)
// is Either.Right -> {
//
// }
// is Either.Left -> {
// LOGGER.error("Command failure, retrying ...")
// Single.error(RuntimeException("Error handling command ${r.a}"))
// }
//}
LOGGER.info("Command success, committing from group id $GROUP_ID and sequence_num $sequence")
eventStore.commit(GROUP_ID, sequence)
}.toObservable()
}

is None -> {
LOGGER.info("Empty Command, commiting from froup id $GROUP_ID and sequence_num $sequence")
LOGGER.info("Empty Command, committing from group id $GROUP_ID and sequence_num $sequence")
eventStore.commit(GROUP_ID, sequence).toObservable()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class EventsView(private val eventStore: EventStore, private val eventReader: Ev

}

fun eventsStream(id: Option<Long> = None): Observable<Pair<Long, JsonObject>> {
fun eventsStream(): Observable<Pair<Long, JsonObject>> {
return eventStore
.eventStream()
.map { Triple(eventReader.read(it), it.eventType, it.sequence) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,9 @@ class PostgresEventStore(private val table: String, private val offetsTable: Str
}

override fun commit(groupId: String, sequenceNum: Long): Single<Unit> {
return pgClient.rxGetConnection().flatMap { connection ->
connection.rxSetAutoCommit(false)
.toSingleDefault(Unit).flatMap {
LOGGER.debug("Upsert last commit for $groupId to $sequenceNum")
val query = """INSERT INTO $offetsTable (group_id, sequence_num) VALUES(?, ?) ON CONFLICT (group_id) DO UPDATE SET sequence_num = ?"""
connection.rxUpdateWithParams(query, json { array(groupId, sequenceNum, sequenceNum) })
}.flatMap { _ ->
connection.rxCommit().toSingle { Unit }
}.doOnError { e ->
LOGGER.error("Error during commit -> rollback", e)
connection.rxRollback().subscribe()
}
.flatMap { _ -> connection.rxSetAutoCommit(true).andThen(Single.just(Unit)) }
.doFinally { connection.rxClose().subscribe() }
}
LOGGER.debug("Upsert last commit for $groupId to $sequenceNum")
val query = """INSERT INTO $offetsTable (group_id, sequence_num) VALUES(?, ?) ON CONFLICT (group_id) DO UPDATE SET sequence_num = ?"""
return pgClient.rxUpdateWithParams(query, json { array(groupId, sequenceNum, sequenceNum) }).map { _ -> Unit }
}

override fun eventStream(id: String): Observable<EventEnvelope> = eventStream().filter { it.entityId == id }
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/fr/maif/automate/dns/DnsRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class DnsRouter(dnsManager: DnsManager) {

val createRecord = Handler<RoutingContext> { req ->
val domain = req.pathParam("domain")
val record = Record.fromJson(req.bodyAsJson)
val record = Record.fromJson(req.body().asJsonObject())
dnsManager.createRecord(domain, record)
.subscribe ({ result ->
when(result) {
Expand All @@ -41,7 +41,7 @@ class DnsRouter(dnsManager: DnsManager) {
val updateRecord = Handler<RoutingContext> { req ->
val domain = req.pathParam("domain")
val recordId = req.pathParam("recordId")
val record = Record.fromJson(req.bodyAsJson)
val record = Record.fromJson(req.body().asJsonObject())
dnsManager.updateRecord(domain, recordId.toLong(), record)
.subscribe ({ result ->
when(result) {
Expand Down

0 comments on commit ec4fbee

Please sign in to comment.