Kafka Streams API를 이용하여 실시간 데이터 파이프라인을 구축하는 중 직면한 문제를 기록했다.
우선 실시간 파이프라인의 대략적인 플로우를 설명하면 A라는 토픽에 데이터가 특정 시간 동안 얼마나 들어오는지 계산하여 B라는 토픽에 데이터를 적재하는 토폴로지를 구성하고 있다. 여기서 핵심은 특정 시간 동안 내가 원하는 데이터가 몇 개가 들어오는지 파악하는 것이다.
현 상황
우선 A라는 토픽에서 데이터를 읽어와 조건에 맞춰 filter() 메서드를 이용해 필터링 처리 한 이후 groupBy() 메서드로 특정 값 기준으로 데이터를 그룹화시킨 다음 count 메서드로 개수를 산정했다.
이렇게 하면 A라는 토픽에 들어오는 데이터 중 filter 조건에 맞는 데이터의 개수를 전부 카운트하게 된다.
이때, 특정 시간 동안이라는 조건이 하나 더 있는데, 이는 window라는 개념으로 해결할 수 있다. window의 종류에는 hopping, tumbling, session, sliding window가 존재하는데 자세한 내용은 confluent의 문서를 참고했다.
나는 여러 window 중 tumbling window를 사용하여 데이터가 들어와 윈도우가 생성될 때, 윈도우가 겹치지 않도록 했다.
windowedBy() 메서드를 사용하여 window의 크기를 지정해 주면 데이터가 들어오는 시점에 내가 지정해 준 크기의 윈도우가 생성이 된다. 생성된 윈도우 크기만큼 데이터를 저장하고 있다가 윈도우가 종료되는 시점에 해당 데이터들을 토픽에 보내도록 했다.
windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
이때 suppress() 메서드를 사용하지 않으면 윈도우가 종료되지 않아도 스트림 처리를 해버려서 토픽에 실시간으로 보내게 되기 때문에 suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))로 윈도우가 종료되면 데이터를 한 번에 flush 하도록 했다.
여기까지 나의 예상 동작은 A토픽에서 데이터를 읽어오면 필터를 통과한 데이터의 시간을 기준으로 지정해준 크기의 window가 생성되어 window가 종료될 때까지 기다리고 있다가 종료되는 시점에 저장된 데이터들을 한 번에 B토픽에 전송하는 동작을 기대했다.
하지만, 실제로는 window가 종료되어도 B토픽에 데이터를 전송하지 않고, 기다리고 있다가 A토픽에서 데이터를 새로 읽어오는 시점 (즉, 새로운 window가 생성되는 시점)에 이전 window에 저장된 데이터가 B토픽에 전송하는 것이었다.
문제 상황
지정해 준 크기의 window가 종료되었을 때 해당 시간 동안 누적된 데이터가 토픽에 flush 하는 것이 아닌 윈도우가 종료되고 새로운 데이터가 들어와 window가 생성되는 시점에 이전 데이터들을 토픽에 전송하여 window가 종료되는 시점에 최종 집계 결과를 바로 확인할 수 없었다.
문제 해결
그럼 지정해 준 크기의 window가 종료되는 시점에 데이터가 지정해준 토픽에 바로 전송하도록 하지는 못하는 것일까??
위 질문에 대한 Best practice는 모르겠지만, 여러 문서를 찾아본 결과 직접적으로 바로 전송하도록 하는 기능이 존재하는 것 같지는 않다.
아래 내용은 suppress는 집계된 결과를 같은 key를 가진 데이터가 올 때까지 flush 하지 않는다는 내용이다.
Based on the above definition from the documentation, we expect every day an aggregated statistics message generated (aligned to the UTC) just after the grace period. But, there is a caveat.
The suppress will not flush the aggregated record till it encounters the same group-by key
하지만 윈도우가 종료된 시점에 바로 데이터를 flush 한 것처럼 보이도록 할 수는 있었다.
window가 종료되었을 때 저장된 데이터를 B 토픽에 flsuh 하기 위해 트리거 데이터(dummy data)를 A토픽에 보내는 것이다. 즉, 특정 주기마다 더미 데이터를 A 토픽에 보내 윈도우가 종료된 데이터가 있으면 B 토픽 전송하는 것이다.
위 방법에 대해 이해하기 쉬운 사진이 있어 가져왔다.
더미 데이터(Heartbeat)가 특정 주기마다 계속 전송이 되어도 윈도우가 닫히지 않은 경우(t0 ~ t5)에는 suppress에 의해 토픽에 전송되지 않는다. t0 ~ t5 window에서 생성된 데이터는 Data Point 4 데이터가 전송되는 시점에 flush 된다. 만일 Heartbeat 데이터가 주기적으로 보내지지 않았다면, t5 ~ t10 window에 존재하는 데이터인 Data Point 5는 이후에 들어오는 데이터가 없기 때문에 토픽에 보내지지 않았을 것이다. 위 사진의 경우 마지막 Heartbeat 데이터가 전송된 시점에 t5 ~ t10 window의 데이터가 flush 된다.
더미 데이터를 A토픽에 계속해서 보내기 때문에 B토픽에 보내기 전에 더미 데이터를 필터링하는 조건을 추가해서 전송해야 한다.
Reference
'Kafka' 카테고리의 다른 글
메시지 큐와 카프카 (0) | 2024.05.15 |
---|