Ring Buffer

1. Ring Buffer

Ring Buffer(또는 Circular Buffer)는 고정된 크기의 버퍼를 순환적으로 사용하는 자료구조다. 버퍼의 끝에 도달하면 다시 처음으로 돌아가서 쓰기 때문에 “Ring(고리)” 형태라고 부른다.

고정 크기 배열과 읽기/쓰기 인덱스만으로 간단하게 구현할 수 있다. 인덱스가 버퍼 끝에 도달하면 처음으로 되돌아가는 순환 구조다. 대략적인 구조는 아래와 같다.

class NaiveRingBuffer {
    private final Object[] buffer;
    private final int size;
    private int writeIndex = 0;
    private int readIndex = 0;

    public void write(Object item) {
        buffer[writeIndex % size] = item;  // 매번 나눗셈
        writeIndex++;
    }

    public Object read() {
        Object item = buffer[readIndex % size];  // 매번 나눗셈
        readIndex++;
        return item;
    }
}

포인터들의 순환 구조를 구현하기 위한 가장 직관적인 방법은 모듈로 연산(%)을 사용하는 것이다. 하지만, 이 순환 동작을 효율적으로 구현하기 위해 Ring Buffer는 크기를 2의 제곱수로 제한하고 모듈로 연산 대신 비트 연산자를 사용할 수 있다.

class OptimizedRingBuffer {
    private final Object[] buffer;
    private final int mask;  // size - 1
    private long writeIndex = 0;
    private long readIndex = 0;

    public OptimizedRingBuffer(int size) {
        if ((size & (size - 1)) != 0) {
            throw new IllegalArgumentException("Size must be power of 2");
        }
        this.buffer = new Object[size];
        this.mask = size - 1;
    }

    public void write(Object item) {
        buffer[(int)(writeIndex & mask)] = item;  // 비트 연산
        writeIndex++;
    }

    public Object read() {
        Object item = buffer[(int)(readIndex & mask)];
        readIndex++;
        return item;
    }

    public boolean isEmpty() {
        return readIndex == writeIndex;
    }

    public boolean isFull() {
        return writeIndex - readIndex == buffer.length;
    }
}

이는 모듈로 연산의 비용 문제를 해결한다. 첫 번째 구현에서는 write()read()를 호출할 때마다 나눗셈이 발생한다. 현대 CPU에서 정수 나눗셈은 평균 10-20 사이클이 걸리는 반면, 비트 연산은 단일 사이클에 처리된다1. 초당 수백만 건의 메시지를 처리하는 시스템에서는 이 비용이 누적된다.

2. LMAX Disruptor

전통적인 동시성 큐의 근본적인 한계가 있다. ArrayBlockingQueueConcurrentLinkedQueue 같은 Java 표준 큐들은 여러 스레드가 동시에 접근하면 락을 사용하기 때문이다. 한 스레드가 큐에 데이터를 쓰는 동안 다른 스레드들은 락을 기다려야 한다. 락을 획득하고 해제하는 과정에서 CPU 캐시가 무효화되고, 이는 다시 메모리 접근으로 이어져 성능을 크게 떨어뜨린다.

여기에 더해 Java의 전통적인 큐는 요소를 추가할 때마다 객체를 생성하고, 제거할 때마다 GC(Garbage Collection)의 대상이 된다. 초당 수백만 건의 메시지를 처리해야 하는 시스템에서 이런 오버헤드를 무시할 수 없다.

LMAX Disruptor는 Ring Buffer 기반의 고성능 메시지 처리 프레임워크이며 이런 문제를 해결한다.2.

Disruptor의 철학은 Mechanical Sympathy다. CPU 캐시, 메모리 배리어, 커널 모드 전환 같은 하드웨어 특성을 이해하고 그에 맞춰 설계하면 수십 배 빠른 소프트웨어를 만들 수 있다는 것이다.

Preallocation

전통적인 큐는 요소를 추가할 때마다 객체를 생성하고 메모리를 할당한다. Disruptor는 시작 시점에 모든 슬롯을 미리 할당하여 GC 압력을 제거한다.

// 전통적인 큐
queue.offer(new Event(data));  // 매번 객체 생성

// Disruptor
long sequence = ringBuffer.next();
Event event = ringBuffer.get(sequence);  // 재사용
event.setData(data);
ringBuffer.publish(sequence);

Cache Line Padding

CPU는 메모리에서 데이터를 읽을 때 개별 바이트가 아니라 캐시 라인(Cache Line) 단위로 가져온다. 캐시 라인은 보통 64바이트이며, CPU는 이 단위로 데이터를 캐싱하고 다른 코어와 동기화한다.

문제는 논리적으로 독립적인 두 변수가 우연히 같은 캐시 라인에 배치될 때 발생한다. 예를 들어, 스레드 1이 변수 X를 수정하면 그 캐시 라인 전체가 수정된 것으로 표시되고, 같은 캐시 라인에 있는 변수 Y를 사용하는 스레드 2의 캐시는 무효화된다. X와 Y는 실제로 아무 관계가 없는데도, 단지 같은 캐시 라인에 있다는 이유만으로 캐시 무효화가 반복되어 성능이 크게 저하된다. 이것이 False Sharing이다.

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;  // 56 bytes
}

class Value extends LhsPadding {
    protected volatile long value;  // 8 bytes
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;  // 56 bytes
}

// 총 120 bytes = 다른 변수와 캐시 라인을 공유하지 않음

Disruptor는 이를 방지하기 위해 중요한 변수들을 64바이트로 패딩하여 각각 독립적인 캐시 라인에 배치한다. 이 구조는 상속을 통해 value 앞뒤로 56바이트씩 패딩을 배치하여, value가 반드시 독립적인 캐시 라인에 위치하도록 보장한다. 총 120바이트 크기는 최소 2개의 캐시 라인(64바이트 × 2)을 점유하므로, value 전후의 메모리 공간에 다른 객체의 변수가 배치되더라도 value와 같은 캐시 라인을 공유하지 않는다. 결과적으로 다른 스레드가 인접한 메모리를 수정하더라도 value의 캐시는 무효화되지 않아, 멀티 스레드 환경에서 불필요한 캐시 동기화 오버헤드를 제거할 수 있다.

Lock-Free

Disruptor는 락 대신 CAS(Compare-And-Swap) 연산을 사용한다. 락은 커널 모드 전환을 일으키지만3, CAS는 CPU 명령어 하나로 처리된다.

// java.util.concurrent.locks.Lock 대신
private final AtomicLong cursor = new AtomicLong(-1);

public long next() {
    return cursor.incrementAndGet();  // CAS 연산
}

메모리 배리어 (Memory Barrier)

멀티코어 환경에서 각 코어는 자신의 캐시를 가지고 있다. 한 스레드가 변수를 업데이트해도 다른 스레드는 즉시 보지 못할 수 있다. 메모리 배리어(Memory Barrier)는 메모리 연산의 순서를 보장하는 CPU 명령어다.

// Volatile write - Full barrier (StoreLoad + StoreStore)
volatile long sequence;
sequence = newValue;  // 모든 이전 쓰기가 완료된 후 실행, 이후 읽기/쓰기 재정렬 금지

// Volatile read - LoadLoad + LoadStore barrier
long value = sequence;  // 이후 읽기/쓰기가 이전으로 재정렬 안 됨

// Ordered write - StoreStore barrier (Java 9+: VarHandle.setRelease)
UNSAFE.putOrderedLong(this, SEQUENCE_OFFSET, newValue);  // 이전 쓰기만 완료 보장

3. JCTool

JCTools(Java Concurrency Tools)는 Disruptor의 아이디어를 실용적인 큐 구현으로 발전시킨 라이브러리다. JCTools는 사용 패턴에 따라 최적화된 큐를 제공한다.

SPSC의 메모리 배리어 최적화

JCTools의 SPSC Queue는 메모리 배리어를 정교하게 활용한다.

// Producer 쓰기
public boolean offer(E e) {
    long producerIndex = lpProducerIndex();  // plain load (배리어 없음)
    long offset = calcElementOffset(producerIndex);

    soElement(offset, e);  // ordered store (StoreStore)
    soProducerIndex(producerIndex + 1);  // ordered store (StoreStore)
    return true;
}

// Consumer 읽기
public E poll() {
    long consumerIndex = lpConsumerIndex();  // plain load
    long offset = calcElementOffset(consumerIndex);

    E e = lvElement(offset);  // volatile load (LoadLoad)
    if (e == null) {
        return null;
    }

    soConsumerIndex(consumerIndex + 1);  // ordered store
    return e;
}

연산 순서를 보장하는 것. Producer가 데이터를 쓴 후 반드시 인덱스를 증가시켜야 한다. 만약 순서가 바뀌어서 인덱스가 먼저 증가하고 데이터가 나중에 쓰인다면, Consumer는 쓰레기 값을 읽게 된다. ordered store는 이전의 모든 쓰기가 완료된 후에 실행되도록 보장한다.

반대로 Consumer는 데이터를 읽기 전에 반드시 인덱스를 확인해야 한다. volatile load는 이후의 모든 읽기가 최신 값을 보도록 보장한다. 이렇게 최소한의 배리어만 사용하여 불필요한 volatile write(Full barrier)를 피한다.

배리어 타입비용사용 시점
Plain (no barrier)~1 cycle단일 스레드 접근
StoreStore (ordered)~4 cyclesSPSC Producer
LoadLoad~4 cyclesSPSC Consumer
Full (volatile)~20-40 cyclesMPSC Producer
구현처리량지연시간
ArrayBlockingQueue5.5M ops/s~180ns
최적화된 Ring Buffer111M ops/s~9ns

MPSC의 최적화

// jctools-core/org/jctools/queues/MpscArrayQueue.java
public abstract class MpscArrayQueueL1Pad<E> extends ConcurrentCircularArrayQueue<E> {
    long p01, p02, p03, p04, p05, p06, p07;  // Cache line padding
    long p10, p11, p12, p13, p14, p15, p16, p17;
}

public abstract class MpscArrayQueueProducerIndexField<E> extends MpscArrayQueueL1Pad<E> {
    private volatile long producerIndex;  // CAS로 보호
}

public abstract class MpscArrayQueueL2Pad<E> extends MpscArrayQueueProducerIndexField<E> {
    long p01, p02, p03, p04, p05, p06, p07;  // Cache line padding
    long p10, p11, p12, p13, p14, p15, p16, p17;
}

public abstract class MpscArrayQueueConsumerIndexField<E> extends MpscArrayQueueL2Pad<E> {
    protected long consumerIndex;  // 단일 Consumer, volatile 불필요
}

Producer와 Consumer의 인덱스를 별도의 캐시 라인에 배치하여 False Sharing을 방지한다.

MPMC의 복잡성: Vyukov 알고리즘

SPSC는 단순하지만, Multi Producer Multi Consumer (MPMC)는 훨씬 복잡하다.

문제점은 명확하다. 두 Producer가 동시에 같은 슬롯에 쓰기를 시도하거나, 두 Consumer가 동시에 같은 슬롯에서 읽기를 시도하면 충돌이 발생한다. 또한 한 Producer가 느리면 “구멍(hole)“이 생겨서 불연속성이 발생한다.

Dmitry Vyukov의 Bounded MPMC Queue는 이를 해결한다. 각 슬롯에 시퀀스 번호를 추가하여 슬롯의 상태를 추적한다:

class MPMCArrayQueue<E> {
    private final E[] buffer;
    private final long[] sequences;  // 각 슬롯의 시퀀스 번호
    private final int mask;

    // Producer
    public boolean offer(E e) {
        long currentProducerSequence;
        long seq;
        long seqOffset;

        while (true) {
            currentProducerSequence = producerSequence.get();
            seqOffset = calcSequenceOffset(currentProducerSequence);
            seq = lvSequence(seqOffset);  // 슬롯의 시퀀스 읽기

            long delta = seq - currentProducerSequence;

            if (delta == 0) {
                // 슬롯이 비어있음 - CAS로 인덱스 획득 시도
                if (producerSequence.compareAndSet(
                        currentProducerSequence,
                        currentProducerSequence + 1)) {
                    break;  // 성공
                }
            } else if (delta < 0) {
                // 큐가 가득 참
                return false;
            } else {
                // 다른 Producer가 선점 - 재시도
            }
        }

        // 슬롯 획득 성공, 데이터 쓰기
        long offset = calcElementOffset(currentProducerSequence);
        soElement(offset, e);
        soSequence(seqOffset, currentProducerSequence + 1);  // 시퀀스 업데이트
        return true;
    }
}

시퀀스 번호는 슬롯의 상태를 나타낸다. seq == producerIndex면 슬롯이 비어있어서 쓰기 가능하고, seq == producerIndex + 1이면 슬롯에 데이터가 있어서 읽기 가능하다. seq < producerIndex인 경우는 Producer가 느려서 아직 쓰기를 완료하지 못한 상태다.

메모리 오버헤드는 큐 타입에 따라 다르다. SPSC와 MPSC는 배열과 2개의 인덱스만 필요하지만, MPMC는 슬롯마다 시퀀스 번호를 저장해야 하므로 약 3배의 메모리가 필요하다.

4. in reactor…

고성능 웹 어플리케이션을 개발할 때 사용되는 Spring Webflux에서도 버퍼 사이즈를 2의 제곱수가 아닌 수로 설정하게 되었을 때 실제로는 근접한 2의 제곱수로 설정된다. Reactor는 내부적으로 JCTools의 Ring Buffer 구현을 사용한다.

Reactor는 대부분의 경우 SPSC를 사용한다. 실제 소스코드를 보면 기본 unbounded()는 SPSC를 반환한다.

// reactor-core/reactor/util/concurrent/Queues.java
public static <T> Supplier<Queue<T>> unbounded(int linkSize) {
    if (linkSize == XS_BUFFER_SIZE) {
        return XS_UNBOUNDED;
    }
    else if (linkSize == Integer.MAX_VALUE || linkSize == SMALL_BUFFER_SIZE) {
        return unbounded();
    }
    return () -> Hooks.wrapQueue(new SpscLinkedArrayQueue<>(linkSize));
}

public static <T> Supplier<Queue<T>> unboundedMultiproducer() {
    return () -> Hooks.wrapQueue(new MpscLinkedQueue<T>());
}

unbounded()SpscLinkedArrayQueue를 생성하고, 이 생성자는 2의 제곱수로 변환한다:

// reactor-core/reactor/util/concurrent/SpscLinkedArrayQueue.java
SpscLinkedArrayQueue(int linkSize) {
    int c = Queues.ceilingNextPowerOfTwo(Math.max(8, linkSize));
    this.producerArray = this.consumerArray = new AtomicReferenceArray<>(c + 1);
    this.mask = c - 1;
}

// reactor-core/reactor/util/concurrent/Queues.java
public static int ceilingNextPowerOfTwo(int x) {
    return 1 << (32 - Integer.numberOfLeadingZeros(x - 1));
}

ceilingNextPowerOfTwo()는 입력값보다 크거나 같은 가장 작은 2의 제곱수를 반환한다. 따라서 10,000을 설정하면 16,384 (\(2^{14}\))가 할당된다.

Reactor의 파이프라인은 체인 형태로 동작한다. 각 Operator는 단일 업스트림에서 데이터를 받고, 단일 다운스트림으로 데이터를 전달하기 때문에 각 단계는 SPSC 큐로 충분하다.

Flux.range(1, 100)           // Source
    .map(x -> x * 2)         // Operator1 (업스트림 1개, 다운스트림 1개)
    .filter(x -> x > 10)     // Operator2 (업스트림 1개, 다운스트림 1개)
    .subscribe(System.out::println); // Sink

여러 소스를 병합할 때만 MPSC가 필요하다.

// reactor-core/reactor/core/publisher/FluxMerge.java

final class FluxMerge<T> extends Flux<T> {
    final Publisher<? extends T>[] sources;  // 여러 소스!
    final Supplier<? extends Queue<T>> mainQueueSupplier;  // MPSC 필요

    FluxMerge(Publisher<? extends T>[] sources,
              boolean delayError,
              int maxConcurrency,
              Supplier<? extends Queue<T>> mainQueueSupplier, // ← 여기에 MPSC 전달
              int prefetch,
              Supplier<? extends Queue<T>> innerQueueSupplier) {
        this.sources = sources;
        this.mainQueueSupplier = mainQueueSupplier;
        // ...
    }
}

merge, mergeSequential, zip 등 여러 스트림을 합칠 때만 MPSC를 사용한다.

Flux<Integer> flux1 = Flux.range(1, 100);
Flux<Integer> flux2 = Flux.range(101, 100);
Flux<Integer> flux3 = Flux.range(201, 100);

Flux.merge(flux1, flux2, flux3)  // 3개의 Producer → 1개의 Consumer (MPSC 사용)
    .subscribe(System.out::println);

  1. Agner Fog, Instruction tables - Intel Skylake: DIV 26 cycles, AND 1 cycle ↩︎

  2. Martin Fowler, The LMAX Architecture ↩︎

  3. Jake Edge, A futex overview and update - Futex는 경합이 없을 때 유저 공간에서 atomic 연산만 사용하고, 경합이 있을 때만 커널을 호출한다. ↩︎


comments powered by Disqus