diff --git a/logbat/src/main/java/info/logbat/domain/log/application/LogService.java b/logbat/src/main/java/info/logbat/domain/log/application/LogService.java index 684b7a8..7438fc6 100644 --- a/logbat/src/main/java/info/logbat/domain/log/application/LogService.java +++ b/logbat/src/main/java/info/logbat/domain/log/application/LogService.java @@ -1,7 +1,7 @@ package info.logbat.domain.log.application; +import info.logbat.common.event.EventProducer; import info.logbat.domain.log.domain.Log; -import info.logbat.domain.log.flatter.LogRequestFlatter; import info.logbat.domain.log.presentation.payload.request.CreateLogRequest; import info.logbat.domain.project.application.AppService; import java.util.ArrayList; @@ -15,7 +15,7 @@ @RequiredArgsConstructor public class LogService { - private final LogRequestFlatter logRequestFlatter; + private final EventProducer producer; private final AppService appService; public void saveLogs(String appKey, List requests) { @@ -28,7 +28,7 @@ public void saveLogs(String appKey, List requests) { log.error("Failed to convert request to entity: {}", request, e); } }); - logRequestFlatter.flatten(logs); + producer.produce(logs); } } diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java b/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java index 64c69e2..d63a7bf 100644 --- a/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java +++ b/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java @@ -9,10 +9,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; -@Primary +//@Primary @Component public class ReentrantLogQueue implements EventProducer, EventConsumer { diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java index 9b7e330..9df8714 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java @@ -11,13 +11,11 @@ import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Component; /** * 비동기적으로 로그를 처리하는 클래스입니다. 이 클래스는 로그를 저장하는 비동기 작업을 수행하며, 이를 위해 별도의 스레드 풀을 사용합니다. */ @Slf4j -@Component public class AsyncLogProcessor { // 로그 저장 작업을 수행하는 스레드 풀 diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java index 9e4c5ee..44a8967 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java @@ -20,25 +20,25 @@ public class AsyncLogRepository implements LogRepository { private final JdbcTemplate jdbcTemplate; - private final AsyncLogProcessor asyncLogProcessor; + private final AsyncMultiProcessor asyncMultiProcessor; private static final Long DEFAULT_RETURNS = 0L; @PostConstruct public void init() { log.info("AsyncLogRepository is initialized."); - asyncLogProcessor.init(this::saveLogsToDatabase); + asyncMultiProcessor.init(this::saveLogsToDatabase); } + @Deprecated @Override public long save(Log log) { - asyncLogProcessor.submitLog(log); return DEFAULT_RETURNS; } + @Deprecated @Override public List saveAll(List logs) { - asyncLogProcessor.submitLogs(logs); return logs; } diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java new file mode 100644 index 0000000..6ab3adc --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java @@ -0,0 +1,80 @@ +package info.logbat.domain.log.repository; + +import com.zaxxer.hikari.HikariDataSource; +import info.logbat.common.event.EventProducer; +import info.logbat.domain.log.queue.ReentrantLogQueue; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import javax.sql.DataSource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Primary +@Component +public class AsyncMultiProcessor implements EventProducer { + + private final List> queues; + private final List flatterExecutors; + private Consumer> saveFunction; + private final int queueCount; + + public AsyncMultiProcessor(@Value("${queue.count:3}") int queueCount, + @Value("${jdbc.async.timeout:5000}") Long timeout, + @Value("${jdbc.async.bulk-size:3000}") Integer bulkSize, JdbcTemplate jdbcTemplate) { + this.queueCount = queueCount; + this.queues = new ArrayList<>(queueCount); + this.flatterExecutors = new ArrayList<>(queueCount); + int poolSize = getPoolSize(jdbcTemplate); + setup(queueCount, timeout, bulkSize, poolSize); + } + + public void init(Consumer> saveFunction) { + this.saveFunction = saveFunction; + } + + @Override + public void produce(List data) { + if (data.isEmpty()) { + return; + } + int selectedQueue = ThreadLocalRandom.current().nextInt(queueCount); + flatterExecutors.get(selectedQueue).execute(() -> queues.get(selectedQueue).produce(data)); + } + + private void setup(int queueCount, Long timeout, Integer bulkSize, int poolSize) { + ExecutorService followerExecutor = Executors.newFixedThreadPool(poolSize); + ReentrantLogQueue queue = new ReentrantLogQueue<>(timeout, bulkSize); + + for (int i = 0; i < queueCount; i++) { + queues.add(queue); + flatterExecutors.add(Executors.newSingleThreadExecutor()); + } + CompletableFuture.runAsync(() -> leaderTask(queue, followerExecutor)); + } + + private void leaderTask(ReentrantLogQueue queue, ExecutorService follower) { + while (!Thread.currentThread().isInterrupted()) { + List element = queue.consume(); + follower.execute(() -> saveFunction.accept(element)); + } + } + + private static int getPoolSize(JdbcTemplate jdbcTemplate) { + DataSource dataSource = jdbcTemplate.getDataSource(); + if (!(dataSource instanceof HikariDataSource)) { + throw new IllegalArgumentException("DataSource is null"); + } + int poolSize = ((HikariDataSource) dataSource).getMaximumPoolSize(); + log.debug("Creating AsyncLogProcessor with pool size: {}", poolSize); + return poolSize * 5 / 10; + } +}