Skip to content

Commit

Permalink
IS-2329: Update aktiv aktivitetskrav status when consuming vurdering …
Browse files Browse the repository at this point in the history
…topic
  • Loading branch information
vetlesolgaard committed Jul 11, 2024
1 parent fd4004d commit 35981d9
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
package no.nav.syfo.aktivitetskravvurdering.kafka

import no.nav.syfo.aktivitetskravvurdering.persistAktivitetskrav
import no.nav.syfo.domain.PersonIdent
import no.nav.syfo.personstatus.PersonoversiktStatusService
import no.nav.syfo.personstatus.infrastructure.database.DatabaseInterface
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.time.Duration

class KafkaAktivitetskravVurderingConsumer(
class AktivitetskravVurderingConsumer(
private val database: DatabaseInterface,
) : KafkaConsumerService<KafkaAktivitetskravVurdering> {
private val personoversiktStatusService: PersonoversiktStatusService,
) : KafkaConsumerService<AktivitetskravVurderingRecord> {

override val pollDurationInMillis: Long = 1000

override fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, KafkaAktivitetskravVurdering>) {
override fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, AktivitetskravVurderingRecord>) {
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
if (records.count() > 0) {
processRecords(records)
processRecordsV2(records)
kafkaConsumer.commitSync()
}
}

private fun processRecords(
consumerRecords: ConsumerRecords<String, KafkaAktivitetskravVurdering>,
consumerRecords: ConsumerRecords<String, AktivitetskravVurderingRecord>,
) {
val (tombstoneRecords, validRecords) = consumerRecords.partition { it.value() == null }

Expand All @@ -34,7 +38,7 @@ class KafkaAktivitetskravVurderingConsumer(

database.connection.use { connection ->
validRecords.forEach { record ->
log.info("Received ${KafkaAktivitetskravVurdering::class.java.simpleName} with key=${record.key()}, ready to process.")
log.info("Received ${AktivitetskravVurderingRecord::class.java.simpleName} with key=${record.key()}, ready to process.")
val aktivitetskrav = record.value().toAktivitetskrav()
persistAktivitetskrav(
connection = connection,
Expand All @@ -45,7 +49,23 @@ class KafkaAktivitetskravVurderingConsumer(
}
}

private fun processRecordsV2(records: ConsumerRecords<String, AktivitetskravVurderingRecord>) {
val (tombstoneRecords, validRecords) = records.partition { it.value() == null }
if (tombstoneRecords.isNotEmpty()) {
val numberOfTombstones = tombstoneRecords.size
log.error("Value of $numberOfTombstones ConsumerRecord are null, most probably due to a tombstone. Contact the owner of the topic if an error is suspected")
}
validRecords.forEach { record ->
log.info("Received ${AktivitetskravVurderingRecord::class.java.simpleName} with key=${record.key()}, ready to process.")
val vurdering = record.value()
personoversiktStatusService.upsertAktivitetskravvurderingStatus(
PersonIdent(vurdering.personIdent),
!vurdering.isFinal
)
}
}

companion object {
private val log = LoggerFactory.getLogger(KafkaAktivitetskravVurdering::class.java)
private val log = LoggerFactory.getLogger(AktivitetskravVurderingRecord::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import no.nav.syfo.domain.PersonIdent
import java.time.LocalDate
import java.time.OffsetDateTime

data class KafkaAktivitetskravVurdering(
data class AktivitetskravVurderingRecord(
val uuid: String,
val personIdent: String,
val createdAt: OffsetDateTime,
val status: String,
val isFinal: Boolean,
val stoppunktAt: LocalDate,
val beskrivelse: String?,
val sistVurdert: OffsetDateTime?,
val frist: LocalDate?,
)

fun KafkaAktivitetskravVurdering.toAktivitetskrav() = Aktivitetskrav(
fun AktivitetskravVurderingRecord.toAktivitetskrav() = Aktivitetskrav(
personIdent = PersonIdent(personIdent),
status = AktivitetskravStatus.valueOf(this.status),
stoppunkt = this.stoppunktAt,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package no.nav.syfo.aktivitetskravvurdering.kafka

import no.nav.syfo.ApplicationState
import no.nav.syfo.personstatus.PersonoversiktStatusService
import no.nav.syfo.personstatus.infrastructure.database.database
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaEnvironment
import no.nav.syfo.personstatus.infrastructure.kafka.kafkaAivenConsumerConfig
Expand All @@ -15,9 +16,11 @@ const val AKTIVITETSKRAV_VURDERING_TOPIC = "teamsykefravr.aktivitetskrav-vurderi
fun launchKafkaTaskAktivitetskravVurdering(
applicationState: ApplicationState,
kafkaEnvironment: KafkaEnvironment,
personoversiktStatusService: PersonoversiktStatusService,
) {
val kafkaAktivitetskravVurderingConsumer = KafkaAktivitetskravVurderingConsumer(
val aktivitetskravVurderingConsumer = AktivitetskravVurderingConsumer(
database = database,
personoversiktStatusService = personoversiktStatusService,
)
val consumerProperties = Properties().apply {
putAll(kafkaAivenConsumerConfig(kafkaEnvironment = kafkaEnvironment))
Expand All @@ -29,12 +32,12 @@ fun launchKafkaTaskAktivitetskravVurdering(
applicationState = applicationState,
topic = AKTIVITETSKRAV_VURDERING_TOPIC,
consumerProperties = consumerProperties,
kafkaConsumerService = kafkaAktivitetskravVurderingConsumer,
kafkaConsumerService = aktivitetskravVurderingConsumer,
)
}

class KafkaAktivitetskravVurderingDeserializer : Deserializer<KafkaAktivitetskravVurdering> {
class KafkaAktivitetskravVurderingDeserializer : Deserializer<AktivitetskravVurderingRecord> {
private val mapper = configuredJacksonMapper()
override fun deserialize(topic: String, data: ByteArray): KafkaAktivitetskravVurdering =
mapper.readValue(data, KafkaAktivitetskravVurdering::class.java)
override fun deserialize(topic: String, data: ByteArray): AktivitetskravVurderingRecord =
mapper.readValue(data, AktivitetskravVurderingRecord::class.java)
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ class PersonoversiktStatusService(
)
}

fun upsertAktivitetskravvurderingStatus(personident: PersonIdent, isAktivVurdering: Boolean): Result<Int> {
return personoversiktStatusRepository.upsertAktivitetskravAktivStatus(
fun upsertAktivitetskravvurderingStatus(personident: PersonIdent, isAktivVurdering: Boolean): Result<Int> =
personoversiktStatusRepository.upsertAktivitetskravAktivStatus(
personident = personident,
isAktivVurdering = isAktivVurdering
)
}

private fun createOrUpdatePersonOversiktStatus(
connection: Connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fun launchKafkaModule(
launchKafkaTaskAktivitetskravVurdering(
applicationState = applicationState,
kafkaEnvironment = environment.kafka,
personoversiktStatusService = personoversiktStatusService,
)

launchKafkaTaskIdenthendelse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import io.ktor.server.testing.*
import io.mockk.*
import no.nav.syfo.aktivitetskravvurdering.domain.AktivitetskravStatus
import no.nav.syfo.oppfolgingstilfelle.kafka.toPersonOversiktStatus
import no.nav.syfo.personstatus.PersonoversiktStatusService
import no.nav.syfo.personstatus.application.arbeidsuforhet.IArbeidsuforhetvurderingClient
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.IOppfolgingsoppgaveClient
import no.nav.syfo.personstatus.db.*
import no.nav.syfo.personstatus.infrastructure.database.repository.PersonOversiktStatusRepository
import no.nav.syfo.testutil.*
import no.nav.syfo.testutil.generator.*
import org.amshove.kluent.*
Expand All @@ -21,18 +25,26 @@ class KafkaAktivitetskravVurderingConsumerSpek : Spek({
val externalMockEnvironment = ExternalMockEnvironment.instance
val database = externalMockEnvironment.database

val kafkaConsumerMock = mockk<KafkaConsumer<String, KafkaAktivitetskravVurdering>>()
val kafkaAktivitetskravVurderingConsumer = KafkaAktivitetskravVurderingConsumer(database = database)
val kafkaConsumerMock = mockk<KafkaConsumer<String, AktivitetskravVurderingRecord>>()
val personOppgaveRepository = PersonOversiktStatusRepository(database = database)
val personoversiktStatusService = PersonoversiktStatusService(
database = database,
pdlClient = externalMockEnvironment.pdlClient,
arbeidsuforhetvurderingClient = mockk<IArbeidsuforhetvurderingClient>(),
personoversiktStatusRepository = personOppgaveRepository,
oppfolgingsoppgaveClient = mockk<IOppfolgingsoppgaveClient>(),
)
val aktivitetskravVurderingConsumer =
AktivitetskravVurderingConsumer(database = database, personoversiktStatusService = personoversiktStatusService)

val aktivitetskravVurderingTopicPartition = aktivitetskravVurderingTopicPartition()
val kafkaAktivitetskravVurderingNy = generateKafkaAktivitetskravVurdering(
status = AktivitetskravStatus.NY
)
val kafkaAktivitetskravVurderingNy = generateKafkaAktivitetskravVurdering(status = AktivitetskravStatus.NY, isFinal = false)
val kafkaAktivitetskravVurderingAvventer = generateKafkaAktivitetskravVurdering(
status = AktivitetskravStatus.AVVENT,
beskrivelse = "Avventer",
sistVurdert = OffsetDateTime.now().minusMinutes(30),
frist = LocalDate.now().plusWeeks(1),
isFinal = false,
)

beforeEachTest {
Expand All @@ -42,19 +54,19 @@ class KafkaAktivitetskravVurderingConsumerSpek : Spek({
every { kafkaConsumerMock.commitSync() } returns Unit
}

describe("${KafkaAktivitetskravVurderingConsumer::class.java.simpleName}: pollAndProcessRecords") {
describe("${AktivitetskravVurderingConsumer::class.java.simpleName}: pollAndProcessRecords") {
it("creates new PersonOversiktStatus if no PersonOversiktStatus exists for personident") {
every { kafkaConsumerMock.poll(any<Duration>()) } returns ConsumerRecords(
mapOf(
aktivitetskravVurderingTopicPartition to listOf(
aktivitetskravVurderingConsumerRecord(
kafkaAktivitetskravVurdering = kafkaAktivitetskravVurderingNy,
aktivitetskravVurderingRecord = kafkaAktivitetskravVurderingNy,
),
)
)
)

kafkaAktivitetskravVurderingConsumer.pollAndProcessRecords(
aktivitetskravVurderingConsumer.pollAndProcessRecords(
kafkaConsumer = kafkaConsumerMock,
)

Expand Down Expand Up @@ -84,7 +96,7 @@ class KafkaAktivitetskravVurderingConsumerSpek : Spek({
mapOf(
aktivitetskravVurderingTopicPartition to listOf(
aktivitetskravVurderingConsumerRecord(
kafkaAktivitetskravVurdering = kafkaAktivitetskravVurderingAvventer,
aktivitetskravVurderingRecord = kafkaAktivitetskravVurderingAvventer,
),
)
)
Expand All @@ -98,7 +110,7 @@ class KafkaAktivitetskravVurderingConsumerSpek : Spek({
)
)

kafkaAktivitetskravVurderingConsumer.pollAndProcessRecords(
aktivitetskravVurderingConsumer.pollAndProcessRecords(
kafkaConsumer = kafkaConsumerMock,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package no.nav.syfo.testutil.generator

import no.nav.syfo.aktivitetskravvurdering.domain.AktivitetskravStatus
import no.nav.syfo.aktivitetskravvurdering.kafka.AKTIVITETSKRAV_VURDERING_TOPIC
import no.nav.syfo.aktivitetskravvurdering.kafka.KafkaAktivitetskravVurdering
import no.nav.syfo.aktivitetskravvurdering.kafka.AktivitetskravVurderingRecord
import no.nav.syfo.testutil.UserConstants
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
Expand All @@ -15,7 +15,8 @@ fun generateKafkaAktivitetskravVurdering(
frist: LocalDate? = null,
sistVurdert: OffsetDateTime? = null,
beskrivelse: String? = null,
) = KafkaAktivitetskravVurdering(
isFinal: Boolean,
) = AktivitetskravVurderingRecord(
uuid = UUID.randomUUID().toString(),
personIdent = UserConstants.ARBEIDSTAKER_FNR,
createdAt = OffsetDateTime.now(),
Expand All @@ -24,6 +25,7 @@ fun generateKafkaAktivitetskravVurdering(
beskrivelse = beskrivelse,
sistVurdert = sistVurdert,
frist = frist,
isFinal = isFinal
)

fun aktivitetskravVurderingTopicPartition() = TopicPartition(
Expand All @@ -32,11 +34,11 @@ fun aktivitetskravVurderingTopicPartition() = TopicPartition(
)

fun aktivitetskravVurderingConsumerRecord(
kafkaAktivitetskravVurdering: KafkaAktivitetskravVurdering,
aktivitetskravVurderingRecord: AktivitetskravVurderingRecord,
) = ConsumerRecord(
AKTIVITETSKRAV_VURDERING_TOPIC,
0,
1,
"key1",
kafkaAktivitetskravVurdering
aktivitetskravVurderingRecord
)

0 comments on commit 35981d9

Please sign in to comment.