Skip to content

Commit

Permalink
fix: 벌크 연산을 이용하지 못 하는 점을 해결합니다. (#133)
Browse files Browse the repository at this point in the history
- bulkSize까지 최대한 채우다가, timeout이 지난다면 일시적으로 bulk insert를 진행하도록 수정했습니다.
  • Loading branch information
miiiinju1 authored Aug 27, 2024
1 parent 06cc529 commit 52f471d
Showing 1 changed file with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -93,30 +92,27 @@ public void submitLogs(List<Log> logs) {
* @param saveFunction 로그 저장 함수
*/
private void leaderTask(Consumer<List<Log>> saveFunction) {
long lastFlushTime = System.currentTimeMillis();

while (!Thread.currentThread().isInterrupted()) {
try {
final Log log = logQueue.poll(defaultTimeout, TimeUnit.MILLISECONDS);
/*
* Log가 천천히 들어오는 경우 Timeout에 한 번씩 저장
*
* Log가 높은 부하로 들어오는 경우 Bulk Size만큼 한 번에 저장
*
* Timeout동안 들어온 Log가 없는 경우 다음 반복문 cycle 수행
*/
if (log == null) {
continue;
}
List<Log> logs = new ArrayList<>();
logs.add(log);
//drainTo는 Queue에 있는 요소를 maxElements만큼 꺼내서 Collection에 담아준다.
logQueue.drainTo(logs, defaultBulkSize - 1);
// Follower Thread Pool에 저장 요청
followerExecutor.execute(() -> saveFunction.accept(logs));
int size = logQueue.size();

// 현재 시간 확인
long currentTime = System.currentTimeMillis();

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Leader thread was interrupted. Exiting.", e);
break;
// 2초가 지났거나, 큐의 크기가 defaultBulkSize 이상인 경우 flush
if ((currentTime - lastFlushTime > defaultTimeout) || size >= defaultBulkSize) {
List<Log> logs = new ArrayList<>();

// 로그를 최대 defaultBulkSize 개수만큼 큐에서 가져옴
logQueue.drainTo(logs, defaultBulkSize);

if (!logs.isEmpty()) {
followerExecutor.execute(() -> saveFunction.accept(logs));
}
lastFlushTime = currentTime;
}
} catch (Exception e) {
log.error("Unexpected error in leader thread", e);
}
Expand Down

0 comments on commit 52f471d

Please sign in to comment.