Kafka Streams, 코파티셔닝(co-partitioning)을 모르면 벌어지는 KStream-KTable Join 대참사
Kafka Streams를 활용하여 실시간 피드 제공 기능을 개발하는 도중 KStream-KTable 간 조인 시 발생했던 문제 상황을 공유하고자 합니다.
글에서 제공되는 예시 코드는 실제로 동작하지 않는 pseudo code입니다.
현재 상황
코스피/코스닥에 상장된 기업들의 주가 정보(시가/고가/종가/현재가 등등)을 5초마다 호출하여 파티션이 5개인 토픽에 저장합니다.
오전 6시에 이전 영업일의 코스피/코스닥에 상장된 기업들의 valuation 값(per, pbr, psr 등등)들이 저장된 엑셀 데이터를 파싱하여 DB 저장 및 파티션이 1개인 토픽에 저장합니다.
두 토픽에 저장된 데이터를 이용하여 각 기업의 실시간 주가 정보를 이용하여 valuation을 계산하여 오전 6시에 엑셀 데이터를 파싱하여 토픽에 저장한 valuation 값과의 비교를 통하여 조건에 해당하는 기업을 당일에 한번만 찾아 별도의 토픽에 생성하는 기능입니다.
주가 정보가 들어오는건 5초마다 저장되어 값 계산을 해야 했기 때문에 KStreams로 처리하고, 비교 대상이 되는 valuation 값은 오전 6시에 한번만 생성되어 토픽에 저장되기 때문에 KTable으로 처리하여 두 값을 비교하여 계산하기 위해 KStreams-KTable간 조인이 필요했습니다.
추가로 당일에 조건에 부합하여 시그널로 생성된 기업은 이후에 다시 조건에 부합되어도 토픽에 저장하면 안된다는 제약 조건이 있습니다.
문제 해결을 위해 가장 처음 시도한 방법
처음에는 KStream과 KTable을 조인하여 스트림 프로세서(process 메서드)에서 특정 조건에 부합하면 메시지(시그널 기업 메시지)를 생성하고 중복 메시지 필터링을 위해 상태 저장소에 저장하고 별도의 토픽에 보냈습니다.
상태저장소를 선언한 이유는 당일에 시그널로 생성된 기업은 한번만 생성되어야 하기 때문에 스트림 프로세서에서 조건에 부합하여도 상태 저장소에 해당 기업이 존재하는지 체크를 하고 없으면 피드로 나가기 위해 필터링을 위한 상태저장소 입니다.
예시 코드
public void topology(StreamsBuilder streamsBuilder) {
streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("signal-company-state-store")
));
KTable<NewKey, Value> ktable = streamsBuilder.stream("topic-valuation")
.selectKey((key, value) -> new NewKey(key, value))
.toTable(...);
streamsBuilder.stream("topic-company")
.selectKey((key, value) -> new NewKey(key, value))
.join(ktable, new Joiner())
.process(new Processor())
.to("topic-signal-company")
}
public class Processor implements ProcessorSupplier<NewKey, JoinedValue, SignalKey, Signalvalue> {
... (get 메서드 오버라이드 Processor 인터페이스를 구현하고 있는 ContextualProcessor을 상속받은 CustomProcessor 생성)
public class CustomProcessor extends ContextualProcessor<NewKey, JoinedValue, SignalKey, Signalvalue> {
private KeyValueStore<SignalKey, Signalvalue> signalStateStore;
@Override
public void init(ProcessorContext<SignalKey, Signalvalue> context) {
this.signalStateStore = context.getStateStore("signal-company-state-store"); // 이전 토폴로지에서 선언한 상태 저장소 선언
}
@Override
public void process(Record<NewKey, JoinedValue> record) {
... // 메시지를 가져와 조건 검사
if (signalStateStore.get(signalKey) == null) {
signalStateStore.put(signalKey) // 시그널 메시지로 생성된 기업을 필터링 하기 위해 상태 저장소에 저장
this.context().forward(new Record<>(signalKey, signalValue));
} // 조건에 부합하는 메시지가 상태 저장소에 있는지 검사하여 없으면 상태 저장소에 저장 이후 스트림 처리 -> .to() 로 토픽에 메시지 전송
}
}
}
예상대로 입력 스트림을 통해 들어온 기업 주가 정보 메시지는 기업들의 valuation 값 데이터와 조인하여 프로세서에 조건에 부합하는 시그널 기업 메시지는 생성되었습니다.
하지만 몇 초 뒤 동일한 기업의 시그널 메시지가 또 생성되는것이 확인 되었습니다. 즉 상태 저장소를 활용한 필터링이 제대로 동작하지 않는것을 알 수 있었습니다.
이는 코파티셔닝이라는 개념을 알지 못해 발생한 문제였습니다.
KStream-KTable간 조인 시 중요한 개념인 코파티셔닝(co-partitioning)을 알아야 중복 메시지가 상태 저장소를 이용해 필터링 되지 않은 이유를 알 수 있습니다.
코파티셔닝
코파티셔닝을 간단하게 설명하면 동일 키가 KStream과 KTable에서 같은 파티션(동일 Task)에 위치하도록 파티션 개수와 파티셔닝 전략을 일치시키는 것을 의미합니다.
- 파티션 개수가 갖고 파티셔닝 전략이 같은 경우에는 동일한 메시지 키를 가진 데이터가 동일한 태스크에 들어가는 것을 보장합니다.
즉, 현재 KStream으로 선언한 기업의 주가 정보를 저장하는 토픽의 파티션은 5개, KTable으로 선언한 기업의 valuation 지표를 저장하는 토픽의 파티션은 1개로 코파티셔닝이 되지 않은 상태로 조인하였기 때문에 발생한 문제였습니다.
현재 상황을 그림으로 표현하면 아래 그림과 같습니다.
5개의 파티션을 갖고 있는 "topic-company" 토픽과 1개의 파티션을 갖고 있는 "topic-valuation" 토픽에 producer가 메시지를 생성하는 모습입니다. (실제로는 각 토픽에는 다른 producer가 메시지를 생성)
그리고 Streams Application에서는 각 파티션별로 Task가 존재하기 때문에 topic-company의 state-store는 1~5까지 존재하며, topic-valuation의 state-store는 1개만 존재합니다.
그리고 Key3, Key4의 메시지가 topic-valuation은 파티션 1번에 저장되어 있지만, topic-company는 각각 파티션 2, 3번에 저장되어 있습니다.
동일 키(Key3, Key4)가 다른 파티션에 위치하여 코파티셔닝이 되어있지 않은 상태입니다.
왜 코파티셔닝을 해야 하는가?
우선 Kafka Streams에서의 KStream-KTable 조인은, 다음과 같은 메커니즘으로 동작합니다.
KStream
- 실시간으로 들어오는 이벤트 스트림입니다.
- 각 파티션마다 병렬로 이벤트를 처리하고, 로컬 상태 저장소(local state store)를 사용하여 필요한 데이터를 관리합니다.
KTable
- key값에 대하여 가장 최신 값만 유지하는 상태를 테이블로 표현한 것입니다.
- 특정 파티션에 로컬 상태 저장소로 저장되어 있으며, KStream 측에서 동일한 키를 조회할 때마다 해당 KTable 데이터를 빠르게 가져와서 조인합니다.
문제는 KStream과 KTable이 모두 같은 키(기업 코드 등)에 대해서 조인을 해야 하는데, 이 키가 서로 동일한 파티션에 있어야만 정확한 매칭과 조회가 가능하다는 데 있습니다.
이를 코파티셔닝(co-partitioning)이라 하며, "동일한 키는 KStream과 KTable 각각에서 반드시 동일한 파티션 번호로 라우팅되어야 한다." 라는 조건이 필수적입니다.
현재 상황을 통해 구체적으로 KStream-Ktable 간 조인을 알아보자.
Kafka Streams는 토픽의 파티션 수(5개)에 맞추어 병렬 처리를 위한 Task를 생성합니다.
그리고 각 Task는 각각의 로컬 상태 저장소를 가지고 있습니다. (총 5개의 task가 각각의 로컬 상태 저장소를 갖습니다)
- KStream(5개 파티션)은 5개의 Task가 있습니다. 각 Task는 서로 다른 파티션의 메시지를 담당합니다.
- KTable(1개 파티션)은 1개의 Task만 있습니다. 모든 Valuation 데이터를 이 단 하나의 파티션(Task)에서 관리합니다.
그러면 동일 기업 키를 가진 메시지가 KStream 쪽에서 파티션 0, 1, 2, 3, 4 중 어디론가 들어왔을 때, KTable의 로컬 상태 저장소와 정확히 어떻게 조인이 될까요?
1. Task 0에서 어떤 기업의 주가 데이터가 들어와서, 해당 기업 키에 대한 KTable 조인하여 시그널 기업에 부합하는지 조건 검사를 합니다.
2. 상태 저장소("signal-company-state-store")에 “시그널 기업 데이터”가 없으면 시그널 기업 메시지를 생성하고, 상태 저장소에 해당 메시지를 생성합니다.
3. 몇 초 뒤, Task 1로 같은 기업 키가 들어옵니다.
4. Task 1에는 Task 0에서 기록해놓은 로컬 상태 저장소("signal-company-state-store")가 공유되지 않습니다. (다른 파티션, 다른 Task 이므로 서로 다른 로컬 스토어임)
5. 따라서 Task 1에서는 “해당 기업 코드가 시그널 발생 이력이 없네?”라고 판단하고 시그널을 또 발생시킵니다. -> 중복 메시지 발생
결과적으로 KStream의 어떤 Task에서는 "이미 한 번 시그널 보낸 기업"이라도 자신의 로컬 상태 저장소에는 해당 데이터가 없어서 다시 시그널을 발생시키게 됩니다.
코파티셔닝이 깨졌을 때 흔히 나타나는 문제이며 실제로 현재 겪은 문제인 "같은 기업 시그널이 중복 생성되는 이슈"의 근본적인 원인이였습니다.
이때 문득 드는 의문점은 "동일 키 = 동일 파티션 저장이 아닌가?" 였습니다.
"동일 키에 대하여 동일 파티션으로 저장되기 때문에 로컬 저장소가 task별로 있어도 필터링이 되어야 하는게 아닌가?" 라는 생각을 했습니다.
예를 들어 기업 키가 "A00123" 인 기업이 KStream으로 들어왔을 때, 1번 파티션(A00123의 해시값이 1이라고 가정)에 들어와서 시그널 기업으로 포착되어 해당 1번 Task의 로컬 상태 저장소에 저장되었습니다.
이후 몇초 뒤 동일 기업 키인 "A00123"도 같은 파티션1으로 들어와 1번 Task의 로컬 상태 저장소를 보고 시그널 기업이 저장되어 있으니 중복 처리가 되어야 하지 않나? 라는 생각을 했습니다.
그렇다면 왜 ‘동일 키’가 다른 파티션에서 들어오는 상황이 발생할까?
“동일 키가 항상 동일 파티션으로 저장되지 않는 케이스"가 존재한다고 합니다.
대표적인 케이스로 파티션 개수 불일치로 인해 재파티셔닝이 일어나거나, 파티션이 달라지는 경우입니다.
Kafka Streams 애플리케이션 내부에서 KStream-KTable 조인을 수행 시 파티션 개수가 다른 경우, 실제로 동일 키가 어떤 파티션으로 매핑될지 Stream 처리 과정에서 재파티셔닝 연산이 발생할 수 있습니다.
즉, 재파티셔닝으로 인해 같은 키임에도 다른 파티션 번호로 재분배될 수 있습니다.
이론적으로는 기존에 생각했던 ‘동일 키 → 동일 파티션 저장’이 kakfa의 파티셔닝 규칙이지만 "파티션 개수 불일치", "재파티셔닝" 등등 동일 키가 다른 파티션으로 저장되는 요인이 존재할 수 있습니다.
중복 메시지를 필터링 하는 해결책은?
계속 언급했듯이 KStream-KTable 조인에서는 동일 키가 동일 파티션 번호로 저장되게 하기 위해 코파티셔닝을 하거나 GlobalStateStore를 활용해야 합니다.
- KTable 파티션을 5개로 늘려서 KStream과 동일 개수로 맞추고, 파티셔닝 전략도 동일하게 설정하는 방법
- GlobalKTable을 활용하여 애플리케이션 전역으로 데이터 복제
우선 가장 정석적인 방법으로 알려져있는 것은 코파티셔닝을 통한 해결책이였습니다.
하지만 리파티셔닝이나 파티셔닝 전략같은 추가적인 작업이 필요한 것 보다 현재 가장 빨리 구현할 수 있는 GlobalKTable을 활용하여 문제를 해결하기로 결정했습니다. (스트림즈 처리에 미숙해 개발 시간을 많이 소요한 상태였습니다..)
GlobalKTable을 사용하는 경우 모든 파티션 데이터가 각 Task로 전부 복제되므로 코파티셔닝이 필요 없어 매우 편하지만 데이터가 모든 Task에 복제 되기 때문에 부하가 커진다는 단점이 존재합니다.
하지만 각 시그널 기업으로 생성되는 데이터도 실제로 부담이 적은 양의 데이터만 생성되기 때문에(토픽 메시지 삭제 기간도 짧게 설정) GlobalKTable을 사용해도 무방하다고 생각하였습니다.
예시 코드
// 기존 상태 저장소를 글로벌 상태 저장소로 변경
public void topology(StreamsBuilder streamsBuilder) {
streamsBuilder.globalTable("valuation-signal-company", Consumed.with(...),
Materialized.<SignalKey, SignalValue, KeyValueStore<Bytes, byte[]>>as(Stores.inMemoryKeyValueStore("filtered-signal-company-state-store"))
.withKeySerde()
.withValueSerde()
);
}
GlobalKTable을 사용해도 중복 메시지가 발생..
그런데 예상했던것과 다르게 GlobalKTable을 사용해도 중복 시그널 기업이 발생하였습니다.
실제로 시그널 기업 생성에 대하여 로그를 확인해보니 아래와 같았습니다.
timestamp: "14:17:47.488", key: "014680" 1차 시그널 기업 생성
timestamp: "14:17:47.548", key: "014680" 2차 시그널 기업 생성 -> 중복 메시지 필터링 처리 안됌
두 메시지의 시간 차를 보면 0.06s 간격을 두고 "014680" 기업의 시그널 메시지가 들어왔습니다.
(실제로 주가 정보 호출은 5초에 한번 이루어지지만 전체 종목에 대한 주가 정보를 받고 다시 각각의 기업 정보로 쪼개어 별도의 토픽에 publish 하는 과정을 비동기로 처리하기 때문에 동일 기업에 대하여 다른 값이 5초가 아닌 더 짧은 시간 차를 두고 들어올 수 있습니다.)
즉, 짧은 시간 (0.06s) 에 들어온 동일 기업 메시지에 대하여 중복 필터링을 하지 못하고 중복 메시지가 발생하였습니다.
왜 이런일이 발생했을까?
결론부터 말하면 GlobalKTable은 각 인스턴스마다 토픽을 비동기로 읽어 로컬 스토어에 업데이트하는 구조라 각 Task의 GlobalStateStore에 메시지가 업데이트 되는데 지연이 발생하였기 때문입니다.
아래 스택오버플로우를 보면 “카프카 스트림이 토픽을 다시 poll()하고 로컬 RocksDB를 업데이트할 때까지 항상 약간의 지연 시간이 있으며 이는 비동기 처리의 근본적인 문제” 라는 코멘트가 있습니다.
https://stackoverflow.com/questions/54104583/kafka-globalktable-latency-issue
즉, 시그널 메시지가 저장되는 토픽(valuation-signal-company)을 GlobalKTable로 선언했을 때, 처음 시그널 기업을 포착되고 valuation-signal-company 토픽에 해당 메시지를 저장하여도, 스트림즈 애플리케이션에서 이를 소비하여 로컬 상태 저장소에 업데이트 할때 약간의 지연이 발생하여 몇몇 기업에 대하여 필터링이 안된 것이였습니다.
실시간성을 100% 보장하지 못하는 비동기적 딜레이가 존재할 수 있어, 시그널 중복 필터링에서 문제가 발생할 수 있습니다.
(GlobalStateStore는 읽기 전용(ReadOnlyKeyValueStore)으로 설계되었기 때문에 프로세서에서 상태 저장소를 가져와 시그널 메시지가 생성되었을 때 직접 put 할 수 없습니다.)
결국엔 코파티셔닝
빠르게 구현하기 위해 GlobalKTable을 사용했지만 latency 이슈로 인해 코파티셔닝과 State Store를 사용하여 중복 메시지 체크와 상태 저장소 업데이트를 동일한 프로세스 단계에서 실행 하는 것이 중복 메시지를 정확히 필터링 할 수 있다고 생각했습니다.
예시 코드
public void topology(StreamsBuilder streamsBuilder) {
streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("signal-company-state-store")
)); // 상태 저장소 선언 -> 실제로는 더 다양한 설정 옵션 추가
KTable<NewKey, Value> ktable = streamsBuilder.stream("topic-valuation")
.selectKey((key, value) -> new NewKey(key, value)) // -> 모든 스트림에서 동일한 키를 사용하여 파티셔닝을 일관되게 유지
.repartition(Repartitioned.with(...)
.withNumberOfPartitions(5)) // 코파티셔닝을 위한 파티션 개수 조정 (1 -> 5)
// withStreamPartitioner()으로 파티셔닝 전략 지정 가능 여기서는 해시 기반의 기본 파티셔닝 전략 사용
.toTable(...);
streamsBuilder.stream("topic-company")
.selectKey((key, value) -> new NewKey(key, value))
.join(ktable, new Joiner())
.process(new Processor())
.to("topic-signal-company")
}
// Joiner 및 Processor는 기존 코드와 동일
로컬 상태 저장소 업데이트는 프로세서내에서 시그널 기업 메시지가 생성되었을떄 바로 put으로 업데이트해주고, 코파티셔닝을 통해 동일 키가 동일 파티션에 들어감을 보장해줌으로써 중복 메시지가 발생하지 않도록 해결할 수 있었습니다.
정리
정리하면 이번 중복 메시지 이슈는 KStream-KTable 간 조인에서 파티션 불일치(코파티셔닝 불일치) 때문에 동일 키가 다른 로컬 상태 저장소(Task)로 들어와, 한 번 필터링된 기업임에도 다시 시그널이 생성된 것이 근본 원인입니다.
이를 해결하는 대표적인 방법은 아래 두 가지입니다.
1. 코파티셔닝
- KStream의 파티션 수에 맞춰 KTable도 동일한 파티션 수로 만들어줍니다.
- Repartition 연산 등을 통해 같은 키는 반드시 동일 파티션으로 들어가도록 보장하면, 로컬 상태 저장소를 이용해 중복 메시지를 정확히 필터링할 수 있습니다.
2. GlobalKTable 활용
- KTable 전체 데이터를 각 Task 로컬 스토어에 전부 복제합니다.
- 코파티셔닝 과정을 따로 맞출 필요는 없지만 GlobalKTable은 각 인스턴스마다 토픽을 별도의 쓰레드로 비동기적으로 읽어 로컬 상태 저장소에 업데이트하기 때문에 해당 데이터가 로컬 저장소에 반영되기까지 약간의 latency가 발생하여 100% 완전한 실시간성을 보장하기 어렵습니다.
결국, 동일 키에 대한 중복 필터링을 확실히 하려면, 코파티셔닝이 가장 확실한 접근이라고 생각되며, 필요 시에는 repartition을 통한 파티션 수 맞추기, 동일 파티셔닝 전략 지정 등을 활용할 수 있습니다.