Apache Flink 톺아보기
1. Apache Flink?
컴퓨터 과학에서 Stream Processing는 시간 순서대로 발생하는 연속된 이벤트(스트림)를 계산의 중심 입력 및 출력으로 다루는 프로그래밍 패러다임이다. 스트림 처리 시스템은 데이터 스트림에 대한 병렬 처리를 제공하며, 효율적인 구현을 위해 스트리밍 알고리즘에 의존한다.
데이터 스트림에 대한 상태 기반 연산(Stateful Computations over Data Streams)
Apache Flink는 무한(unbounded) 또는 유한(bounded) 데이터 스트림에 대한 상태 기반 연산(Stateful Computation)을 수행하기 위한 프레임워크이자 분산 처리 엔진이다. Flink는 모든 일반적인 클러스터 환경에서 실행할 수 있으며, 인메모리 수준의 속도로 어떤 규모에서든 연산을 수행한다.
Stateful Computation
데이터 흐름의 많은 작업은 한 번에 하나의 개별 이벤트만 처리하지만, 일부 작업들은 여러 이벤트에 걸쳐 정보를 기억한다.
- 애플리케이션이 특정 이벤트 패턴을 검색할 때, 상태는 지금까지 발생한 이벤트 시퀀스를 저장한다.
- 분/시간/일 단위로 이벤트를 집계할 때, 상태는 대기 중인 집계 값을 보유한다.
- 데이터 포인트 스트림을 통해 머신 러닝 모델을 학습할 때, 상태는 현재 버전의 모델 파라미터를 보유한다.
- 과거 데이터를 관리해야 할 때, 상태는 과거에 발생한 이벤트에 효율적으로 접근할 수 있게 한다.
이런 작업을 Stateful하다고 한다.
Flink는 장애를 허용하기 위해 체크포인트와 세이브포인트를 통해 상태를 관리한다. 상태 정보를 활용하면 Flink 애플리케이션의 리스케일링이 가능하며, 인스턴스들 간에 상태를 자동으로 재분배한다.
Keyed State
Keyed State는 임베디드 key-value 저장소 형태로 유지된다. 상태는 연산자가 읽는 스트림과 함께 엄격하게 분할(partition)되고 분산된다. 따라서 key-value 상태 접근은 keyed 스트림, 즉 keyed/partitioned 데이터 교환 이후에만 가능하며, 현재 이벤트의 키와 연관된 값으로 제한된다.
스트림과 상태의 키를 일치시키면 모든 상태 업데이트가 로컬에서 처리되므로, 분산 트랜잭션 오버헤드 없이 일관성을 보장한다. Flink는 이 방식으로 상태 재분배와 스트림 파티셔닝을 자동으로 처리한다.
Keyed State는 Key Group이라는 단위로 더 세분화된다. Key Group은 Flink가 Keyed State를 재분배하는 최소 단위(atomic unit)로, 정의된 최대 병렬성(maximum parallelism) 개수만큼 존재한다. 실행 중에 각 병렬 인스턴스는 하나 이상의 Key Group을 담당하여 키를 처리한다.
2. Stateful Stream Processing
State Persistence
Flink는 스트림 재생(stream replay) 과 체크포인팅(checkpointing) 을 결합하여 장애 허용성을 구현한다. 체크포인트는 각 입력 스트림의 특정 지점과 각 연산자의 상태를 표시한다. 장애 발생 시 연산자 상태를 복원하고 체크포인트 시점부터 레코드를 재생하여, 정확히 한 번(Exactly Once) 처리 의미론을 유지하면서 데이터 플로우를 재개한다.
체크포인트 간격은 실행 중 장애 허용성 오버헤드와 복구 시간 간의 트레이드오프를 조정한다.
장애 허용성 메커니즘은 분산 스트리밍 데이터 플로우의 스냅샷을 지속적으로 생성한다. 상태가 작은 스트리밍 애플리케이션의 경우, 이러한 스냅샷은 매우 가벼우며 성능에 큰 영향 없이 자주 생성할 수 있다. 스트리밍 애플리케이션의 상태는 구성 가능한 위치, 일반적으로 분산 파일 시스템에 저장한다.
장애가 발생하면 Flink는 데이터 플로우를 중지하고, 연산자를 재시작한 후 마지막으로 성공한 체크포인트로 복원한다.
Checkpointing
Flink의 장애 허용성 메커니즘의 핵심은 분산 데이터 스트림과 연산자 상태의 일관된 스냅샷을 생성하는 것이다. 이러한 스냅샷은 장애 발생 시 시스템이 되돌아갈 수 있는 일관된 체크포인트 역할을 한다. Flink의 스냅샷 생성 메커니즘은 “Lightweight Asynchronous Snapshots for Distributed Dataflows“에 설명되어 있다. 이는 분산 스냅샷을 위한 표준 Chandy-Lamport 알고리즘에서 영감을 받아 Flink의 실행 모델에 맞게 설계했다.
체크포인팅과 관련된 모든 작업은 비동기적으로 수행할 수 있다. 체크포인트 배리어는 동기적으로 이동하지 않으며 연산은 비동기적으로 상태를 스냅샷한다.
Flink 1.11부터 체크포인트는 배리어 정렬(alignment)을 사용하거나 사용하지 않고 생성할 수 있다. 이 섹션에서는 먼저 배리어를 정렬하는 체크포인트를 설명한다.
Barriers
Flink 분산 스냅샷의 핵심 요소는 스트림 배리어(stream barriers) 다. 배리어는 데이터 스트림에 주입되어 레코드와 함께 흐른다. 배리어는 레코드를 추월하지 않고 엄격한 순서를 유지한다. 배리어는 스트림의 레코드를 현재 스냅샷에 포함될 레코드와 다음 스냅샷에 포함될 레코드로 구분한다. 각 배리어는 그 앞의 레코드들이 속한 스냅샷 ID를 전달한다. 배리어는 스트림 흐름을 방해하지 않으므로 매우 가볍다. 서로 다른 스냅샷의 여러 배리어가 동시에 스트림에 존재할 수 있으며, 이는 여러 스냅샷이 동시에 진행될 수 있음을 의미한다.
스트림 배리어는 데이터 소스에서 주입된다. 스냅샷 \(n\)의 배리어가 주입되는 지점 \(S_n\)은 해당 스냅샷이 포함하는 소스 스트림의 위치다. 예를 들어 Apache Kafka에서는 파티션의 마지막 레코드 오프셋이 된다. 이 위치 \(S_n\)은 체크포인트 코디네이터(JobManager)에 보고된다.
그런 다음 배리어는 다운스트림으로 흐른다. 중간 연산자가 모든 입력 스트림에서 스냅샷 \(n\)에 대한 배리어를 받으면 모든 출력 스트림으로 스냅샷 \(n\)에 대한 배리어를 내보낸다. 싱크 연산자(스트리밍 DAG의 끝)가 모든 입력 스트림에서 배리어 \(n\)을 받으면 체크포인트 코디네이터에 스냅샷 \(n\)을 확인한다. 모든 싱크가 스냅샷을 확인하면 완료된 것으로 간주된다.
스냅샷 \(n\)이 완료되면 \(S_n\) 이전의 레코드를 소스에서 다시 요청하지 않는다. 해당 레코드와 그 파생 레코드들은 이미 전체 데이터 플로우를 통과했기 때문이다.
둘 이상의 입력 스트림을 받는 연산자는 스냅샷 배리어를 기준으로 입력 스트림을 동기화해야 한다:
- 연산자가 들어오는 스트림에서 스냅샷 배리어 \(n\)을 받으면, 다른 입력에서도 배리어 \(n\)을 받을 때까지 해당 스트림에서 더 이상 레코드를 처리하지 않는다. 그렇지 않으면 스냅샷 \(n\)에 속하는 레코드와 스냅샷 \(n+1\)에 속하는 레코드가 섞이게 된다.
- 마지막 스트림이 배리어 \(n\)을 받으면 연산자는 대기 중인 모든 출력 레코드를 내보낸 다음 스냅샷 \(n\) 배리어 자체를 내보낸다.
- 상태를 스냅샷하고 모든 입력 스트림에서 레코드 처리를 재개한다. 스트림에서 레코드를 처리하기 전에 입력 버퍼의 레코드를 먼저 처리한다.
- 마지막으로 연산자는 상태를 state backend에 비동기적으로 기록한다.
배리어 동기화는 여러 입력을 가진 모든 연산자와 여러 업스트림 서브태스크의 출력 스트림을 소비하는 셔플 이후의 연산자에 필요하다.
Snapshotting Operator State
연산자에 어떤 형태의 상태가 포함되어 있으면 이 상태도 스냅샷의 일부여야 한다.
연산자는 모든 입력 스트림에서 배리어를 받은 후, 출력으로 배리어를 내보내기 전에 상태를 스냅샷한다. 이 시점에서 배리어 이전 레코드에 의한 모든 상태 업데이트는 완료되었고, 배리어 이후 레코드에 의한 업데이트는 적용되지 않았다. 스냅샷 상태는 구성 가능한 state backend에 저장한다. 기본값은 JobManager 메모리지만, 프로덕션에서는 분산 저장소(예: HDFS)를 사용해야 한다. 상태 저장 후 연산자는 체크포인트를 확인하고, 출력으로 배리어를 내보낸 뒤 계속 진행한다.
결과 스냅샷에는 다음이 포함된다:
- 각 병렬 스트림 데이터 소스에 대해 스냅샷이 시작될 때 스트림의 오프셋/위치
- 각 연산자에 대해 스냅샷의 일부로 저장된 상태에 대한 포인터
Recovery
복구는 간단하다. 장애 발생 시 Flink는 마지막으로 완료된 체크포인트 \(k\)를 선택한다. 시스템은 전체 데이터 플로우를 재배포하고, 각 연산자에 체크포인트 \(k\)의 스냅샷 상태를 복원한다. 소스는 위치 \(S_k\)부터 스트림을 읽기 시작한다. 예를 들어 Apache Kafka에서는 컨슈머가 오프셋 \(S_k\)부터 페칭을 시작한다.
상태가 증분적으로 스냅샷된 경우, 연산자는 최신 전체 스냅샷의 상태로 시작한 다음 해당 상태에 일련의 증분 스냅샷 업데이트를 적용한다.
Unaligned Checkpointing
체크포인팅은 배리어를 동기화하지 않는 방식으로도 수행할 수 있다. 기본 아이디어는 진행 중인 데이터가 연산자 상태의 일부가 되는 한 체크포인트가 모든 진행 중인 데이터를 추월할 수 있다는 것이다.
이 접근 방식은 실제로 Chandy-Lamport 알고리즘에 더 가깝다. 다만 Flink는 체크포인트 코디네이터의 과부하를 피하기 위해 여전히 소스에 배리어를 삽입한다.
그림은 연산자가 배리어를 동기화하지 않고 체크포인트를 처리하는 방법을 보여준다:
- 연산자는 입력 버퍼에 저장된 첫 번째 배리어에 반응한다.
- 즉시 출력 버퍼의 끝에 추가하여 다운스트림 연산자로 배리어를 전달한다.
- 연산자는 추월된 모든 레코드를 비동기적으로 저장하도록 표시하고 자체 상태의 스냅샷을 생성한다.
결과적으로 연산자는 버퍼 표시, 배리어 전달, 상태 스냅샷 생성을 위해 입력 처리를 잠깐만 중지한다.
배리어를 동기화하지 않는 체크포인팅은 배리어가 가능한 한 빨리 싱크에 도착하도록 보장한다. 배리어 동기화 시간이 몇 시간에 달할 수 있는 느리게 움직이는 데이터 경로가 하나 이상 있는 애플리케이션에 특히 적합하다. 그러나 추가 I/O 압력을 가하기 때문에 state backend에 대한 I/O가 병목인 경우에는 도움이 되지 않는다.
세이브포인트는 항상 배리어를 동기화한다.
Unaligned Recovery
배리어를 동기화하지 않는 체크포인팅에서 연산자는 업스트림 연산자에서 데이터를 처리하기 전에 먼저 진행 중인 데이터를 복구한다. 그 외에는 배리어를 동기화하는 체크포인트의 복구와 동일한 단계를 수행한다.
State Backends
key/value 인덱스가 저장되는 정확한 데이터 구조는 선택한 state backend에 따라 다르다. 한 state backend는 데이터를 인메모리 해시 맵에 저장하고, 다른 state backend는 RocksDB를 key/value 저장소로 사용한다. State backend는 상태를 보유하는 데이터 구조를 정의하는 것 외에도, key/value 상태의 특정 시점 스냅샷을 생성하고 해당 스냅샷을 체크포인트의 일부로 저장하는 로직도 구현한다. 애플리케이션 로직을 변경하지 않고 state backend를 구성할 수 있다.
Savepoints
체크포인팅을 사용하는 모든 프로그램은 세이브포인트(savepoint) 에서 실행을 재개할 수 있다. 세이브포인트를 사용하면 상태를 잃지 않고 프로그램과 Flink 클러스터를 모두 업데이트할 수 있다.
세이브포인트는 수동으로 트리거하는 체크포인트로, 프로그램의 스냅샷을 생성하여 state backend에 기록한다. 이를 위해 일반 체크포인팅 메커니즘을 사용한다.
세이브포인트는 사용자가 트리거하고 새로운 체크포인트가 완료될 때 자동으로 만료되지 않는다는 점을 제외하면 체크포인트와 유사하다.
Exactly Once vs. At Least Once
배리어 동기화는 스트리밍 프로그램에 지연을 발생시킨다. 일반적으로 수 밀리초 수준이지만, 일부 이상치(outlier)는 지연이 크게 증가한다. 일관되게 매우 낮은 지연을 요구하는 애플리케이션을 위해 Flink는 배리어 동기화를 건너뛸 수 있다. 이 경우 연산자가 각 입력에서 배리어를 받는 즉시 체크포인트 스냅샷을 생성한다.
배리어 동기화를 건너뛰면 연산자는 체크포인트 \(n\)의 배리어가 도착한 후에도 모든 입력을 계속 처리한다. 이 방식에서는 체크포인트 \(n\)의 상태 스냅샷이 생성되기 전에 체크포인트 \(n+1\)에 속하는 요소들도 처리한다. 복구 시 이 레코드들은 중복으로 발생한다. 체크포인트 \(n\)의 상태 스냅샷에 포함되어 있으면서 동시에 체크포인트 \(n\) 이후 데이터로도 재생되기 때문이다.
배리어 동기화는 여러 선행 작업을 가진 연산자(조인 등)와 셔플 이후 여러 발신자를 가진 연산자에서만 발생한다. 따라서 완전 병렬(embarrassingly parallel) 연산(
map()
,flatMap()
,filter()
등)만으로 구성된 데이터 플로우는 at least once 모드에서도 사실상 정확히 한 번(exactly once) 보장을 제공한다.
State and Fault Tolerance in Batch Programs
Flink는 배치 프로그램을 유한한 스트림을 가진 스트리밍 프로그램으로 실행한다. 따라서 위의 개념은 배치 프로그램에도 스트리밍 프로그램과 동일하게 적용되지만 몇 가지 예외가 있다:
- 배치 프로그램의 장애 허용성은 체크포인팅을 사용하지 않는다. 입력이 유한하기 때문에 장애 발생 시 스트림을 처음부터 완전히 재생하여 복구한다. 이 방식은 복구 비용이 증가하지만, 체크포인트를 생성하지 않으므로 정상 실행 시 처리 비용이 낮아진다.
- 배치 실행 모드의 State backend는 key/value 인덱스가 아닌 단순화된 인메모리/아웃오브코어 데이터 구조를 사용한다.
3. Architecture of Apache Flink
Flink는 분산 시스템으로, 스트리밍 애플리케이션을 실행하기 위해 컴퓨팅 리소스를 효과적으로 할당하고 관리해야 한다. Hadoop YARN 및 Kubernetes와 같은 모든 일반적인 클러스터 리소스 매니저와 통합할 수 있으며, 독립 실행형 클러스터로 설정하거나 라이브러리로도 실행할 수 있다.
Flink 런타임은 두 가지 유형의 프로세스로 구성된다.
- 하나의 JobManager
- 하나 이상의 TaskManager
Client는 런타임과 프로그램 실행의 일부가 아니며, 데이터 플로우를 준비하여 JobManager에 전송한다.
이후 클라이언트는 연결을 끊거나(detached mode), 진행 상황 보고를 받기 위해 연결을 유지할 수 있다(attached mode).
클라이언트는 Java 프로그램 내부에서 실행되거나 명령줄 프로세스 ./bin/flink run ...
로 실행된다.
JobManager와 TaskManager는 다양한 방식으로 시작할 수 있다. 독립 실행형 클러스터로 머신에서 직접 실행하거나, 컨테이너에서 실행하거나, YARN과 같은 리소스 프레임워크를 통해 관리할 수 있다. TaskManager는 JobManager에 연결하여 사용 가능함을 알리고 작업을 할당받는다.
JobManager
JobManager는 Flink 애플리케이션의 분산 실행을 조정한다. 다음 태스크를 언제 스케줄링할지 결정하고, 태스크 완료나 실행 실패에 반응하며, 체크포인트와 장애 복구를 조정한다. 이 프로세스는 세 가지 구성 요소로 이루어진다.
ResourceManager는 Flink 클러스터의 리소스 할당/해제와 프로비저닝을 담당한다. 리소스 스케줄링 단위인 task slot을 관리한다. Flink는 YARN, Kubernetes, 독립 실행형 배포 등 다양한 환경을 위한 여러 ResourceManager를 제공한다. 독립 실행형 설정에서 ResourceManager는 기존 TaskManager의 슬롯만 배포할 수 있으며, 새 TaskManager를 직접 시작할 수 없다.
Dispatcher는 실행을 위해 Flink 애플리케이션을 제출할 수 있는 REST 인터페이스를 제공하고 제출된 각 작업에 대해 새 JobMaster를 시작한다. 또한 작업 실행 정보를 제공하기 위해 Flink WebUI를 실행한다.
JobMaster는 단일 JobGraph의 실행을 관리하는 역할을 한다. Flink 클러스터에서 여러 작업이 동시에 실행될 수 있으며, 각각 자체 JobMaster를 가진다.
항상 하나 이상의 JobManager가 있다. 고가용성 설정에는 여러 JobManager가 있을 수 있으며, 그중 하나는 항상 leader이고 나머지는 standby이다.
TaskManagers
TaskManager(또는 worker)는 데이터 플로우의 태스크를 실행하고, 데이터 스트림을 버퍼링하며 교환한다.
항상 하나 이상의 TaskManager가 있어야 한다. TaskManager에서 리소스 스케줄링의 가장 작은 단위는 task slot이다. TaskManager의 task slot 수는 동시 처리 태스크 수를 나타낸다. 여러 연산자가 하나의 task slot에서 실행될 수 있다.
Tasks and Operator Chains
분산 실행을 위해 Flink는 연산자 서브태스크를 task로 체이닝(chaining) 한다. 각 태스크는 하나의 스레드에 의해 실행된다. 연산자를 태스크로 체이닝하는 것은 유용한 최적화이다: 스레드 간 핸드오버 및 버퍼링의 오버헤드를 줄이고, 지연을 줄이면서 전체 처리량을 증가시킨다. 체이닝 동작은 구성 가능하다.
아래 그림의 샘플 데이터 플로우는 5개의 서브태스크로 실행되므로 5개의 병렬 스레드로 실행된다.
Task Slots and Resources
각 워커(TaskManager)는 JVM 프로세스이며, 별도의 스레드에서 하나 이상의 서브태스크를 실행할 수 있다. TaskManager가 수락하는 태스크 수를 제어하기 위해 task slot(최소 하나)이라는 것이 있다.
각 task slot은 TaskManager 리소스의 고정된 부분 집합을 나타낸다. 예를 들어, 3개의 슬롯이 있는 TaskManager는 관리 메모리의 1/3을 각 슬롯에 할당한다. 리소스를 슬롯팅한다는 것은 서브태스크가 다른 작업의 서브태스크와 관리 메모리를 놓고 경쟁하지 않고 일정량의 예약된 관리 메모리를 갖는다는 것을 의미한다. 여기서는 CPU 격리가 발생하지 않는다. 현재 슬롯은 태스크의 관리 메모리만 분리한다.
task slot 수를 조정하여 사용자는 서브태스크의 격리 방식을 정의할 수 있다. TaskManager당 하나의 슬롯을 가지면 각 태스크 그룹이 별도의 JVM에서 실행된다(예를 들어 별도의 컨테이너에서 시작할 수 있다). 여러 슬롯을 가지면 더 많은 서브태스크가 동일한 JVM을 공유한다. 동일한 JVM의 태스크는 TCP 연결(멀티플렉싱을 통해)과 하트비트 메시지를 공유한다. 또한 데이터 세트와 데이터 구조를 공유하여 태스크당 오버헤드를 줄인다.
기본적으로 Flink는 서브태스크가 동일한 작업에서 온 것이라면 다른 태스크의 서브태스크인 경우에도 슬롯을 공유할 수 있도록 허용한다. 결과적으로 하나의 슬롯이 작업의 전체 파이프라인을 보유할 수 있다. 이러한 slot sharing을 허용하면 두 가지 주요 이점이 있다:
Flink 클러스터는 작업에서 사용되는 가장 높은 병렬성만큼의 task slot만 필요하다. 프로그램이 총 몇 개의 태스크(다양한 병렬성 포함)를 포함하는지 계산할 필요가 없다.
더 나은 리소스 활용을 얻기가 쉽다. 슬롯 공유가 없으면 비집약적인
source/map()
서브태스크가 리소스 집약적인window
서브태스크만큼 많은 리소스를 차단한다. 슬롯 공유를 사용하면 예제에서 기본 병렬성을 2에서 6으로 증가시키면 슬롯 리소스를 완전히 활용하는 동시에 무거운 서브태스크가 TaskManager 간에 공정하게 분산되도록 보장한다.
Flink Application Execution
Flink Application은 main()
메서드에서 하나 이상의 Flink 작업을 생성하는 사용자 프로그램이다.
작업은 로컬 JVM(LocalEnvironment
) 또는 클러스터의 원격 환경(RemoteEnvironment
)에서 실행된다.
ExecutionEnvironment
는 작업 실행을 제어하고(예: 병렬성 설정) 외부와 상호 작용하는 메서드를 제공한다.
Deployment Modes
Flink는 애플리케이션을 두 가지 모드로 실행할 수 있다. Application Mode와 Session Mode는 클러스터 수명 주기, 리소스 격리 방식, 그리고 애플리케이션의 main()
메서드 실행 위치에서 차이가 있다.
Application Mode에서는 제출된 애플리케이션마다 전용 클러스터가 생성된다. 애플리케이션의 main()
메서드는 JobManager에서 실행되며, 해당 애플리케이션의 작업들끼리만 리소스를 공유한다. 이를 통해 애플리케이션 수준의 리소스 격리와 로드 밸런싱을 제공한다.
여러 작업이 있는 애플리케이션 제출을 허용하며, 작업 실행 순서는 사용된 메서드(execute()
또는 executeAsync()
)에 따라 결정된다.
Application Mode는 Kubernetes와 같은 환경에서 다른 애플리케이션처럼 Flink Application을 배포할 수 있게 한다.
클러스터 엔트리포인트(ApplicationClusterEntryPoint
)가 main()
메서드를 호출하여 JobGraph를 추출한다.
Session Mode에서는 이미 실행 중인 클러스터를 사용한다. 애플리케이션들이 동일한 클러스터 리소스를 공유하고 경쟁한다. 각 작업마다 클러스터를 새로 시작하는 오버헤드를 피할 수 있어, 실행 시간이 매우 짧은 작업(예: 짧은 쿼리의 대화형 분석)에 특히 유용하다.
다만 단점도 있다. 한 작업이 잘못 동작하면 같은 클러스터의 다른 작업에 영향을 줄 수 있으며, TaskManager 실패 시 대규모 복구가 필요할 수 있다. 또한 여러 작업을 관리하는 JobManager의 부하가 증가한다.
4. State Backend
Flink는 상태를 저장하는 방식에 따라 두 가지 State Backend를 제공한다.
HashMapStateBackend (순수 메모리 기반)
HashMapStateBackend는 JVM Heap 메모리에 상태를 저장하는 순수 메모리 기반 State Backend다. 작업 중인 상태(working state)를 TaskManager의 메모리(JVM heap)에 보관하며, 설정된 CheckpointStorage를 기반으로 체크포인트를 생성한다.
상태 크기 고려사항: 작업 상태는 TaskManager의 Heap에 유지한다. TaskManager가 여러 Task를 동시에 실행하는 경우(TaskManager가 여러 슬롯을 가지거나 슬롯 공유를 사용하는 경우), 모든 Task의 상태 합계가 TaskManager의 메모리에 들어가야 한다.
따라서 HashMapStateBackend는 빠른 접근 속도를 제공하지만, 메모리 크기 제한이 명확하다. 기가바이트 이하의 작은 상태를 다루는 애플리케이션에 적합하다.
참고:
HashMapStateBackend.java:40-48
RocksDBStateBackend (메모리 + 디스크 하이브리드)
RocksDBStateBackend는 메모리와 디스크를 함께 사용하는 하이브리드 State Backend다. 상태를 RocksDB에 저장하며, 체크포인트 생성 시 CheckpointStreamFactory가 제공하는 스트림으로 상태를 직렬화한다. 이 State Backend는 메모리를 초과하는 매우 큰 상태를 저장할 수 있으며, 메모리를 넘어선 데이터를 디스크로 spill한다.
RocksDB는 다음과 같은 특성을 가진다:
- 작은 상태: 주로 메모리(MemTable, Block Cache)에서 처리하여 빠른 성능 유지
- 큰 상태: 메모리를 초과하면 자동으로 디스크(SST - Sorted String Table 파일)로 저장되어 테라바이트급 상태 처리 가능
- 효율적 디스크 활용: LSM-Tree 구조로 디스크 기반임에도 높은 접근 효율 제공
Checkpoint와 Durable Storage
두 State Backend 모두 장애 복구를 위해 주기적으로 상태를 Durable Storage에 체크포인팅한다. Amazon Managed Service for Apache Flink에서는 Amazon S3를 Durable Storage로 사용한다:
- 로컬 상태: RocksDB 또는 JVM Heap에 저장
- Checkpoint 시: 상태를 S3에 업로드
- 복구 시: S3의 checkpoint로부터 상태 복원
5. RocksDB
Flink 작업의 상태가 JVM Heap에 담기에 너무 크거나, 증분 체크포인팅을 통해 체크포인트 오버헤드를 줄이려는 경우, 또는 예측 가능한 지연시간이 중요한 경우 RocksDBStateBackend를 사용한다. RocksDB는 TaskManager 프로세스 내에 네이티브 스레드로 임베디드되어 동작하며, 로컬 디스크의 파일과 함께 작동한다. 따라서 RocksDBStateBackend는 별도의 외부 시스템이나 프로세스를 설정하고 관리할 필요 없이 즉시 사용할 수 있다.
LSM Tree
LSM-Tree (Log-Structured Merge-Tree)는 쓰기 성능을 극대화한 디스크 기반 데이터 구조로, Apache Flink의 RocksDB State Backend에서 사용한다. 전통적인 B-Tree와 달리, LSM-Tree는 쓰기 작업을 먼저 메모리에 수행한 후 주기적으로 디스크로 플러시하여 높은 쓰기 처리량을 달성한다.
LSM-Tree는 메모리 영역과 디스크 영역으로 나뉘며, 디스크는 여러 레벨(Level)로 구성된다:
쓰기 과정
LSM-Tree의 쓰기는 메모리 중심으로 최적화되어 있다. 새로운 데이터가 들어오면 먼저 MemTable이라는 메모리 내 자료구조에 기록한다. MemTable은 정렬된 상태를 유지하며, 쓰기 작업은 \(O(1)\) 시간 복잡도로 매우 빠르게 수행한다.
Flink는 MemTable의 크기를 다음과 같이 설정한다:
// RocksDBConfigurableOptions.java:224-230
public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
key("state.backend.rocksdb.writebuffer.size")
.memoryType()
.defaultValue(MemorySize.parse("64mb"))
.withDescription(
"The amount of data built up in memory (backed by an unsorted log on disk) "
+ "before converting to a sorted on-disk files. The default writebuffer size is '64MB'.");
// RocksDBResourceContainer.java:396-400
currentOptions.setWriteBufferSize(
internalGetOption(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE).getBytes());
currentOptions.setMaxWriteBufferNumber(
internalGetOption(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER));
MemTable이 설정된 크기(Flink 기본값 64MB)에 도달하면 Immutable MemTable로 전환한다. 이는 더 이상 쓰기가 불가능한 읽기 전용 상태를 의미한다. 동시에 새로운 MemTable을 생성하여 이후 쓰기 작업을 처리하므로, 쓰기 작업이 중단되지 않는다.
백그라운드에서는 Immutable MemTable을 디스크로 플러시(Flush)한다. 이때 Level 0 SST 파일이 생성되며, 이 파일들은 정렬되어 있지만 서로 간에 키 범위가 겹칠 수 있다.
Flink는 SST 파일의 크기를 다음과 같이 제어한다:
// RocksDBConfigurableOptions.java:208-214
public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE =
key("state.backend.rocksdb.compaction.level.target-file-size-base")
.memoryType()
.defaultValue(MemorySize.parse("64mb"))
.withDescription(
"The target file size for compaction, which determines a level-1 file size. "
+ "The default value is '64MB'.");
// RocksDBConfigurableOptions.java:216-222
public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE =
key("state.backend.rocksdb.compaction.level.max-size-level-base")
.memoryType()
.defaultValue(MemorySize.parse("256mb"))
.withDescription(
"The upper-bound of the total size of level base files in bytes. "
+ "The default value is '256MB'.");
Level 0에 파일이 쌓이면, RocksDB는 자동으로 Compaction 프로세스를 시작한다. Compaction은 여러 SST 파일을 병합하여 상위 레벨(Level 1, 2, 3…)로 이동시킨다. 이 과정에서 중복된 키는 최신 값으로 통합되고, 삭제된 데이터는 실제로 제거되어 공간이 회수된다.
Flink는 기본적으로 LEVEL Compaction 방식을 사용한다:
// RocksDBConfigurableOptions.java:136-148
public static final ConfigOption<CompactionStyle> COMPACTION_STYLE =
key("state.backend.rocksdb.compaction.style")
.enumType(CompactionStyle.class)
.defaultValue(LEVEL)
.withDescription(
String.format(
"The specified compaction style for DB. Candidate compaction style is %s, %s, %s or %s, "
+ "and Flink chooses '%s' as default style.",
LEVEL.name(),
FIFO.name(),
UNIVERSAL.name(),
NONE.name(),
LEVEL.name()));
// RocksDBResourceContainer.java:381-382
currentOptions.setCompactionStyle(
internalGetOption(RocksDBConfigurableOptions.COMPACTION_STYLE));
상위 레벨로 갈수록 데이터는 더 정렬되고 압축되며, 각 레벨의 크기는 이전 레벨보다 약 10배씩 증가한다.
읽기 과정
읽기 작업은 최신 데이터부터 순차적으로 탐색한다. 먼저 MemTable을 확인하고, 여기에 데이터가 있다면 즉시 반환한다. MemTable에 없다면 Immutable MemTable을 확인한다.
디스크 검색이 필요한 경우, LSM-Tree는 Bloom Filter를 활용하여 효율성을 높인다. Bloom Filter는 확률적 자료구조로, 원소가 집합에 속하는지를 빠르게 판단한다. 특히 “원소가 집합에 없다"는 것을 100% 확신할 수 있으며, “있을 수도 있다"고 답할 때는 위양성(false positive)이 발생할 수 있다.
Bloom Filter는 \(m\)개의 비트로 구성된 배열과 \(k\)개의 해시 함수로 구성한다. 초기 상태에서는 모든 비트를 0으로 설정한다. 위 그림에서는 \(m=18\) (18개의 비트), \(k=3\) (3개의 해시 함수)를 사용한다.
원소를 추가할 때는 \(k\)개의 해시 함수를 모두 적용하여 \(k\)개의 위치를 얻는다. 그림에서 \(x, y, z\)를 추가할 때 각 원소는 3개의 해시 함수에 의해 3개의 비트 위치에 매핑하고, 해당 위치의 비트를 1로 설정한다. 예를 들어 \(x\)는 색칠된 화살표가 가리키는 세 위치의 비트를 1로 만든다.
원소가 집합에 속하는지 확인할 때도 동일한 \(k\)개의 해시 함수를 적용한다. 모든 해당 위치의 비트가 1이면 “원소가 있을 수 있음"으로 판단한다. 하지만 하나라도 0이면 “원소가 확실히 없음"으로 판단한다. 그림에서 \(w\)는 집합 \(\{x, y, z\}\)에 없는 원소인데, \(w\)를 해싱한 결과 중 하나가 0인 위치를 가리키므로 즉시 “없음"으로 판단할 수 있다.
LSM-Tree에서 Bloom Filter는 각 SST 파일마다 생성되며, 특정 키가 해당 파일에 존재하지 않음을 \(O(1)\) 시간에 확률적으로 판단할 수 있다. 예를 들어, 10개의 SST 파일을 검색해야 하는 상황에서 Bloom Filter를 통해 8개 파일에 키가 없음을 즉시 판단하면, 실제로 디스크를 읽는 파일은 2개로 줄어든다.
Flink에서 Bloom Filter를 활성화하는 방법이다.
// RocksDBConfigurableOptions.java:281-287
public static final ConfigOption<Boolean> USE_BLOOM_FILTER =
key("state.backend.rocksdb.use-bloom-filter")
.booleanType()
.defaultValue(false)
.withDescription(
"If true, every newly created SST file will contain a Bloom filter. "
+ "It is disabled by default.");
// RocksDBResourceContainer.java:434-442
if (internalGetOption(RocksDBConfigurableOptions.USE_BLOOM_FILTER)) {
final double bitsPerKey =
internalGetOption(RocksDBConfigurableOptions.BLOOM_FILTER_BITS_PER_KEY);
final boolean blockBasedMode =
internalGetOption(RocksDBConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE);
BloomFilter bloomFilter = new BloomFilter(bitsPerKey, blockBasedMode);
handlesToClose.add(bloomFilter);
blockBasedTableConfig.setFilterPolicy(bloomFilter);
}
Bloom Filter 검사를 통과한 SST 파일은 Block Cache를 먼저 확인한다. Block Cache는 자주 접근하는 데이터 블록을 메모리에 캐싱하여, 반복적인 디스크 읽기를 방지한다.
// RocksDBConfigurableOptions.java:265-271
public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
key("state.backend.rocksdb.block.cache-size")
.memoryType()
.defaultValue(MemorySize.parse("8mb"))
.withDescription(
"The amount of the cache for data blocks in RocksDB. "
+ "The default block-cache size is '8MB'.");
// RocksDBResourceContainer.java:232-236
blockBasedTableConfig.setBlockCache(blockCache);
blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true);
opt.setTableFormatConfig(blockBasedTableConfig);
캐시에 데이터가 없을 때만 실제 디스크에서 SST 파일을 읽는다.
디스크 읽기는 Level 0부터 시작하여 하위 레벨로 순차적으로 진행된다. 각 SST 파일 내부에서는 데이터가 정렬되어 있으므로 이진 탐색을 통해 빠르게 키를 찾을 수 있다. 원하는 키를 찾거나 모든 레벨을 탐색할 때까지 이 과정을 반복한다.
Flink의 최적화 설정
Flink는 사용 환경에 따라 사전 정의된 옵션을 제공한다. SPINNING_DISK_OPTIMIZED는 HDD 환경을 위한 기본 설정이고, SPINNING_DISK_OPTIMIZED_HIGH_MEM은 더 많은 메모리를 사용하여 성능을 향상시킨다:
// PredefinedOptions.java:76-86
SPINNING_DISK_OPTIMIZED(
new HashMap<ConfigOption<?>, Object>() {
{
put(RocksDBConfigurableOptions.COMPACTION_STYLE, CompactionStyle.LEVEL);
put(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, true);
put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
put(RocksDBConfigurableOptions.MAX_OPEN_FILES, -1);
}
}),
// PredefinedOptions.java:117-136
SPINNING_DISK_OPTIMIZED_HIGH_MEM(
new HashMap<ConfigOption<?>, Object>() {
{
put(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, MemorySize.parse("256mb"));
put(RocksDBConfigurableOptions.BLOCK_SIZE, MemorySize.parse("128kb"));
put(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, true);
put(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
put(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, MemorySize.parse("1gb"));
put(RocksDBConfigurableOptions.MAX_OPEN_FILES, -1);
put(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, 4);
put(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE, 3);
put(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, MemorySize.parse("256mb"));
put(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, MemorySize.parse("64mb"));
put(RocksDBConfigurableOptions.USE_BLOOM_FILTER, true);
}
}),
HIGH_MEM 설정은 Block Cache를 256MB로, Write Buffer를 64MB로 설정하고 Bloom Filter를 활성화하여 대용량 상태 처리에 최적화되어 있다.
RocksDB가 Flink에서 효율적인 이유는 다음과 같이 정리해볼 수 있다.
- 빠른 쓰기: 메모리의 MemTable에 \(O(1)\)로 직접 기록
- 효율적인 읽기:
- Bloom Filter로 불필요한 디스크 I/O 제거
- Block Cache로 자주 접근하는 데이터를 메모리에 유지
- 정렬된 SST 파일에서 이진 탐색으로 빠른 검색
- 공간 최적화: Compaction으로 중복 데이터 제거 및 삭제된 데이터 정리
- 확장성: 메모리 크기를 초과하는 상태도 디스크를 활용하여 처리 가능
이러한 특성 덕분에 Flink는 테라바이트급 상태를 효율적으로 관리하면서도 낮은 지연시간을 유지할 수 있다.
6. Apache Flink API
Flink는 다양한 추상화 수준의 API를 제공하여 사용자가 필요에 따라 적절한 레벨을 선택할 수 있다.
Process Function
가장 낮은 레벨의 API로, DataStream API에 내장되어 있다. 상태 기반(stateful) 이고 시간 기반(timely) 스트림 처리를 제공하며, 일관되고 장애 허용적인 상태를 유지하면서 이벤트를 직접 처리한다. 복잡한 시간 기반 콜백을 통해 세밀한 제어가 가능하다.
stream.keyBy(event -> event.getUserId())
.process(new ProcessFunction<Event, Result>() {
private ValueState<Long> state;
@Override
public void processElement(Event event, Context ctx, Collector<Result> out) {
// 이벤트별 세밀한 처리 로직
Long count = state.value();
state.update(count + 1);
// 타이머 등록
ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
// 타이머 콜백 처리
}
});
DataStream API
무한(unbounded)과 유한(bounded) 스트림을 모두 처리한다. 데이터 처리를 위한 일반적인 빌딩 블록을 제공하며, transformations, joins, aggregations, windows, state 등을 지원한다. 데이터는 언어별 클래스로 처리하며, Process Function과 통합한다.
DataStream<Event> events = env.addSource(new KafkaSource<>(...));
DataStream<Statistics> statistics = events
.filter(event -> event.getType().equals("purchase"))
.keyBy(event -> event.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new SumAggregateFunction());
statistics.addSink(new ElasticsearchSink<>(...));
Table API
동적으로 변경되는 테이블을 중심으로 하는 API다.
스키마가 있는 관계형 모델을 따르며, select
, join
, aggregate
와 같은 선언적 연산을 제공한다.
Core API보다 표현력은 낮지만 더 간결하며, 쿼리 최적화를 위한 옵티마이저를 포함한다.
테이블과 DataStream 간 변환이 가능하다.
TableEnvironment tableEnv = TableEnvironment.create(settings);
Table events = tableEnv.from("Events");
Table result = events
.where($("type").isEqual("purchase"))
.groupBy($("userId"))
.select(
$("userId"),
$("amount").sum().as("totalAmount"),
$("userId").count().as("purchaseCount")
);
tableEnv.toDataStream(result).print();
SQL
가장 높은 추상화 수준의 API다. 의미론적으로 Table API와 유사하며, 프로그램을 SQL 쿼리 표현식으로 나타낸다. Table API와 긴밀하게 상호 작용하며, Table API로 정의한 테이블에 대해 쿼리를 실행할 수 있다.
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.executeSql(
"CREATE TABLE Events (" +
" userId STRING," +
" type STRING," +
" amount DECIMAL(10, 2)," +
" eventTime TIMESTAMP(3)," +
" WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'events'" +
")"
);
Table result = tableEnv.sqlQuery(
"SELECT " +
" userId, " +
" SUM(amount) AS totalAmount, " +
" COUNT(*) AS purchaseCount " +
"FROM Events " +
"WHERE type = 'purchase' " +
"GROUP BY userId"
);