Skip to content

Commit

Permalink
feat: Redis Stream 연동을 위한 기반 코드 작성 (#294)
Browse files Browse the repository at this point in the history
* feat: redis stream 빈 설정

* feat: redis stream consume 클래스 생성

- RedisStreamConsumer 소비자 클래스입니다.
    - RedisStream에서 메시지를 읽고 pending 상태로 만듭니다.

- RedisOperator는 redis의 명령을 추상화하여 다른 클래스에서 쉽게 사용할 수 있도록합니다.

* chore: RedisStream 설정 분리

- RedisStreamConsumer에서의 streamkey, consumer group name을 application.yaml에서 관리하도록 변경합니다.

* fix: @value 매핑 오류 수정

* feat: LazyAuctioneer 생성

### LazyAuctioneer
- 경매 입찰 과정을 비동기로 나중에 처리하는 작업을 실행하기 위한 클래스입니다.

### AuctionPurchaseRequestMessage 데이터 클래스
- 경매 입찰 프로세스에 필요한 최소한의 데이터를 가지고 있는 데이터 클래스입니다.

* feat: Redis stream message ack 로직 추가

* feat: RedisStreamConfig 설정 통합

- RedisStreamConsumer로 부터 Stream에 필요한 정보를 조회하도록 변경하였습니다.
- streamKey, consumerGroupName, consumerName이 config에 추가되었습니다.

* feat: XPENDING, XCLAIM redis operator 구현

* feat: 미처리 PENDING 메시지 처리 스케줄러 구현

- 1분 이상 미처리 상태인 메시지를 처리하는 스케줄러를 구현하였습니다.

* fix: test 실패 오류 수정

- 테스트 환경에서만 적용되는 값만 치환해서 사용할 수 있도록 application.yaml을 application-test.yaml으로 변경
- ServiceTest, RepositoryTest 프로파일 test 적용

* refactor: Auctioneer 메서드 파라미터 변경

* fix: AutioneeorProxy 제거

* test: 테스트 코드 버그 수정
  • Loading branch information
yudonggeun authored Aug 25, 2024
1 parent 2a0175c commit 5474177
Show file tree
Hide file tree
Showing 18 changed files with 473 additions and 3,282 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ tasks.named('test') {
}

asciidoctor.doFirst {
delete file('src/main/resources/static/docs')
delete file('src/main/resources/docs')
}

asciidoctor {
Expand All @@ -94,7 +94,7 @@ task createDocument(type: Copy) {
dependsOn asciidoctor

from asciidoctor.outputDir
into file("src/main/resources/static")
into file("src/main/resources/docs")
}

bootJar {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.wootecam.luckyvickyauction.consumer.config;

import java.util.UUID;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

@Configuration
public class RedisStreamConfig {

@Value("${spring.data.redis.host}")
private String redisHost;

@Value("${spring.data.redis.port}")
private String redisPort;

@Value("${spring.data.redis.password}")
private String redisPassword;

@Getter
@Value("${stream.key}")
private String streamKey;
@Getter
@Value("${stream.consumer.groupName}")
private String consumerGroupName;
@Getter
private String consumerName = UUID.randomUUID().toString();

// todo [추후 서버를 분리하면 모듈이 분리될 상황을 가정한 빈입니다.] [2024-08-23] [yudonggeun]
// @Bean
public RedisConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(redisHost);
redisStandaloneConfiguration.setPort(Integer.parseInt(redisPort));
redisStandaloneConfiguration.setPassword(redisPassword);

return new LettuceConnectionFactory(redisStandaloneConfiguration);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.wootecam.luckyvickyauction.consumer.presentation;

import com.wootecam.luckyvickyauction.consumer.config.RedisStreamConfig;
import java.time.Duration;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@EnableScheduling
@Component
@RequiredArgsConstructor
public class PendingMessageConsumer {

private final RedisOperator redisOperator;
private final RedisStreamConfig redisStreamConfig;

@Scheduled(fixedRate = 1000)
public void consumePendingMessage() {

// 처리되지 않은 메시지 조회
PendingMessages pendingMessageInfos = redisOperator.getPendingMessage(
redisStreamConfig.getStreamKey(),
redisStreamConfig.getConsumerGroupName(),
redisStreamConfig.getConsumerName()
);

RecordId[] recordIds = pendingMessageInfos.stream().map(
PendingMessage::getId
).toArray(RecordId[]::new);

// 처리되지 않은 메시지 데이터 조회
List<MapRecord<String, Object, Object>> messages = redisOperator.claim(
redisStreamConfig.getStreamKey(),
redisStreamConfig.getConsumerGroupName(),
redisStreamConfig.getConsumerName(),
Duration.ofMinutes(1),
recordIds
);

// 메시지 처리
messages.forEach(message -> {
log.info("MessageId: {}", message.getId());
log.info("Stream: {}", message.getStream());
log.info("Body: {}", message.getValue());
redisOperator.acknowledge(redisStreamConfig.getConsumerGroupName(), message);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.wootecam.luckyvickyauction.consumer.presentation;

import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandKeyword;
import io.lettuce.core.protocol.CommandType;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class RedisOperator {

private static final Logger log = LoggerFactory.getLogger(RedisOperator.class);
private final RedisConnectionFactory redisConnectionFactory;
private final StringRedisTemplate redisTemplate;


public boolean isStreamConsumerGroupExist(String streamKey, String consumerGroupName) {
Iterator<StreamInfo.XInfoGroup> iterator = this.redisTemplate
.opsForStream().groups(streamKey).stream().iterator();

while (iterator.hasNext()) {
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
if (xInfoGroup.groupName().equals(consumerGroupName)) {
return true;
}
}
return false;
}

public void createStreamConsumerGroup(String streamKey, String consumerGroupName) {
// if stream is not exist, create stream and consumer group of it
if (Boolean.FALSE.equals(this.redisTemplate.hasKey(streamKey))) {
RedisAsyncCommands commands = (RedisAsyncCommands) redisConnectionFactory
.getConnection()
.getNativeConnection();

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
.add(CommandKeyword.CREATE)
.add(streamKey)
.add(consumerGroupName)
.add("0")
.add("MKSTREAM");

commands.dispatch(CommandType.XGROUP, new StatusOutput(StringCodec.UTF8), args);
}
// stream is exist, create consumerGroup if is not exist
else {
if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
this.redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}
}

public StreamMessageListenerContainer createStreamMessageListenerContainer() {
return StreamMessageListenerContainer.create(redisConnectionFactory,
StreamMessageListenerContainer
.StreamMessageListenerContainerOptions.builder()
.hashKeySerializer(new StringRedisSerializer())
.hashValueSerializer(new StringRedisSerializer())
.pollTimeout(Duration.ofMillis(20))
.build()
);
}

public void acknowledge(String consumerGroup, MapRecord<String, Object, Object> message) {
Long ack = this.redisTemplate.opsForStream().acknowledge(consumerGroup, message);
if (ack == 0) {
log.error("Acknowledge failed. MessageId: {}", message.getId());
} else {
log.info("Acknowledge success. MessageId: {}", message.getId());
}
}

public PendingMessages getPendingMessage(String streamKey, String consumerGroup, String consumerName) {
return this.redisTemplate.opsForStream()
.pending(streamKey,
Consumer.from(consumerGroup, consumerName),
Range.unbounded(),
100L
);
}

public List<MapRecord<String, Object, Object>> claim(String streamKey,
String consumerGroup, String consumerName,
Duration minIdleTime, RecordId... messageIds) {
if (messageIds.length < 1) {
return List.of();
}

return this.redisTemplate.opsForStream()
.claim(streamKey, consumerGroup, consumerName, minIdleTime, messageIds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.wootecam.luckyvickyauction.consumer.presentation;

import com.wootecam.luckyvickyauction.consumer.config.RedisStreamConfig;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisStreamConsumer implements StreamListener<String, MapRecord<String, Object, Object>> {

private final RedisOperator redisOperator;
private final RedisStreamConfig redisStreamConfig;
private StreamMessageListenerContainer<String, MapRecord<String, Object, Object>> listenerContainer;
private Subscription subscription;

@Override
public void onMessage(MapRecord<String, Object, Object> message) {
log.info("MessageId: {}", message.getId());
log.info("Stream: {}", message.getStream());
log.info("Body: {}", message.getValue());
redisOperator.acknowledge(redisStreamConfig.getConsumerGroupName(), message);
}

@PostConstruct
public void init() throws InterruptedException {
// Consumer Group 설정
this.redisOperator.createStreamConsumerGroup(
redisStreamConfig.getStreamKey(),
redisStreamConfig.getConsumerGroupName());

// StreamMessageListenerContainer 설정
this.listenerContainer = this.redisOperator.createStreamMessageListenerContainer();

//Subscription 설정
this.subscription = this.listenerContainer.receive(
Consumer.from(
redisStreamConfig.getConsumerGroupName(),
redisStreamConfig.getConsumerName()
),
StreamOffset.create(
redisStreamConfig.getStreamKey(),
ReadOffset.lastConsumed()
),
this
);

// redis stream 구독 생성까지 Blocking 된다. 이때의 timeout 2초다. 만약 2초보다 빠르게 구독이 생성되면 바로 다음으로 넘어간다.
this.subscription.await(Duration.ofSeconds(2));

// redis listen 시작
this.listenerContainer.start();
}

@PreDestroy
public void destroy() {
if (this.subscription != null) {
this.subscription.cancel();
}
if (this.listenerContainer != null) {
this.listenerContainer.stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import com.wootecam.luckyvickyauction.core.member.controller.Login;
import com.wootecam.luckyvickyauction.core.member.dto.SignInInfo;
import com.wootecam.luckyvickyauction.core.payment.service.PaymentService;
import com.wootecam.luckyvickyauction.global.dto.AuctionPurchaseRequestMessage;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
Expand Down Expand Up @@ -53,7 +55,15 @@ public ResponseEntity<Void> submitAuction(@Login SignInInfo signInInfo,
@CurrentTime LocalDateTime now,
@PathVariable(name = "auctionId") Long auctionId,
@RequestBody PurchaseRequest purchaseRequest) {
auctioneer.process(signInInfo, purchaseRequest.price(), auctionId, purchaseRequest.quantity(), now);
var message = new AuctionPurchaseRequestMessage(
UUID.randomUUID().toString(),
signInInfo.id(),
purchaseRequest.price(),
auctionId,
purchaseRequest.quantity(),
now
);
auctioneer.process(message);
return ResponseEntity.ok().build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package com.wootecam.luckyvickyauction.core.auction.service;

import com.wootecam.luckyvickyauction.core.member.dto.SignInInfo;
import java.time.LocalDateTime;
import com.wootecam.luckyvickyauction.global.dto.AuctionPurchaseRequestMessage;
import com.wootecam.luckyvickyauction.global.dto.AuctionRefundRequestMessage;

/**
* 경매 입찰 로직 분리
* <a href="{https://github.com/woowa-techcamp-2024/Team7-ELEVEN/issues/246}">#246</a>
*/
public interface Auctioneer {

void process(SignInInfo buyerInfo, long price, long auctionId, long quantity, LocalDateTime requestTime);
void process(AuctionPurchaseRequestMessage message);

void refund(AuctionRefundRequestMessage message);

}
Loading

0 comments on commit 5474177

Please sign in to comment.