참고 사이트: https://redis.io/docs/latest/develop/data-types/streams/
1 Redis Streams의 기본 특징
append-only log처럼 동작한다.
메시지는 항상 뒤에 추가되고, 기존 데이터를 수정하지 않는다.
하지만, 기존 append-only log를 넘어서 더 다양한 기능을 제공한다.Streams도 자료구조이다.
redis에서 streams도 하나의 자료구조이고, 각 streams는 key로 식별된다.
→ key(topic) 별로 streams 자료구조가 생김.ID 생성 규칙
Streams에 추가되는 각 메시지를 Entry라고 한다.
→ 이 Entry는 ID와 Field-Value Map으로 구성된다.
→ 각 엔트리(메시지)는 값으로 Map을 가짐.이 Entry에는 Entry ID가 있고, 기본적으로 시간 기반으로 생성된다.
→ 항상 증가하게 ID가 할당된다.생성 규칙에는
*
을 통한 자동 생성 방식이 있고, 수정 지정하는 방식이 있다.
→ 수동으로 추가할 때는 해당 Stream 안에서 이전 ID보다 큰 ID로 지정해야 한다.
(그러지 않을 경우 에러가 발생한다.)
2 메시지 생산자/소비자 모델
기본 API
XADD
Stream에 새로운 Entry를 추가
XADD mystream * field1 value1 field2 value2 ...
XREAD
특정 Stream에서 메시지 읽기(소비)
→ 어디부터 읽을지 기준 ID를 줌.
→ 해당 ID보다 큰 Entry를 읽게 됨.XREAD STREAMS <key> <ID>
→0
: Stream 맨 처음부터 /$
: 지금까지 쌓인 마지막 이후부터(새로운 것만)XREAD COUNT <개수> <key> <ID>
→COUNT
로 몇 개를 읽을 지 지정할 수 있음.XREAD ... BLOCK
→ 메시지 큐처럼 사용하려는 경우 새 메시지를 기다리게 함.XRANGE
주어진 ID 범위 안에 있는 Entry 목록 조회
(XREVRANGE
는 최신부터 해서 거꾸로 읽는다.)XRANGE <key> 0-0 +
→0-0
: 맨 처음 /+
: 맨 끝
→ 전체 조회XRANGE <key> - +
→ 전체 조회
→-
: 가장 작은 ID(시작) /+
: 가장 큰 ID(끝)XLEN
Stream 안에 들어있는 Entry 개수 확인
(삭제된 엔트리는 개수로 세지 않는다.)XLEN <key>
XDEL
특정 Entry ID를 지정해서 엔트리를 삭제
XDEL <key> <ID>
주의점
Entry를 지워도, ID는 남아있다. (구멍)
XTRIM
오래된 Entry들을 잘라내어 Stream 길이를 제한
XTRIM <key> MAXLEN 1000
→ 가장 오래된 메시지를 잘라내고, 최신 1000개만 유지.
→ 메시지가 무한히 쌓이는 것을 방지 & 메모리 사용량 제어
XREAD
vsXREADGROUP
차이XREAD
단순 Stream에서 엔트리 읽기
메시지를 가져가도 Redis는 따로 상태를 관리하지 않음.
→ 같은 메시지를 여러 소비자가 중복으로 읽을 수 있음.XREADGROUP
컨슈머 그룹(Consumer Group) 기반 읽기
XGROUP CREATE
로 그룹을 만들고,XREADGROUP GROUP <group> <consumer>
형태로 사용.
→ 그룹 단위로 메시지를 분배 & 각 메시지가 어느 소비자에게 갔는지 Redis가 관리.
→ 메시지를 처리한 후에XACK
로 확인(Ack)을 보내야 한다.
(보내지 않으면 Pending 상태로 남음)
Blocking read (
BLOCK
) 동작 방식과 타임아웃 전략XREAD
나XREADGROUP
에BLOCK <ms>
옵션을 주면, 지정한 시간(ms)동안 새 메시지가 올 때까지 대기한다.
→ 시간 안에 메시지가 오면 반환하고, 오지 않으면 빈 응답을 반환.- 타임아웃을 없애기
Blocking 시간을 0ms로 지정하면 타임아웃 없이 기다린다.
⇒XREAD BLOCK 0 STREAMS <key> $
→ 최신 메시지를 기다리는 명령어
- 타임아웃을 없애기
"at least once" / "at most once" 전달의 의미
At Least Once (최소한 한 번)
메시지가 누락되지 않도록 보장
→ 재전송 될 수 있음 → 중복 소비 가능성 있음
→ 멱등성 필요Redis Streams의
XREADGROUP + Pending + XACK
모델은 이 방식이다.
→ 중복은 가능하지만, 유실은 없다.At Most Once (최대 한 번)
Redis Streams에서 단순
XREAD
를 쓰면 이런 특성과 가깝지만, 메시지가 유실될 수 있다.
3 Consumer Group
컨슈머 그룹 특징
각 컨슈머는 그룹 내에서 이름(문자열)로 식별됨. (클라이언트가 지정하고, 유일해야 함)
각 메시지는 각 그룹으로 브로드캐스트 되고, 각 그룹에서 하나의 컨슈머에게 전달됨.
각 그룹은 아직 소비되지 않은 첫 번째 ID를 관리함.
→ 새 메시지 요청 시, 해당 포인터 이후의 메시지만 전달됨.XACK
를 호출해서 처리가 정상적으로 완료되었음을 알려야 함.
(Ack되지 않으면 해당 메시지는 Pending 상태로 남음)그룹은 잘 전달됐지만, Ack되지 않은 메시지들을 추적(Pending Entries List, PEL)함.
메시지가 각 그룹의 어떤 컨슈머에게 전달됐는지 기록함.
→ 스트림 메시지 조회 시에 각 컨슈머에게 전달된 메시지만 볼 수 있음.하나의 스트림에 여러 그룹이 가능하고, 각 그룹은 서로 독립적으로 메시지를 소비함. (각 그룹은 각기 다른 처리 완료 포인터를 유지)
XREAD
와XREADGROUP
은 병행 가능두 명령은 같은 스트림에 대해 사용 가능하고, 두 방식은 서로를 간섭하지 않음.
API
XGROUP
컨슈머 그룹 생성/삭제/관리
XGROUP CREATE <stream> <groupname> <id>
→<id>
: 이 그룹이 어디서부터 스트림을 읽을지 지정(0
: 처음부터,$
: 이후 들어오는 것부터)XGROUP DESTROY <stream> <groupname>
→ 그룹 자체를 삭제XGROUP DELCONSUMER <stream> <groupname> <consumername>
→ 특정 컨슈머를 그룹에서 제거XREADGROUP
컨슈머 그룹을 통해 스트림에서 읽기
XREADGROUP GROUP <group> <consumer> [COUNT n] [BLOCK ms] STREAMS <stream> <id>
→<group>
컨슈머 그룹으로<consumer>
이름으로 스트림을 읽겠다.
→<id>
는>
로 지정하면 해당 그룹의 포인터 이후의 메시지를 가져온다.
(특정 ID를 지정해서 PEL에 있는 메시지를 읽을 수도 있다.)
→ 이렇게 읽는 순간 그 엔트리는 해당 컨슈머에게 할당되고, 그룹의 PEL에 기록된다.XACK
해당 메시지를 정상적으로 처리 완료했다고 그룹에 알림.
XACK <stream> <group> <id> [id ...]
→ Ack된 메시지는 그룹의 PEL에서 제거된다.
→ 다른 컨슈머에게 재배정되지 않음.XACKDEL
XACK
와XDEL
을 한번에 수행XACK
→ 그룹 PEL에서만 지움 → 스트림 자체에서는 메시지가 남아있음.XDEL
→ 스트림에서 메시지를 지우지만, PEL에서는 참조가 남아있을 수 있음.⇒ 메시지 처리 후에 Ack + 실제 스트림에서 삭제 + 그룹 내부 참조 해제를 한 번에 처리하는 강력한 관리 명령.
4 Pending Entries List (PEL)
Redis Streams에서 Consumer Group안에서 전달은 되었지만, 아직 Ack되지 않은 메시지들을 추적하는 리스트이다.
컨슈머 그룹으로 읽은 모든 메시지(XREADGROUP
으로 읽은 모든 메시지)는 이 PEL에 바로 추가되고, 관리된다.
PEL을 통해 메시지 재처리나 DLQ 등을 구현할 수 있다.
XPENDING
PEL에 있는 메시지를 볼 수 있음
XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]
뒤의
[]
부분을 붙이면, 응답 메시지에서 각 메시지를 컨슈머가 받은 이후에 지난 시간(ms)과 몇 번 전송되었는지에 대한 정보를 추가로 받을 수 있다.XCLAIM
이 명령어는 매우 복잡한 형식으로 이루어진다. 여기서는 일반적인 형식을 보겠다.
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
<key>
에 해당하는 스트림의<group>
의<min-idle-time>
이상의 메시지를<consumer>
에게 넘긴다. 뒤의<ID>
들은 재할당 하려는 엔트리 ID들이다.
→ idle time이 초기화되고, deliveries counter가 증가된다.XAUTOCLAIM
XPENDING
과XCLAIM
을 이용하는 방식을 자동화한 명령어이다. (Redis 6.2에서 추가됨)XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
→XCLAIM
할 엔트리 ID를 직접 지정하지 않아도 조건에 맞는 메시지를 찾아준다.
→ 커서 기반으로 동작한다.커서 기반 읽기
각 PEL은 매우 많이 쌓일 수 있다.
→ 전체를 다 읽는 것은 레디스 특성상 맞지 않음
→ 각 메시지를 읽을 때 다 읽지 않고 지정된 개수만 읽음또한, 응답으로 다음 읽기 시 이어갈 커서(start 포인트)를 제공함.
응답이
0-0
일 경우
→ PEL을 끝까지 다 훑었다는 의미.
(그렇다고 더 이상 Pending할 메시지가 없다는 것을 의미하지는 않음)
Redis Streams에서 DLQ
PEL의 각 메시지는 deliveries counter라는 것이 있어서,
XCLAIM
(또는XAUTOCLAIM
)이나XREADGROUP
된 횟수가 기록된다.이것이 몇 회 이상인 메시지를 골라서 다른 stream으로 보내서 따로 처리하는 방식으로 DLQ를 구현할 수 있다.
PEL 크기 관리와 메모리 영향
처리되지 않은 메시지가 쌓일 수록 PEL의 크기는 점점 커진다.
→ 그렇게 되면XPENDING
이나XAUTOCLAIM
같은 명령이 더 많은 엔트리를 순회해야 하고, 성능 정하로 이어질 수 있다.
→ 또한 메모리 사용량이 증가할 수 있다.
→ OOM 가능성⇒ 모니터링이나 주기적으로 Auto Claim가 실행되어야 한다. + DLQ로 옮기는 등의 작업을 해야 한다.