Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Redis Stream 연동을 위한 기반 코드 작성 #294

Merged
merged 16 commits into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.wootecam.luckyvickyauction.consumer.config;

import java.util.UUID;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

@Configuration
public class RedisStreamConfig {

@Value("${spring.data.redis.host}")
private String redisHost;

@Value("${spring.data.redis.port}")
private String redisPort;

@Value("${spring.data.redis.password}")
private String redisPassword;

@Getter
@Value("${stream.key}")
private String streamKey;
@Getter
@Value("${stream.consumer.groupName}")
private String consumerGroupName;
@Getter
private String consumerName = UUID.randomUUID().toString();

// todo [추후 서버를 분리하면 모듈이 분리될 상황을 가정한 빈입니다.] [2024-08-23] [yudonggeun]
// @Bean
public RedisConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(redisHost);
redisStandaloneConfiguration.setPort(Integer.parseInt(redisPort));
redisStandaloneConfiguration.setPassword(redisPassword);

return new LettuceConnectionFactory(redisStandaloneConfiguration);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.wootecam.luckyvickyauction.consumer.presentation;

import com.wootecam.luckyvickyauction.consumer.config.RedisStreamConfig;
import java.time.Duration;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@EnableScheduling
@Component
@RequiredArgsConstructor
public class PendingMessageConsumer {

private final RedisOperator redisOperator;
private final RedisStreamConfig redisStreamConfig;

@Scheduled(fixedRate = 1000)
public void consumePendingMessage() {

// 처리되지 않은 메시지 조회
PendingMessages pendingMessageInfos = redisOperator.getPendingMessage(
redisStreamConfig.getStreamKey(),
redisStreamConfig.getConsumerGroupName(),
redisStreamConfig.getConsumerName()
);

RecordId[] recordIds = pendingMessageInfos.stream().map(
PendingMessage::getId
).toArray(RecordId[]::new);

// 처리되지 않은 메시지 데이터 조회
List<MapRecord<String, Object, Object>> messages = redisOperator.claim(
redisStreamConfig.getStreamKey(),
redisStreamConfig.getConsumerGroupName(),
redisStreamConfig.getConsumerName(),
Duration.ofMinutes(1),
recordIds
);

// 메시지 처리
messages.forEach(message -> {
log.info("MessageId: {}", message.getId());
log.info("Stream: {}", message.getStream());
log.info("Body: {}", message.getValue());
redisOperator.acknowledge(redisStreamConfig.getConsumerGroupName(), message);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.wootecam.luckyvickyauction.consumer.presentation;

import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandKeyword;
import io.lettuce.core.protocol.CommandType;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class RedisOperator {

private static final Logger log = LoggerFactory.getLogger(RedisOperator.class);
private final RedisConnectionFactory redisConnectionFactory;
private final StringRedisTemplate redisTemplate;


public boolean isStreamConsumerGroupExist(String streamKey, String consumerGroupName) {
Iterator<StreamInfo.XInfoGroup> iterator = this.redisTemplate
.opsForStream().groups(streamKey).stream().iterator();

while (iterator.hasNext()) {
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
if (xInfoGroup.groupName().equals(consumerGroupName)) {
return true;
}
}
return false;
}

public void createStreamConsumerGroup(String streamKey, String consumerGroupName) {
// if stream is not exist, create stream and consumer group of it
if (Boolean.FALSE.equals(this.redisTemplate.hasKey(streamKey))) {
RedisAsyncCommands commands = (RedisAsyncCommands) redisConnectionFactory
.getConnection()
.getNativeConnection();

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
.add(CommandKeyword.CREATE)
.add(streamKey)
.add(consumerGroupName)
.add("0")
.add("MKSTREAM");

commands.dispatch(CommandType.XGROUP, new StatusOutput(StringCodec.UTF8), args);
}
// stream is exist, create consumerGroup if is not exist
else {
if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
this.redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}
}

public StreamMessageListenerContainer createStreamMessageListenerContainer() {
return StreamMessageListenerContainer.create(redisConnectionFactory,
StreamMessageListenerContainer
.StreamMessageListenerContainerOptions.builder()
.hashKeySerializer(new StringRedisSerializer())
.hashValueSerializer(new StringRedisSerializer())
.pollTimeout(Duration.ofMillis(20))
.build()
);
}

public void acknowledge(String consumerGroup, MapRecord<String, Object, Object> message) {
Long ack = this.redisTemplate.opsForStream().acknowledge(consumerGroup, message);
if (ack == 0) {
log.error("Acknowledge failed. MessageId: {}", message.getId());
} else {
log.info("Acknowledge success. MessageId: {}", message.getId());
}
}

public PendingMessages getPendingMessage(String streamKey, String consumerGroup, String consumerName) {
return this.redisTemplate.opsForStream()
.pending(streamKey,
Consumer.from(consumerGroup, consumerName),
Range.unbounded(),
100L
);
}

public List<MapRecord<String, Object, Object>> claim(String streamKey,
String consumerGroup, String consumerName,
Duration minIdleTime, RecordId... messageIds) {
if (messageIds.length < 1) {
return List.of();
}

return this.redisTemplate.opsForStream()
.claim(streamKey, consumerGroup, consumerName, minIdleTime, messageIds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.wootecam.luckyvickyauction.consumer.presentation;

import com.wootecam.luckyvickyauction.consumer.config.RedisStreamConfig;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisStreamConsumer implements StreamListener<String, MapRecord<String, Object, Object>> {

private final RedisOperator redisOperator;
private final RedisStreamConfig redisStreamConfig;
private StreamMessageListenerContainer<String, MapRecord<String, Object, Object>> listenerContainer;
private Subscription subscription;

@Override
public void onMessage(MapRecord<String, Object, Object> message) {
log.info("MessageId: {}", message.getId());
log.info("Stream: {}", message.getStream());
log.info("Body: {}", message.getValue());
redisOperator.acknowledge(redisStreamConfig.getConsumerGroupName(), message);
}
Comment on lines +28 to +34
Copy link
Collaborator

@chhs2131 chhs2131 Aug 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

해당 메서드에서 RedisStream을 통해 전달된 메세지를 확인하고,
메시지에 맞는 로직을 호출한 뒤 정상처리되면 redisOperator.ack를 보내는 식으로 동작하는 것인가요?!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

정확합니다!


@PostConstruct
public void init() throws InterruptedException {
// Consumer Group 설정
this.redisOperator.createStreamConsumerGroup(
redisStreamConfig.getStreamKey(),
redisStreamConfig.getConsumerGroupName());

// StreamMessageListenerContainer 설정
this.listenerContainer = this.redisOperator.createStreamMessageListenerContainer();

//Subscription 설정
this.subscription = this.listenerContainer.receive(
Consumer.from(
redisStreamConfig.getConsumerGroupName(),
redisStreamConfig.getConsumerName()
),
StreamOffset.create(
redisStreamConfig.getStreamKey(),
ReadOffset.lastConsumed()
),
this
);

// redis stream 구독 생성까지 Blocking 된다. 이때의 timeout 2초다. 만약 2초보다 빠르게 구독이 생성되면 바로 다음으로 넘어간다.
this.subscription.await(Duration.ofSeconds(2));

// redis listen 시작
this.listenerContainer.start();
}

@PreDestroy
public void destroy() {
if (this.subscription != null) {
this.subscription.cancel();
}
if (this.listenerContainer != null) {
this.listenerContainer.stop();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.wootecam.luckyvickyauction.core.auction.service.auctioneer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wootecam.luckyvickyauction.core.auction.service.Auctioneer;
import com.wootecam.luckyvickyauction.core.member.dto.SignInInfo;
import com.wootecam.luckyvickyauction.global.dto.AuctionPurchaseRequestMessage;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.StringRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class LazyAuctioneer implements Auctioneer {

private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
@Value("${stream.key}")
private String streamKey;

@Override
public void process(SignInInfo buyerInfo, long price, long auctionId, long quantity, LocalDateTime requestTime) {

String messageType = "purchase";

var message = new AuctionPurchaseRequestMessage(
UUID.randomUUID().toString(),
buyerInfo.id(),
price,
auctionId,
quantity,
requestTime
);

try {
String stringMessage = objectMapper.writeValueAsString(message);

StringRecord record = StreamRecords
.string(Map.of(messageType, stringMessage))
.withStreamKey(streamKey);
redisTemplate.opsForStream().add(record);

} catch (JsonProcessingException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.wootecam.luckyvickyauction.global.dto;

import java.time.LocalDateTime;

public record AuctionPurchaseRequestMessage(
String requestId,
Long buyerId,
Long auctionId,
Long price,
Long quantity,
LocalDateTime requestTime
) {
Comment on lines +5 to +12
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

스트림으로 보낼 구매 요청 메세지를 이곳에 담는군요..! requestId는 UUID! 좋습니다!

}
7 changes: 6 additions & 1 deletion src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ spring:
port: 6379
password: 1q2w3e
host: localhost

management:
server:
port: 8081
Expand All @@ -41,3 +41,8 @@ logging:
org:
hibernate:
SQL: debug

stream:
key: LVA-Stream
consumer:
groupName: LVA-Group
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;

// 중간에서 맵핑을 담당하는 CoreRepository 의 경우 자동 생성 대상이 아니므로 직접 명시해 줄 것
@Import({JpaConfig.class, MemberCoreRepository.class, AuctionCoreRepository.class, ReceiptCoreRepository.class})
@DataJpaTest
@ActiveProfiles("test")
public abstract class RepositoryTest {

@Autowired
Expand Down
Loading