Skip to content

Commit

Permalink
Feat/#134 LogQueue의 부하를 감소하도록 Producer Consumer 구조로 최적화 (#135)
Browse files Browse the repository at this point in the history
* feat: LogQueue의 동기화 메커니즘 최적화

- consume() 메서드에 대해 consumeLock 객체를 사용한 동기화 구현
- produce() 메서드의 동기화 제거로 불필요한 락킹 방지
- consume() 메서드의 대기 로직을 do-while 루프로 변경하여 spurious wakeup 방지
- produce() 메서드에서 consumeLock.notify() 호출로 정확한 스레드 깨우기
- 전반적인 동시성 처리 개선으로 생산자와 소비자 간 효율성 향상

* fix: process 시 락 범위 감소하도록 변경

* fix: Primary를 LogQueue로 이동

* refactor: LogQueue 동작 로직 수정

- LockSupports를 활용한 Thread lock방식을 적용했습니다.
- `volatile Thread consumerThread`에 대한 VarHandle을 적용했습니다.

* refactor: AsyncLogProcessor

- Producer, Consumer 방식을 적용해 리팩토링을 진행했습니다.
- 이에 따라 사용하지 않는 테스트를 Deprecated 처리했습니다.

* docs: Java-Doc 추가

- LogQueue에 대한 Java-Doc을 추가했습니다.

* chore: Java-Doc및 사용하지 않는 메서드 Deprecated 처리

- 사용하지 않는 메서드에 대한 Deprecated 처리를 진행했습니다.
- AsyncLogProcessor에 대한 JavaDoc를 작성했습니다.

* chore: 패키지 이동 및 클래스 이름 변경

- `Consumer` -> `EventConsumer`, `Producer` -> `EventProducer` 이름 변경을 진행했습니다.
- 패키지 위치를 변경했습니다.

* remove: 사용하지 않는 테스트 제거

- Deprecated 된 테스트를 제거했습니다.

* refactor: VarHandle 제거

- volatile 키워드 로 충분히 가시성이 보장

* fix: ReentrantLogQueue에 대해 producer와 consumer간의 경쟁 제거

* fix: LogQueue process 시 RentrantLogQueue를 사용하도록 변경

- LogQueue 비활성화

---------

Co-authored-by: KyungMin Lee <[email protected]>
Co-authored-by: luizy <[email protected]>
  • Loading branch information
3 people authored Aug 29, 2024
1 parent 52f471d commit b11d0b3
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 154 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package info.logbat.common.event;

import java.util.List;

public interface EventConsumer<T> {

List<T> consume();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package info.logbat.common.event;

import java.util.List;

public interface EventProducer<T> {

void produce(List<T> data);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package info.logbat.domain.log.application;

import info.logbat.domain.log.domain.Log;
import info.logbat.domain.log.flatter.LogRequestFlatter;
import info.logbat.domain.log.presentation.payload.request.CreateLogRequest;
import info.logbat.domain.log.repository.LogRepository;
import info.logbat.domain.project.application.AppService;

import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
Expand All @@ -16,7 +15,7 @@
@RequiredArgsConstructor
public class LogService {

private final LogRepository logRepository;
private final LogRequestFlatter logRequestFlatter;
private final AppService appService;

public void saveLogs(String appKey, List<CreateLogRequest> requests) {
Expand All @@ -25,12 +24,11 @@ public void saveLogs(String appKey, List<CreateLogRequest> requests) {
requests.forEach(request -> {
try {
logs.add(request.toEntity(appId));
}
catch (Exception e) {
} catch (Exception e) {
log.error("Failed to convert request to entity: {}", request, e);
}
});
logRepository.saveAll(logs);
logRequestFlatter.flatten(logs);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package info.logbat.domain.log.flatter;

import info.logbat.common.event.EventProducer;
import info.logbat.domain.log.domain.Log;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class LogRequestFlatter {

private final EventProducer<Log> eventProducer;
private final ExecutorService executor = Executors.newFixedThreadPool(1);

public void flatten(List<Log> logs) {
executor.submit(() -> eventProducer.produce(logs));
}
}
104 changes: 104 additions & 0 deletions logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package info.logbat.domain.log.queue;

import info.logbat.common.event.EventConsumer;
import info.logbat.common.event.EventProducer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
* 로깅 데이터 전달 목적의 Thread-Safe Queue 구현체로, 생산과 소비 작업을 모두 지원합니다. 이 클래스는 단일 스레드 환경에서 작동하도록 설계되었으며, 효율적인
* 대량 작업을 허용합니다.
* <p>
* 이 큐는 생산자가 데이터를 추가하고, 소비자가 데이터를 꺼내는 방식으로 동작합니다. 소비자는 큐에 충분한 데이터가 쌓일 때까지 대기하다가, 큐에 데이터가 충분히 쌓이면
* 일괄적으로 데이터를 꺼내서 처리합니다. 이 때, 일괄 처리 크기는 생성자를 통해 지정할 수 있습니다. 만약 큐에 충분한 데이터가 쌓이지 않은 상태에서 소비자가 데이터를 꺼내려
* 할 때, 소비자 스레드는 일정 시간 동안 대기하다가 큐에 데이터가 추가되면 깨어나서 데이터를 꺼냅니다. 이 때 대기 시간은 생성자를 통해 지정할 수 있습니다.
* </p>
*
* @param <T> 이 큐에 저장되는 요소의 타입
*/
@Component
public class LogQueue<T> implements EventProducer<T>, EventConsumer<T> {

// T 타입의 요소를 저장하는 list
private final LinkedList<T> queue;
// 소비자 스레드가 대기하는 시간 (나노초 단위)
private final long timeoutNanos;
// 일괄 처리 크기
private final int bulkSize;
// 소비자 스레드, volatile로 선언하여 가시성 보장
private volatile Thread consumerThread;


/**
* 지정된 타임아웃과 벌크 크기로 새 LogQueue를 생성합니다.
*
* @param timeoutMillis 큐가 비어있을 때 소비자가 대기하는 시간(밀리초)
* @param bulkSize 단일 작업에서 소비될 수 있는 최대 요소 수
*/
public LogQueue(@Value("${jdbc.async.timeout}") Long timeoutMillis,
@Value("${jdbc.async.bulk-size}") Integer bulkSize) {
this.queue = new LinkedList<>();
this.timeoutNanos = timeoutMillis * 1_000_000; // Convert to nanoseconds
this.bulkSize = bulkSize;
}

/**
* 큐에서 요소를 일괄적으로 꺼내서 반환합니다. 큐에 충분한 요소가 쌓이지 않은 경우, 소비자 스레드는 일정 시간 동안 대기합니다.
* <p>
* 이 메서드는 단일 스레드 환경에서만 호출해야 합니다. 만약 다중 스레드 환경에서 호출하면 예상치 못한 결과가 발생할 수 있습니다. 이 메서드는 큐에 충분한 요소가 쌓일
* 때까지 대기하다가, 큐에 요소가 쌓이면 일괄적으로 요소를 꺼내서 반환합니다. 만약 큐에 충분한 요소가 쌓이지 않은 경우, 일정 시간 동안 대기하다가 큐에 요소가
* 추가되면 깨어나서 요소를 꺼냅니다. 이 때 대기 시간은 생성자를 통해 지정할 수 있습니다. 이 메서드는 큐에 쌓인 요소를 꺼내서 반환하는 것이 목적이므로, 큐에 요소를
* 추가하는 작업은 {@link #produce(List)} 메서드를 사용해야 합니다.
* </p>
*
* @return 큐에서 꺼낸 요소의 리스트 (최대 {@link #bulkSize}개)
*/
@Override
public List<T> consume() {
List<T> result = new ArrayList<>(bulkSize);

if (queue.size() >= bulkSize) {
for (int i = 0; i < bulkSize; i++) {
result.add(queue.poll());
}
return result;
}

consumerThread = Thread.currentThread();

do {
LockSupport.parkNanos(timeoutNanos);
} while (queue.isEmpty());

for (int i = 0; i < bulkSize; i++) {
result.add(queue.poll());
if (queue.isEmpty()) {
break;
}
}

consumerThread = null;
return result;
}

/**
* 큐에 요소를 추가합니다. 큐에 요소가 추가되면, 소비자 스레드를 깨워서 요소를 꺼내도록 합니다.
* <p>
* 이 메서드는 단일 스레드 환경에서만 호출해야 합니다. 만약 다중 스레드 환경에서 호출하면 예상치 못한 결과가 발생할 수 있습니다. 이 메서드는 큐에 요소를 추가하는
* 것이 목적이므로, 큐에서 요소를 꺼내는 작업은 {@link #consume()} 메서드를 사용해야 합니다.
* </p>
*
* @param data 큐에 추가할 요소의 리스트
*/
@Override
public void produce(List<T> data) {
queue.addAll(data);
if (consumerThread != null && queue.size() >= bulkSize) {
LockSupport.unpark(consumerThread);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package info.logbat.domain.log.queue;

import info.logbat.common.event.EventConsumer;
import info.logbat.common.event.EventProducer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

@Primary
@Component
public class ReentrantLogQueue<T> implements EventProducer<T>, EventConsumer<T> {

private final LinkedList<T> queue = new LinkedList<>();
private final long timeout;
private final int bulkSize;
private final ReentrantLock bulkLock = new ReentrantLock();
private final Condition bulkCondition = bulkLock.newCondition();

public ReentrantLogQueue(@Value("${jdbc.async.timeout}") Long timeout,
@Value("${jdbc.async.bulk-size}") Integer bulkSize) {
this.timeout = timeout;
this.bulkSize = bulkSize;
}

/*
* Consumer should be one thread
*/
@Override
public List<T> consume() {
List<T> result = new ArrayList<>();

try {
bulkLock.lockInterruptibly();
// Case1: Full Flush
if (queue.size() >= bulkSize) {
for (int i = 0; i < bulkSize; i++) {
result.add(queue.poll());
}
return result;
}
// Else Case: Blocking
// Blocked while Queue is Not Empty
do {
bulkCondition.await(timeout, TimeUnit.MILLISECONDS);
} while (queue.isEmpty());

// Bulk Size 만큼 꺼내서 반환
for (int i = 0; i < bulkSize; i++) {
result.add(queue.poll());
if (queue.isEmpty()) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
bulkLock.unlock();
}
return result;
}

/*
* Producer should be one thread
*/
@Override
public void produce(List<T> data) {
bulkLock.lock();
try {
queue.addAll(data);
if (queue.size() >= bulkSize) {
bulkCondition.signal();
}
} finally {
bulkLock.unlock();
}
}

}
Loading

0 comments on commit b11d0b3

Please sign in to comment.