From 52f471d7565d3f6d93701eb462bdc0d19567347f Mon Sep 17 00:00:00 2001 From: Kim Minju <111269144+miiiinju1@users.noreply.github.com> Date: Tue, 27 Aug 2024 19:52:40 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20=EB=B2=8C=ED=81=AC=20=EC=97=B0=EC=82=B0?= =?UTF-8?q?=EC=9D=84=20=EC=9D=B4=EC=9A=A9=ED=95=98=EC=A7=80=20=EB=AA=BB=20?= =?UTF-8?q?=ED=95=98=EB=8A=94=20=EC=A0=90=EC=9D=84=20=ED=95=B4=EA=B2=B0?= =?UTF-8?q?=ED=95=A9=EB=8B=88=EB=8B=A4.=20(#133)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - bulkSize까지 최대한 채우다가, timeout이 지난다면 일시적으로 bulk insert를 진행하도록 수정했습니다. --- .../log/repository/AsyncLogProcessor.java | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java index f8f984f..11818fc 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java @@ -9,7 +9,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; @@ -93,30 +92,27 @@ public void submitLogs(List logs) { * @param saveFunction 로그 저장 함수 */ private void leaderTask(Consumer> saveFunction) { + long lastFlushTime = System.currentTimeMillis(); + while (!Thread.currentThread().isInterrupted()) { try { - final Log log = logQueue.poll(defaultTimeout, TimeUnit.MILLISECONDS); - /* - * Log가 천천히 들어오는 경우 Timeout에 한 번씩 저장 - * - * Log가 높은 부하로 들어오는 경우 Bulk Size만큼 한 번에 저장 - * - * Timeout동안 들어온 Log가 없는 경우 다음 반복문 cycle 수행 - */ - if (log == null) { - continue; - } - List logs = new ArrayList<>(); - logs.add(log); - //drainTo는 Queue에 있는 요소를 maxElements만큼 꺼내서 Collection에 담아준다. - logQueue.drainTo(logs, defaultBulkSize - 1); - // Follower Thread Pool에 저장 요청 - followerExecutor.execute(() -> saveFunction.accept(logs)); + int size = logQueue.size(); + + // 현재 시간 확인 + long currentTime = System.currentTimeMillis(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Leader thread was interrupted. Exiting.", e); - break; + // 2초가 지났거나, 큐의 크기가 defaultBulkSize 이상인 경우 flush + if ((currentTime - lastFlushTime > defaultTimeout) || size >= defaultBulkSize) { + List logs = new ArrayList<>(); + + // 로그를 최대 defaultBulkSize 개수만큼 큐에서 가져옴 + logQueue.drainTo(logs, defaultBulkSize); + + if (!logs.isEmpty()) { + followerExecutor.execute(() -> saveFunction.accept(logs)); + } + lastFlushTime = currentTime; + } } catch (Exception e) { log.error("Unexpected error in leader thread", e); }