Skip to content

Commit

Permalink
IS-2271: Add consuming of arbeidsuforhet vurdering and storing of res…
Browse files Browse the repository at this point in the history
…ult in db (#364)

* IS-2271: WIP

* IS-2271: WIP

* IS-2271: Fix tests

* IS-2271: Updated tests
  • Loading branch information
vetlesolgaard authored May 29, 2024
1 parent 9e4c60e commit 0dfcab7
Show file tree
Hide file tree
Showing 38 changed files with 560 additions and 49 deletions.
3 changes: 3 additions & 0 deletions src/main/kotlin/no/nav/syfo/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import no.nav.syfo.cronjob.behandlendeenhet.PersonBehandlendeEnhetService
import no.nav.syfo.cronjob.launchCronjobModule
import no.nav.syfo.kafka.launchKafkaModule
import no.nav.syfo.personstatus.PersonoversiktStatusService
import no.nav.syfo.personstatus.infrastructure.database.PersonOversiktStatusRepository
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -67,9 +68,11 @@ fun main() {
databaseModule(
databaseEnvironment = environment.database,
)
val personoversiktStatusRepository = PersonOversiktStatusRepository(database = database)
personoversiktStatusService = PersonoversiktStatusService(
database = database,
pdlClient = pdlClient,
personoversiktStatusRepository = personoversiktStatusRepository,
)
personBehandlendeEnhetService = PersonBehandlendeEnhetService(
database = database,
Expand Down
5 changes: 4 additions & 1 deletion src/main/kotlin/no/nav/syfo/kafka/KafkaModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import no.nav.syfo.cronjob.behandlendeenhet.PersonBehandlendeEnhetService
import no.nav.syfo.dialogmotekandidat.kafka.launchKafkaTaskDialogmotekandidatEndring
import no.nav.syfo.dialogmotestatusendring.kafka.launchKafkaTaskDialogmoteStatusendring
import no.nav.syfo.frisktilarbeid.kafka.launchKafkaTaskFriskTilArbeidVedtak
import no.nav.syfo.trengeroppfolging.kafka.launchTrengerOppfolgingConsumer
import no.nav.syfo.identhendelse.kafka.launchKafkaTaskIdenthendelse
import no.nav.syfo.oppfolgingstilfelle.kafka.launchKafkaTaskOppfolgingstilfellePerson
import no.nav.syfo.pdlpersonhendelse.kafka.launchKafkaTaskPersonhendelse
import no.nav.syfo.personoppgavehendelse.kafka.launchKafkaTaskPersonoppgavehendelse
import no.nav.syfo.personstatus.PersonoversiktStatusService
import no.nav.syfo.personstatus.infrastructure.kafka.ArbeidsuforhetvurderingConsumer
import no.nav.syfo.trengeroppfolging.kafka.launchTrengerOppfolgingConsumer

fun launchKafkaModule(
applicationState: ApplicationState,
Expand Down Expand Up @@ -65,4 +66,6 @@ fun launchKafkaModule(
applicationState = applicationState,
kafkaEnvironment = environment.kafka,
)
ArbeidsuforhetvurderingConsumer(personoversiktStatusService = personoversiktStatusService)
.start(applicationState = applicationState, kafkaEnvironment = environment.kafka)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import no.nav.syfo.client.pdl.PdlClient
import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.oppfolgingstilfelle.domain.PersonOppfolgingstilfelleVirksomhet
import no.nav.syfo.personoppgavehendelse.kafka.*
import no.nav.syfo.personstatus.application.IPersonOversiktStatusRepository
import no.nav.syfo.personstatus.db.*
import no.nav.syfo.personstatus.domain.*
import java.sql.Connection
Expand All @@ -13,6 +14,7 @@ import java.time.LocalDate
class PersonoversiktStatusService(
private val database: DatabaseInterface,
private val pdlClient: PdlClient,
private val personoversiktStatusRepository: IPersonOversiktStatusRepository,
) {
private val isUbehandlet = true
private val isBehandlet = false
Expand Down Expand Up @@ -83,6 +85,13 @@ class PersonoversiktStatusService(
}
}

fun updateArbeidsuforhetvurderingStatus(personident: PersonIdent, isAktivVurdering: Boolean): Result<Int> {
return personoversiktStatusRepository.updateArbeidsuforhetvurderingStatus(
personident = personident,
isAktivVurdering = isAktivVurdering
)
}

private fun createOrUpdatePersonOversiktStatus(
connection: Connection,
personident: PersonIdent,
Expand Down Expand Up @@ -135,8 +144,14 @@ class PersonoversiktStatusService(
connection.updateBehandlerBerOmBistand(isUbehandlet, personident)
OversikthendelseType.BEHANDLER_BER_OM_BISTAND_BEHANDLET ->
connection.updateBehandlerBerOmBistand(isBehandlet, personident)
OversikthendelseType.ARBEIDSUFORHET_VURDER_AVSLAG_MOTTATT -> connection.updateArbeidsuforhetVurderAvslag(isUbehandlet, personident)
OversikthendelseType.ARBEIDSUFORHET_VURDER_AVSLAG_BEHANDLET -> connection.updateArbeidsuforhetVurderAvslag(isBehandlet, personident)
OversikthendelseType.ARBEIDSUFORHET_VURDER_AVSLAG_MOTTATT -> connection.updateArbeidsuforhetVurderAvslag(
isUbehandlet,
personident
)
OversikthendelseType.ARBEIDSUFORHET_VURDER_AVSLAG_BEHANDLET -> connection.updateArbeidsuforhetVurderAvslag(
isBehandlet,
personident
)
}

COUNT_KAFKA_CONSUMER_PERSONOPPGAVEHENDELSE_UPDATED_PERSONOVERSIKT_STATUS.increment()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package no.nav.syfo.personstatus.application

import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.personstatus.domain.PersonOversiktStatus

interface IPersonOversiktStatusRepository {

fun updateArbeidsuforhetvurderingStatus(personident: PersonIdent, isAktivVurdering: Boolean): Result<Int>

fun getPersonOversiktStatus(personident: PersonIdent): PersonOversiktStatus?
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ fun ResultSet.toPPersonOversiktStatus(): PPersonOversiktStatus =
antallSykedager = getObject("antall_sykedager") as Int?,
arbeidsuforhetVurderAvslagUbehandlet = getBoolean("arbeidsuforhet_vurder_avslag_ubehandlet"),
friskmeldingTilArbeidsformidlingFom = getObject("friskmelding_til_arbeidsformidling_fom", LocalDate::class.java),
isAktivArbeidsuforhetvurdering = getBoolean("arbeidsuforhet_aktiv_vurdering")
)

fun ResultSet.toVeilederBrukerKnytning(): VeilederBrukerKnytning =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ data class PPersonOversiktStatus(
val antallSykedager: Int?,
val arbeidsuforhetVurderAvslagUbehandlet: Boolean,
val friskmeldingTilArbeidsformidlingFom: LocalDate?,
val isAktivArbeidsuforhetvurdering: Boolean,
)

fun PPersonOversiktStatus.toPersonOversiktStatus(
personOppfolgingstilfelleVirksomhetList: List<PersonOppfolgingstilfelleVirksomhet>,
personOppfolgingstilfelleVirksomhetList: List<PersonOppfolgingstilfelleVirksomhet> = emptyList()
) = PersonOversiktStatus(
fnr = fnr,
navn = navn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ data class PersonOversiktStatus(
val behandlerBerOmBistandUbehandlet: Boolean = false,
val arbeidsuforhetVurderAvslagUbehandlet: Boolean = false,
val friskmeldingTilArbeidsformidlingFom: LocalDate? = null,
val isAktivArbeidsuforhetvurdering: Boolean = false,
) {
constructor(fnr: String) : this(
constructor(fnr: String, isAktivArbeidsuforhetvurdering: Boolean = false) : this(
null, fnr = fnr, null, null, null,
null, false, null, null, null,
null, null, null, null, null, null,
false, false, false, false, false,
null, false, false,
null, false, false, null, isAktivArbeidsuforhetvurdering = isAktivArbeidsuforhetvurdering
)
}

Expand Down Expand Up @@ -135,59 +136,45 @@ fun PersonOversiktStatus.applyHendelse(
OversikthendelseType.MOTEBEHOV_SVAR_MOTTATT -> this.copy(
motebehovUbehandlet = true,
)

OversikthendelseType.MOTEBEHOV_SVAR_BEHANDLET -> this.copy(
motebehovUbehandlet = false,
)

OversikthendelseType.OPPFOLGINGSPLANLPS_BISTAND_MOTTATT -> this.copy(
oppfolgingsplanLPSBistandUbehandlet = true,
)

OversikthendelseType.OPPFOLGINGSPLANLPS_BISTAND_BEHANDLET -> this.copy(
oppfolgingsplanLPSBistandUbehandlet = false,
)

OversikthendelseType.DIALOGMOTESVAR_MOTTATT -> this.copy(
dialogmotesvarUbehandlet = true,
)

OversikthendelseType.DIALOGMOTESVAR_BEHANDLET -> this.copy(
dialogmotesvarUbehandlet = false,
)

OversikthendelseType.BEHANDLERDIALOG_SVAR_MOTTATT -> this.copy(
behandlerdialogSvarUbehandlet = true,
)

OversikthendelseType.BEHANDLERDIALOG_SVAR_BEHANDLET -> this.copy(
behandlerdialogSvarUbehandlet = false,
)

OversikthendelseType.BEHANDLERDIALOG_MELDING_UBESVART_MOTTATT -> this.copy(
behandlerdialogUbesvartUbehandlet = true,
)

OversikthendelseType.BEHANDLERDIALOG_MELDING_UBESVART_BEHANDLET -> this.copy(
behandlerdialogUbesvartUbehandlet = false,
)

OversikthendelseType.BEHANDLERDIALOG_MELDING_AVVIST_MOTTATT -> this.copy(
behandlerdialogAvvistUbehandlet = true,
)

OversikthendelseType.BEHANDLERDIALOG_MELDING_AVVIST_BEHANDLET -> this.copy(
behandlerdialogAvvistUbehandlet = false,
)

OversikthendelseType.AKTIVITETSKRAV_VURDER_STANS_MOTTATT -> this.copy(
aktivitetskravVurderStansUbehandlet = true,
)

OversikthendelseType.AKTIVITETSKRAV_VURDER_STANS_BEHANDLET -> this.copy(
aktivitetskravVurderStansUbehandlet = false,
)

OversikthendelseType.BEHANDLER_BER_OM_BISTAND_MOTTATT -> this.copy(
behandlerBerOmBistandUbehandlet = true
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package no.nav.syfo.personstatus.infrastructure.database

import no.nav.syfo.application.database.DatabaseInterface
import no.nav.syfo.application.database.toList
import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.personstatus.application.IPersonOversiktStatusRepository
import no.nav.syfo.personstatus.domain.PPersonOversiktStatus
import no.nav.syfo.personstatus.domain.PersonOversiktStatus
import no.nav.syfo.personstatus.domain.toPersonOversiktStatus
import java.sql.ResultSet
import java.sql.Timestamp
import java.time.Instant
import java.time.LocalDate
import java.time.OffsetDateTime
import java.util.*

class PersonOversiktStatusRepository(private val database: DatabaseInterface) : IPersonOversiktStatusRepository {

override fun updateArbeidsuforhetvurderingStatus(
personIdent: PersonIdent,
isAktivVurdering: Boolean
): Result<Int> {
return try {
database.connection.use { connection ->
val tidspunkt = Timestamp.from(Instant.now())
val rowsUpdated = connection.prepareStatement(UPDATE_OR_INSERT_PERSON_OVERSIKT_STATUS).use {
it.setString(1, UUID.randomUUID().toString())
it.setString(2, personIdent.value)
it.setBoolean(3, isAktivVurdering)
it.setTimestamp(4, tidspunkt)
it.setTimestamp(5, tidspunkt)
it.executeUpdate()
}
val isSuccess = rowsUpdated == 1
if (isSuccess) {
connection.commit()
Result.success(rowsUpdated)
} else {
connection.rollback()
Result.failure(RuntimeException("Failed to update arbeidsuforhet vurdering status for person with fnr: ${personIdent.value}"))
}
}
} catch (e: Exception) {
Result.failure(e)
}
}

override fun getPersonOversiktStatus(personIdent: PersonIdent): PersonOversiktStatus? {
database.connection.use { connection ->
val personoversiktStatus = connection.prepareStatement(GET_PERSON_OVERSIKT_STATUS).use {
it.setString(1, personIdent.value)
it.executeQuery().toList { toPPersonOversiktStatus() }
}
return personoversiktStatus.firstOrNull()?.toPersonOversiktStatus()
}
}

companion object {
private const val GET_PERSON_OVERSIKT_STATUS =
"""
SELECT *
FROM PERSON_OVERSIKT_STATUS
WHERE fnr = ?
"""

private const val UPDATE_OR_INSERT_PERSON_OVERSIKT_STATUS =
"""
INSERT INTO person_oversikt_status (
id,
uuid,
fnr,
arbeidsuforhet_aktiv_vurdering,
opprettet,
sist_endret
) VALUES (DEFAULT, ?, ?, ?, ?, ?)
ON CONFLICT (fnr)
DO UPDATE SET
arbeidsuforhet_aktiv_vurdering = EXCLUDED.arbeidsuforhet_aktiv_vurdering,
sist_endret = EXCLUDED.sist_endret
"""
}
}

private fun ResultSet.toPPersonOversiktStatus(): PPersonOversiktStatus =
PPersonOversiktStatus(
id = getInt("id"),
uuid = getString("uuid").let { UUID.fromString(it) },
veilederIdent = getString("tildelt_veileder"),
fnr = getString("fnr"),
navn = getString("name"),
enhet = getString("tildelt_enhet"),
tildeltEnhetUpdatedAt = getObject("tildelt_enhet_updated_at", OffsetDateTime::class.java),
motebehovUbehandlet = getObject("motebehov_ubehandlet") as Boolean?,
oppfolgingsplanLPSBistandUbehandlet = getObject("oppfolgingsplan_lps_bistand_ubehandlet") as Boolean?,
dialogmotesvarUbehandlet = getObject("dialogmotesvar_ubehandlet") as Boolean,
dialogmotekandidat = getObject("dialogmotekandidat") as Boolean?,
dialogmotekandidatGeneratedAt = getObject("dialogmotekandidat_generated_at", OffsetDateTime::class.java),
motestatus = getString("motestatus"),
motestatusGeneratedAt = getObject("motestatus_generated_at", OffsetDateTime::class.java),
oppfolgingstilfelleUpdatedAt = getObject("oppfolgingstilfelle_updated_at", OffsetDateTime::class.java),
oppfolgingstilfelleGeneratedAt = getObject("oppfolgingstilfelle_generated_at", OffsetDateTime::class.java),
oppfolgingstilfelleStart = getObject("oppfolgingstilfelle_start", LocalDate::class.java),
oppfolgingstilfelleEnd = getObject("oppfolgingstilfelle_end", LocalDate::class.java),
oppfolgingstilfelleBitReferanseUuid = getString("oppfolgingstilfelle_bit_referanse_uuid")?.let {
UUID.fromString(
it
)
},
oppfolgingstilfelleBitReferanseInntruffet = getObject(
"oppfolgingstilfelle_bit_referanse_inntruffet",
OffsetDateTime::class.java
),
aktivitetskrav = getString("aktivitetskrav"),
aktivitetskravStoppunkt = getObject("aktivitetskrav_stoppunkt", LocalDate::class.java),
aktivitetskravUpdatedAt = getObject("aktivitetskrav_sist_vurdert", OffsetDateTime::class.java),
aktivitetskravVurderingFrist = getObject("aktivitetskrav_vurdering_frist", LocalDate::class.java),
behandlerdialogSvarUbehandlet = getBoolean("behandlerdialog_svar_ubehandlet"),
behandlerdialogUbesvartUbehandlet = getBoolean("behandlerdialog_ubesvart_ubehandlet"),
behandlerdialogAvvistUbehandlet = getBoolean("behandlerdialog_avvist_ubehandlet"),
aktivitetskravVurderStansUbehandlet = getBoolean("aktivitetskrav_vurder_stans_ubehandlet"),
trengerOppfolging = getBoolean("trenger_oppfolging") as Boolean,
trengerOppfolgingFrist = getObject("trenger_oppfolging_frist", LocalDate::class.java),
behandlerBerOmBistandUbehandlet = getBoolean("behandler_bistand_ubehandlet"),
antallSykedager = getObject("antall_sykedager") as Int?,
arbeidsuforhetVurderAvslagUbehandlet = getBoolean("arbeidsuforhet_vurder_avslag_ubehandlet"),
isAktivArbeidsuforhetvurdering = getBoolean("arbeidsuforhet_aktiv_vurdering"),
friskmeldingTilArbeidsformidlingFom = getObject("friskmelding_til_arbeidsformidling_fom", LocalDate::class.java),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package no.nav.syfo.personstatus.infrastructure.kafka

import no.nav.syfo.application.ApplicationState
import no.nav.syfo.application.kafka.KafkaEnvironment
import no.nav.syfo.application.kafka.kafkaAivenConsumerConfig
import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.kafka.KafkaConsumerService
import no.nav.syfo.kafka.launchKafkaTask
import no.nav.syfo.personstatus.PersonoversiktStatusService
import no.nav.syfo.util.configuredJacksonMapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.Deserializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*

class ArbeidsuforhetvurderingConsumer(
private val personoversiktStatusService: PersonoversiktStatusService,
) : KafkaConsumerService<ArbeidsuforhetvurderingRecord> {

override val pollDurationInMillis: Long = 1000

override fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, ArbeidsuforhetvurderingRecord>) {
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
if (records.count() > 0) {
log.info("ArbeidsuforhetvurderingConsumer trace: Received ${records.count()} records")
processRecords(records = records)
kafkaConsumer.commitSync()
}
}

private fun processRecords(records: ConsumerRecords<String, ArbeidsuforhetvurderingRecord>): List<Result<Int>> {
val validRecords = records.requireNoNulls()
return validRecords.map { record ->
val recordValue = record.value()
personoversiktStatusService.updateArbeidsuforhetvurderingStatus(
personident = PersonIdent(recordValue.personident),
isAktivVurdering = !recordValue.isFinal,
)
}
}

fun start(applicationState: ApplicationState, kafkaEnvironment: KafkaEnvironment) {
val consumerProperties = Properties().apply {
putAll(kafkaAivenConsumerConfig(kafkaEnvironment = kafkaEnvironment))
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] =
ArbeidsuforhetvurderingRecordDeserializer::class.java.canonicalName
}
launchKafkaTask(
applicationState = applicationState,
kafkaConsumerService = this,
consumerProperties = consumerProperties,
topic = ARBEIDSUFORHET_VURDERING_TOPIC,
)
}

companion object {
private const val ARBEIDSUFORHET_VURDERING_TOPIC = "teamsykefravr.arbeidsuforhet-vurdering"
private val log: Logger = LoggerFactory.getLogger(this::class.java)
}
}

class ArbeidsuforhetvurderingRecordDeserializer : Deserializer<ArbeidsuforhetvurderingRecord> {
private val mapper = configuredJacksonMapper()
override fun deserialize(topic: String, data: ByteArray): ArbeidsuforhetvurderingRecord =
mapper.readValue(data, ArbeidsuforhetvurderingRecord::class.java)
}
Loading

0 comments on commit 0dfcab7

Please sign in to comment.