diff --git a/src/main/java/fr/maif/automate/MainVerticle.kt b/src/main/java/fr/maif/automate/MainVerticle.kt index f109cd1..fc2f3d4 100644 --- a/src/main/java/fr/maif/automate/MainVerticle.kt +++ b/src/main/java/fr/maif/automate/MainVerticle.kt @@ -26,6 +26,7 @@ import io.reactivex.schedulers.Schedulers import io.vertx.core.Handler import io.vertx.core.Promise import io.vertx.core.json.JsonObject +import io.vertx.ext.jdbc.JDBCClient import io.vertx.kotlin.core.json.json import io.vertx.kotlin.core.json.obj import io.vertx.reactivex.core.AbstractVerticle @@ -82,9 +83,8 @@ class MainVerticle : AbstractVerticle() { val pgConfig = letsAutomateConfig.postgresConfig initDb(pgConfig).subscribe({ - - val postgresClient = - SQLClient.newInstance(io.vertx.ext.jdbc.JDBCClient.create(vertx.delegate, pgConfigToJson(pgConfig))) + val jdbcClient = JDBCClient.createShared(vertx.delegate, pgConfigToJson(pgConfig)) + val postgresClient = SQLClient.newInstance(jdbcClient) val client = WebClient.create(vertx) val dnsManager = OvhDnsManager(client, vertx.createDnsClient(), letsAutomateConfig) diff --git a/src/main/java/fr/maif/automate/certificate/CertificateRouter.kt b/src/main/java/fr/maif/automate/certificate/CertificateRouter.kt index 166a6ea..fbcb526 100644 --- a/src/main/java/fr/maif/automate/certificate/CertificateRouter.kt +++ b/src/main/java/fr/maif/automate/certificate/CertificateRouter.kt @@ -30,7 +30,7 @@ class CertificateRouter(certificates: Certificates) { } val applyCommand = Handler { req -> - val bodyAsJson = req.bodyAsJson + val bodyAsJson = req.body().asJsonObject() LOGGER.info("Certificate command {}", bodyAsJson) val command = CertificateCommand.fromJson(bodyAsJson) @@ -87,7 +87,7 @@ class CertificateRouter(certificates: Certificates) { response.headers().add("Connection", "keep-alive") response.statusCode = 200 response.write("") - certificates.eventsView.eventsStream(lastId).subscribe({ (id, evt) -> + certificates.eventsView.eventsStream().subscribe({ (id, evt) -> context.response().write("id: $id\ndata: ${evt.encode()}\n\n") }, { //LOGGER.error("Error during sse", e) diff --git a/src/main/java/fr/maif/automate/certificate/eventhandler/EventToCommandAdapter.kt b/src/main/java/fr/maif/automate/certificate/eventhandler/EventToCommandAdapter.kt index 19bbcbf..916cbbe 100644 --- a/src/main/java/fr/maif/automate/certificate/eventhandler/EventToCommandAdapter.kt +++ b/src/main/java/fr/maif/automate/certificate/eventhandler/EventToCommandAdapter.kt @@ -1,6 +1,5 @@ package fr.maif.automate.certificate.eventhandler -import arrow.core.Either import arrow.core.None import arrow.core.Some import arrow.core.toOption @@ -9,13 +8,12 @@ import fr.maif.automate.certificate.write.* import fr.maif.automate.commons.eventsourcing.EventReader import fr.maif.automate.commons.eventsourcing.EventStore import io.reactivex.Observable -import io.reactivex.Single import io.reactivex.disposables.Disposable import org.slf4j.Logger import org.slf4j.LoggerFactory import java.util.concurrent.atomic.AtomicReference -class EventToCommandAdapter(private val eventStore: EventStore, val certificates: Certificates, private val eventReader: EventReader) { +class EventToCommandAdapter(private val eventStore: EventStore, private val certificates: Certificates, private val eventReader: EventReader) { companion object { val LOGGER = LoggerFactory.getLogger(EventToCommandAdapter::class.java) as Logger const val GROUP_ID = "EventToCommandAdapter" @@ -24,15 +22,10 @@ class EventToCommandAdapter(private val eventStore: EventStore, val certificates fun startAdapter() { LOGGER.info("Starting event to command adapter") - val disposable = adaptaterStream().subscribe({ - - }, { e -> + val disposable = adaptaterStream() + .retry(3) + .subscribe({}, { e -> LOGGER.error("Error consuming command stream, going to restart", e) - ref.get().toOption().forall{d -> - d.dispose() - true - } - ref.set(adaptaterStream().subscribe()) }) ref.set(disposable) } @@ -70,21 +63,13 @@ class EventToCommandAdapter(private val eventStore: EventStore, val certificates is Some -> { certificates.onCommand(mayBeCommand.t) .flatMap { - //when (r) { - LOGGER.info("Command success, commiting from group id $GROUP_ID and sequence_num $sequence") - eventStore.commit(GROUP_ID, sequence) -// is Either.Right -> { -// -// } -// is Either.Left -> { -// LOGGER.error("Command failure, retrying ...") -// Single.error(RuntimeException("Error handling command ${r.a}")) -// } - //} + LOGGER.info("Command success, committing from group id $GROUP_ID and sequence_num $sequence") + eventStore.commit(GROUP_ID, sequence) }.toObservable() } + is None -> { - LOGGER.info("Empty Command, commiting from froup id $GROUP_ID and sequence_num $sequence") + LOGGER.info("Empty Command, committing from group id $GROUP_ID and sequence_num $sequence") eventStore.commit(GROUP_ID, sequence).toObservable() } } diff --git a/src/main/java/fr/maif/automate/certificate/views/EventsView.kt b/src/main/java/fr/maif/automate/certificate/views/EventsView.kt index 6090e35..e78e858 100644 --- a/src/main/java/fr/maif/automate/certificate/views/EventsView.kt +++ b/src/main/java/fr/maif/automate/certificate/views/EventsView.kt @@ -24,7 +24,7 @@ class EventsView(private val eventStore: EventStore, private val eventReader: Ev } - fun eventsStream(id: Option = None): Observable> { + fun eventsStream(): Observable> { return eventStore .eventStream() .map { Triple(eventReader.read(it), it.eventType, it.sequence) } diff --git a/src/main/java/fr/maif/automate/commons/eventsourcing/PostgresEventStore.kt b/src/main/java/fr/maif/automate/commons/eventsourcing/PostgresEventStore.kt index c3381a7..48b5680 100644 --- a/src/main/java/fr/maif/automate/commons/eventsourcing/PostgresEventStore.kt +++ b/src/main/java/fr/maif/automate/commons/eventsourcing/PostgresEventStore.kt @@ -85,21 +85,9 @@ class PostgresEventStore(private val table: String, private val offetsTable: Str } override fun commit(groupId: String, sequenceNum: Long): Single { - return pgClient.rxGetConnection().flatMap { connection -> - connection.rxSetAutoCommit(false) - .toSingleDefault(Unit).flatMap { - LOGGER.debug("Upsert last commit for $groupId to $sequenceNum") - val query = """INSERT INTO $offetsTable (group_id, sequence_num) VALUES(?, ?) ON CONFLICT (group_id) DO UPDATE SET sequence_num = ?""" - connection.rxUpdateWithParams(query, json { array(groupId, sequenceNum, sequenceNum) }) - }.flatMap { _ -> - connection.rxCommit().toSingle { Unit } - }.doOnError { e -> - LOGGER.error("Error during commit -> rollback", e) - connection.rxRollback().subscribe() - } - .flatMap { _ -> connection.rxSetAutoCommit(true).andThen(Single.just(Unit)) } - .doFinally { connection.rxClose().subscribe() } - } + LOGGER.debug("Upsert last commit for $groupId to $sequenceNum") + val query = """INSERT INTO $offetsTable (group_id, sequence_num) VALUES(?, ?) ON CONFLICT (group_id) DO UPDATE SET sequence_num = ?""" + return pgClient.rxUpdateWithParams(query, json { array(groupId, sequenceNum, sequenceNum) }).map { _ -> Unit } } override fun eventStream(id: String): Observable = eventStream().filter { it.entityId == id } diff --git a/src/main/java/fr/maif/automate/dns/DnsRouter.kt b/src/main/java/fr/maif/automate/dns/DnsRouter.kt index 99793aa..f5466fb 100644 --- a/src/main/java/fr/maif/automate/dns/DnsRouter.kt +++ b/src/main/java/fr/maif/automate/dns/DnsRouter.kt @@ -22,7 +22,7 @@ class DnsRouter(dnsManager: DnsManager) { val createRecord = Handler { req -> val domain = req.pathParam("domain") - val record = Record.fromJson(req.bodyAsJson) + val record = Record.fromJson(req.body().asJsonObject()) dnsManager.createRecord(domain, record) .subscribe ({ result -> when(result) { @@ -41,7 +41,7 @@ class DnsRouter(dnsManager: DnsManager) { val updateRecord = Handler { req -> val domain = req.pathParam("domain") val recordId = req.pathParam("recordId") - val record = Record.fromJson(req.bodyAsJson) + val record = Record.fromJson(req.body().asJsonObject()) dnsManager.updateRecord(domain, recordId.toLong(), record) .subscribe ({ result -> when(result) {