Skip to content

Commit

Permalink
Feat/#137 MultiQueue를 이용하여 Controller에서 오는 로그 요청의 부하 감소 (#140)
Browse files Browse the repository at this point in the history
* chore: 임시로 ReentrantLogQueue에서 Primary 제거

* feat: MultiProcessor 추가

- 임시로 여러 큐를 가지는 MultiProcessor를 추가함

* chore: default queue개수 조정

* feat: flatterQueue 싱글 스레드풀 추가 및 queue count default를 3 설정

* chore: default queue사이즈 설정

* refactor: AsyncMultiProcessor 리팩토링

- 내부 로직 일원화 및 린팅을 적용했습니다.
- MultiProcessor -> AsyncMultiProcessor 클래스 명 변경을 진행했습니다.

* chore: 불필요한 메서드 구현 제거

- EventConsume<T> 구현을 제거했습니다.

* chore: 의존성 정리, 사용하지 않는 Component 제거

---------

Co-authored-by: KyungMin Lee <[email protected]>
  • Loading branch information
2 people authored and LuizyHub committed Aug 29, 2024
1 parent 6c8a22a commit eebd78b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package info.logbat.domain.log.application;

import info.logbat.common.event.EventProducer;
import info.logbat.domain.log.domain.Log;
import info.logbat.domain.log.flatter.LogRequestFlatter;
import info.logbat.domain.log.presentation.payload.request.CreateLogRequest;
import info.logbat.domain.project.application.AppService;
import java.util.ArrayList;
Expand All @@ -15,7 +15,7 @@
@RequiredArgsConstructor
public class LogService {

private final LogRequestFlatter logRequestFlatter;
private final EventProducer<Log> producer;
private final AppService appService;

public void saveLogs(String appKey, List<CreateLogRequest> requests) {
Expand All @@ -28,7 +28,7 @@ public void saveLogs(String appKey, List<CreateLogRequest> requests) {
log.error("Failed to convert request to entity: {}", request, e);
}
});
logRequestFlatter.flatten(logs);
producer.produce(logs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

@Primary
//@Primary
@Component
public class ReentrantLogQueue<T> implements EventProducer<T>, EventConsumer<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

/**
* 비동기적으로 로그를 처리하는 클래스입니다. 이 클래스는 로그를 저장하는 비동기 작업을 수행하며, 이를 위해 별도의 스레드 풀을 사용합니다.
*/
@Slf4j
@Component
public class AsyncLogProcessor {

// 로그 저장 작업을 수행하는 스레드 풀
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@
public class AsyncLogRepository implements LogRepository {

private final JdbcTemplate jdbcTemplate;
private final AsyncLogProcessor asyncLogProcessor;
private final AsyncMultiProcessor<Log> asyncMultiProcessor;

private static final Long DEFAULT_RETURNS = 0L;

@PostConstruct
public void init() {
log.info("AsyncLogRepository is initialized.");
asyncLogProcessor.init(this::saveLogsToDatabase);
asyncMultiProcessor.init(this::saveLogsToDatabase);
}

@Deprecated
@Override
public long save(Log log) {
asyncLogProcessor.submitLog(log);
return DEFAULT_RETURNS;
}

@Deprecated
@Override
public List<Log> saveAll(List<Log> logs) {
asyncLogProcessor.submitLogs(logs);
return logs;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package info.logbat.domain.log.repository;

import com.zaxxer.hikari.HikariDataSource;
import info.logbat.common.event.EventProducer;
import info.logbat.domain.log.queue.ReentrantLogQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Primary
@Component
public class AsyncMultiProcessor<E> implements EventProducer<E> {

private final List<ReentrantLogQueue<E>> queues;
private final List<ExecutorService> flatterExecutors;
private Consumer<List<E>> saveFunction;
private final int queueCount;

public AsyncMultiProcessor(@Value("${queue.count:3}") int queueCount,
@Value("${jdbc.async.timeout:5000}") Long timeout,
@Value("${jdbc.async.bulk-size:3000}") Integer bulkSize, JdbcTemplate jdbcTemplate) {
this.queueCount = queueCount;
this.queues = new ArrayList<>(queueCount);
this.flatterExecutors = new ArrayList<>(queueCount);
int poolSize = getPoolSize(jdbcTemplate);
setup(queueCount, timeout, bulkSize, poolSize);
}

public void init(Consumer<List<E>> saveFunction) {
this.saveFunction = saveFunction;
}

@Override
public void produce(List<E> data) {
if (data.isEmpty()) {
return;
}
int selectedQueue = ThreadLocalRandom.current().nextInt(queueCount);
flatterExecutors.get(selectedQueue).execute(() -> queues.get(selectedQueue).produce(data));
}

private void setup(int queueCount, Long timeout, Integer bulkSize, int poolSize) {
ExecutorService followerExecutor = Executors.newFixedThreadPool(poolSize);
ReentrantLogQueue<E> queue = new ReentrantLogQueue<>(timeout, bulkSize);

for (int i = 0; i < queueCount; i++) {
queues.add(queue);
flatterExecutors.add(Executors.newSingleThreadExecutor());
}
CompletableFuture.runAsync(() -> leaderTask(queue, followerExecutor));
}

private void leaderTask(ReentrantLogQueue<E> queue, ExecutorService follower) {
while (!Thread.currentThread().isInterrupted()) {
List<E> element = queue.consume();
follower.execute(() -> saveFunction.accept(element));
}
}

private static int getPoolSize(JdbcTemplate jdbcTemplate) {
DataSource dataSource = jdbcTemplate.getDataSource();
if (!(dataSource instanceof HikariDataSource)) {
throw new IllegalArgumentException("DataSource is null");
}
int poolSize = ((HikariDataSource) dataSource).getMaximumPoolSize();
log.debug("Creating AsyncLogProcessor with pool size: {}", poolSize);
return poolSize * 5 / 10;
}
}

0 comments on commit eebd78b

Please sign in to comment.