본문 바로가기
스터디/Redis

[Redis] Redis Streams 공부

by dingwoon 2025. 9. 12.

참고 사이트: https://redis.io/docs/latest/develop/data-types/streams/

1 Redis Streams의 기본 특징

  • append-only log처럼 동작한다.

    메시지는 항상 뒤에 추가되고, 기존 데이터를 수정하지 않는다.
    하지만, 기존 append-only log를 넘어서 더 다양한 기능을 제공한다.

  • Streams도 자료구조이다.

    redis에서 streams도 하나의 자료구조이고, 각 streams는 key로 식별된다.
    → key(topic) 별로 streams 자료구조가 생김.

  • ID 생성 규칙

    Streams에 추가되는 각 메시지를 Entry라고 한다.
    → 이 Entry는 IDField-Value Map으로 구성된다.
    → 각 엔트리(메시지)는 값으로 Map을 가짐.

    이 Entry에는 Entry ID가 있고, 기본적으로 시간 기반으로 생성된다.
    → 항상 증가하게 ID가 할당된다.

    생성 규칙에는 *을 통한 자동 생성 방식이 있고, 수정 지정하는 방식이 있다.
    → 수동으로 추가할 때는 해당 Stream 안에서 이전 ID보다 큰 ID로 지정해야 한다.
    (그러지 않을 경우 에러가 발생한다.)

2 메시지 생산자/소비자 모델

  • 기본 API

    • XADD

      Stream에 새로운 Entry를 추가

      XADD mystream * field1 value1 field2 value2 ...

    • XREAD

      특정 Stream에서 메시지 읽기(소비)
      → 어디부터 읽을지 기준 ID를 줌.
      → 해당 ID보다 큰 Entry를 읽게 됨.

      XREAD STREAMS <key> <ID>
      0: Stream 맨 처음부터 / $: 지금까지 쌓인 마지막 이후부터(새로운 것만)

      XREAD COUNT <개수> <key> <ID>
      COUNT로 몇 개를 읽을 지 지정할 수 있음.

      XREAD ... BLOCK
      → 메시지 큐처럼 사용하려는 경우 새 메시지를 기다리게 함.

    • XRANGE

      주어진 ID 범위 안에 있는 Entry 목록 조회
      (XREVRANGE는 최신부터 해서 거꾸로 읽는다.)

      XRANGE <key> 0-0 +
      0-0: 맨 처음 / +: 맨 끝
      → 전체 조회

      XRANGE <key> - + → 전체 조회
      -: 가장 작은 ID(시작) / +: 가장 큰 ID(끝)

    • XLEN

      Stream 안에 들어있는 Entry 개수 확인
      (삭제된 엔트리는 개수로 세지 않는다.)

      XLEN <key>

    • XDEL

      특정 Entry ID를 지정해서 엔트리를 삭제

      XDEL <key> <ID>

      • 주의점

        Entry를 지워도, ID는 남아있다. (구멍)

    • XTRIM

      오래된 Entry들을 잘라내어 Stream 길이를 제한

      XTRIM <key> MAXLEN 1000
      → 가장 오래된 메시지를 잘라내고, 최신 1000개만 유지.
      → 메시지가 무한히 쌓이는 것을 방지 & 메모리 사용량 제어

  • XREAD vs XREADGROUP 차이

    • XREAD

      단순 Stream에서 엔트리 읽기
      메시지를 가져가도 Redis는 따로 상태를 관리하지 않음.
      → 같은 메시지를 여러 소비자가 중복으로 읽을 수 있음.

    • XREADGROUP

      컨슈머 그룹(Consumer Group) 기반 읽기

      XGROUP CREATE로 그룹을 만들고, XREADGROUP GROUP <group> <consumer> 형태로 사용.
      → 그룹 단위로 메시지를 분배 & 각 메시지가 어느 소비자에게 갔는지 Redis가 관리.
      → 메시지를 처리한 후에 XACK로 확인(Ack)을 보내야 한다.
      (보내지 않으면 Pending 상태로 남음)

  • Blocking read (BLOCK) 동작 방식과 타임아웃 전략

    XREADXREADGROUPBLOCK <ms> 옵션을 주면, 지정한 시간(ms)동안 새 메시지가 올 때까지 대기한다.
    → 시간 안에 메시지가 오면 반환하고, 오지 않으면 빈 응답을 반환.

    • 타임아웃을 없애기
      Blocking 시간을 0ms로 지정하면 타임아웃 없이 기다린다.
      XREAD BLOCK 0 STREAMS <key> $ → 최신 메시지를 기다리는 명령어
  • "at least once" / "at most once" 전달의 의미

    • At Least Once (최소한 한 번)

      메시지가 누락되지 않도록 보장
      → 재전송 될 수 있음 → 중복 소비 가능성 있음
      → 멱등성 필요

      Redis Streams의 XREADGROUP + Pending + XACK 모델은 이 방식이다.
      → 중복은 가능하지만, 유실은 없다.

    • At Most Once (최대 한 번)

      Redis Streams에서 단순 XREAD를 쓰면 이런 특성과 가깝지만, 메시지가 유실될 수 있다.

3 Consumer Group

  • 컨슈머 그룹 특징

    • 각 컨슈머는 그룹 내에서 이름(문자열)로 식별됨. (클라이언트가 지정하고, 유일해야 함)

    • 각 메시지는 각 그룹으로 브로드캐스트 되고, 각 그룹에서 하나의 컨슈머에게 전달됨.

    • 각 그룹은 아직 소비되지 않은 첫 번째 ID를 관리함.
      → 새 메시지 요청 시, 해당 포인터 이후의 메시지만 전달됨.

    • XACK를 호출해서 처리가 정상적으로 완료되었음을 알려야 함.
      (Ack되지 않으면 해당 메시지는 Pending 상태로 남음)

    • 그룹은 잘 전달됐지만, Ack되지 않은 메시지들을 추적(Pending Entries List, PEL)함.

    • 메시지가 각 그룹의 어떤 컨슈머에게 전달됐는지 기록함.
      → 스트림 메시지 조회 시에 각 컨슈머에게 전달된 메시지만 볼 수 있음.

    • 하나의 스트림에 여러 그룹이 가능하고, 각 그룹은 서로 독립적으로 메시지를 소비함. (각 그룹은 각기 다른 처리 완료 포인터를 유지)

    • XREADXREADGROUP은 병행 가능

      두 명령은 같은 스트림에 대해 사용 가능하고, 두 방식은 서로를 간섭하지 않음.

  • API

    • XGROUP

      컨슈머 그룹 생성/삭제/관리

      XGROUP CREATE <stream> <groupname> <id>
      <id>: 이 그룹이 어디서부터 스트림을 읽을지 지정(0: 처음부터, $: 이후 들어오는 것부터)

      XGROUP DESTROY <stream> <groupname>
      → 그룹 자체를 삭제

      XGROUP DELCONSUMER <stream> <groupname> <consumername>
      → 특정 컨슈머를 그룹에서 제거

    • XREADGROUP

      컨슈머 그룹을 통해 스트림에서 읽기

      XREADGROUP GROUP <group> <consumer> [COUNT n] [BLOCK ms] STREAMS <stream> <id>
      <group> 컨슈머 그룹으로 <consumer> 이름으로 스트림을 읽겠다.
      <id>>로 지정하면 해당 그룹의 포인터 이후의 메시지를 가져온다.
      (특정 ID를 지정해서 PEL에 있는 메시지를 읽을 수도 있다.)
      → 이렇게 읽는 순간 그 엔트리는 해당 컨슈머에게 할당되고, 그룹의 PEL에 기록된다.

    • XACK

      해당 메시지를 정상적으로 처리 완료했다고 그룹에 알림.

      XACK <stream> <group> <id> [id ...]
      → Ack된 메시지는 그룹의 PEL에서 제거된다.
      → 다른 컨슈머에게 재배정되지 않음.

    • XACKDEL

      XACKXDEL을 한번에 수행

      XACK → 그룹 PEL에서만 지움 → 스트림 자체에서는 메시지가 남아있음.

      XDEL → 스트림에서 메시지를 지우지만, PEL에서는 참조가 남아있을 수 있음.

      ⇒ 메시지 처리 후에 Ack + 실제 스트림에서 삭제 + 그룹 내부 참조 해제를 한 번에 처리하는 강력한 관리 명령.

4 Pending Entries List (PEL)

Redis Streams에서 Consumer Group안에서 전달은 되었지만, 아직 Ack되지 않은 메시지들을 추적하는 리스트이다.

컨슈머 그룹으로 읽은 모든 메시지(XREADGROUP으로 읽은 모든 메시지)는 이 PEL에 바로 추가되고, 관리된다.

PEL을 통해 메시지 재처리나 DLQ 등을 구현할 수 있다.

  • XPENDING

    PEL에 있는 메시지를 볼 수 있음

    XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

    뒤의 [] 부분을 붙이면, 응답 메시지에서 각 메시지를 컨슈머가 받은 이후에 지난 시간(ms)과 몇 번 전송되었는지에 대한 정보를 추가로 받을 수 있다.

  • XCLAIM

    이 명령어는 매우 복잡한 형식으로 이루어진다. 여기서는 일반적인 형식을 보겠다.

    XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

    <key>에 해당하는 스트림의 <group><min-idle-time> 이상의 메시지를 <consumer>에게 넘긴다. 뒤의 <ID>들은 재할당 하려는 엔트리 ID들이다.
    → idle time이 초기화되고, deliveries counter가 증가된다.

  • XAUTOCLAIM

    XPENDINGXCLAIM을 이용하는 방식을 자동화한 명령어이다. (Redis 6.2에서 추가됨)

    XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
    XCLAIM할 엔트리 ID를 직접 지정하지 않아도 조건에 맞는 메시지를 찾아준다.
    → 커서 기반으로 동작한다.

    • 커서 기반 읽기

      각 PEL은 매우 많이 쌓일 수 있다.
      → 전체를 다 읽는 것은 레디스 특성상 맞지 않음
      → 각 메시지를 읽을 때 다 읽지 않고 지정된 개수만 읽음

      또한, 응답으로 다음 읽기 시 이어갈 커서(start 포인트)를 제공함.

      응답이 0-0일 경우
      → PEL을 끝까지 다 훑었다는 의미.
      (그렇다고 더 이상 Pending할 메시지가 없다는 것을 의미하지는 않음)

  • Redis Streams에서 DLQ

    PEL의 각 메시지는 deliveries counter라는 것이 있어서, XCLAIM(또는 XAUTOCLAIM)이나 XREADGROUP된 횟수가 기록된다.

    이것이 몇 회 이상인 메시지를 골라서 다른 stream으로 보내서 따로 처리하는 방식으로 DLQ를 구현할 수 있다.

  • PEL 크기 관리와 메모리 영향

    처리되지 않은 메시지가 쌓일 수록 PEL의 크기는 점점 커진다.
    → 그렇게 되면 XPENDING이나 XAUTOCLAIM 같은 명령이 더 많은 엔트리를 순회해야 하고, 성능 정하로 이어질 수 있다.
    → 또한 메모리 사용량이 증가할 수 있다.
    → OOM 가능성

    ⇒ 모니터링이나 주기적으로 Auto Claim가 실행되어야 한다. + DLQ로 옮기는 등의 작업을 해야 한다.