- 카프카(kafka) 구성 요소 특징 정리2025년 07월 27일
- 알쓸개잡
- 작성자
- 2025.07.27.:38
반응형아파치 카프카는 분산형 메시지 플랫폼으로 실시간 데이터 스트리밍 및 처리를 위해서 설계되었다. 카프카는 대규모 데이터의 높은 처리량과 낮은 지연시간(latency)을 요구하는 애플리케이션에서 널리 사용되며 데이터의 생산자와 소비자 간의 효율적인 데이터 전송을 가능하게 하며 점점 그 사용에 대한 필요성이 늘어나고 있는 듯하다.
이번 포스팅에서는 유데미 강의를 통해서 배운 내용을 토대로 카프카를 구성하는 토픽, 파티션, 프로듀서, 컨슈머에 대한 특징 및 동작 방식에 대해서 정리하고자 한다.
기본 요소
토픽 (topic)
카프카 토픽은 카프카에서 데이터를 조작하고 분류하며 관리하기 위한 논리적 채널이다. 프로듀서와 컨슈머 사이에서 데이터를 효율적이고 확장 가능하게 전송할 수 있도록 설계된 핵심 구성요소다.
파티션(partition)과 오프셋(offset)
토픽은 여러 개의 파티션으로 구성되어 있다.
- 각 파티션의 메시지들은 순차적으로 저장된다.
- 카프카 토픽은 immutable 하므로 한번 데이터가 파티션에 저장되면 수정할 수 없다.
- 데이터는 지정된 시간 동안 유지된다. (기본 7일)
- 오프셋은 파티션마다 존재하며 스트림의 현재 위치를 나타낸다. 각 파티션의 offset 값은 1씩 증가한다.
- 키가 제공되지 않으면 메시지는 라운드로빈 혹은 sticky partitioner 방식에 의해서 토픽에 저장된다.
프로듀서 (producer)
- 프로듀서는 토픽에 데이터를 전송(기록)한다.
- 프로듀서는 어느 파티션에 메시지가 기록될지 알고 있다. 기록할 파티션을 미리 결정하는 것은 프로듀서다.
- 프로듀서는 메시지와 함께 키를 전송할 수 있다. 키가 지정되지 않으면 데이터는 각 파티션의 라운드로빈 방식으로 전송된다.
- 키가 제공되면 메시지는 항상 동일 파티션에 전송된다. (해싱)
- 특정 ID를 갖는 데이터를 추적해야 할 때 메시지 순서를 보장하기 위해서는 ID를 키로 지정하여 동일 파티션에 저장하도록 한다.
Kafka Message Serializer (메시지 직렬화)
- 카프카는 오직 바이트 스트림을 프로듀서로부터 수신하고 바이트 스트림을 컨슈머에게 전송한다.
- 메시지 직렬화는 객체/데이터를 바이트로 변환하는 것을 의미한다.
- Common 직렬화기
- String (include JSON)
- Int, Float
- Avro
- Protobuf
Kafka Message Key Hashing (카프카 메시지 키 해싱)
- 키 해싱은 키를 어느 파티션에 매핑할지 결정하는 과정이다.
- 디폴트 카프카 파티셔너에서 키는 murmur2 algorithm을 사용하여 해싱한다.
- targetPartition = Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)
- murmur2 알고리즘과 모듈러 연산을 통해서 메시지가 저장되는 파티션이 결정된다.
Consumers (컨슈머)
- 컨슈머는 이름으로 정의된 토픽으로부터 메시지를 읽는다. (pull model)
- 브로커가 장애인 경우 컨슈머는 어떻게 복구할지 안다.
- 각각의 파티션에서 offset 값이 작은 것에서 큰 순서로 데이터를 읽는다.
Kafka Message Deserializer (메시지 역직렬화)
- 컨슈머는 바이터 스트림으로 전달된 데이터를 역직렬화해야 한다.
- Common 역직렬화기
- String (include JSON)
- Int, Float
- Avro
- ProtoBuf
- 직렬화/역직렬화 타입은 토픽 라이프사이클 동안에는 변경되면 안 된다. (대신에 새로운 토픽을 만들어라)
Consumer Group (컨슈머 그룹)
- 애플리케이션 내의 모든 컨슈머들은 컨슈머 그룹을 구성하여 데이터를 읽는다.
- 그룹 내의 각각의 컨슈머는 파티션으로부터 데이터를 읽는다.
- 동일 토픽에 대해서 여러 개의 컨슈머 그룹을 가질 수 있다.
- group.id라는 컨슈머 속성을 사용하여 컨슈머 그룹을 구분 짓는다.
Kafka Brokers
- 카프카 클러스터는 다수의 브로커로 이루어져 있다.
- 브로커는 카프카 서버를 의미한다.
- 각 브로커는 ID(정수값)을 가지고 있다.
- 각 브로커는 토픽과 파티션을 포함하고 있다.
- 클러스터에 연결하기 위해서 하나의 브로커에 대한 연결정보만 알면 된다. (브로커 간의 연결)
- 3개의 브로커로 시작하는 것이 좋지만 큰 시스템의 경우 100개 이상의 브로커로 구성된 클러스터도 있다.
Consumer Offsets
- 카프카는 컨슈머 그룹이 읽은 오프셋들을 저장한다.
- 카프카 토픽 내에서 commit 된 오프셋들은 consumer_offsets에 저장 (__consumer_offsets는 컨슈머 그룹의 오프셋을 저장하는 내부 토픽이다.)
- 컨슈머가 어떤 토픽의 어떤 파티션까지 메시지를 읽었는지 기록한다.
- 그룹 내의 컨슈머가 카프카로부터 데이터를 수신했을 때 주기적으로 offset들을 커밋한다.
- 만약 컨슈머가 죽으면 커밋된 컨슈머 오프셋부터 다시 데이터를 가져온다.
출처) https://learn.conduktor.io/kafka/kafka-consumer-groups-and-consumer-offsets/ Consumer Offset 커밋 방식
컨슈머가 어느 위치까지 처리를 완료했는지 기록하는 메커니즘.
delivery semantics for consumers를 구현하는 도구로써 역할을 하는 개념으로 보면 된다.
- 자동 커밋 (default)
- 카프카 컨슈머가 주기적으로 poll() 시점에서 자동으로 오프셋을 커밋
- enable.auto.commit=true (기본값)
- auto.commit.interval.ms (기본 5000ms) 간격으로 자동 커밋
- 메시지 처리가 끝나기 전에 커밋될 수 있어 데이터 유실 가능성 있음.
- 동기 커밋
- consumer.commitSync() 사용
- 메시지 처리 후 명시적으로 오프셋 커밋
- 커밋이 성공할 때까지 블로킹 -> 안정성 높음, 성능 저하 가능
- 재시도 로직 구현 가능
- 메시지 유실 위험이 낮음
- 네트워크 지연 속도 저하 우려가 있음
- 매 메시지마다 커밋 시 성능 저하 심함 -> 보통 배치 단위로 커밋
- 비동기 커밋
- consumer.commitAsync() 사용
- 비동기로 커밋 -> 성능 향상 가능
- 커밋 실패 시 재시도 어려움 (콜백 구현 가능)
- 순서 보장이 안됨 -> 최종 커밋 값이 낮아질 수 있음
- 비동기 -> 지연감소, 성능 향상
- 콜백을 통해서 실패 로깅 가능
- 순서 보장이 안되어 커밋 값 역전의 위험이 있음
- 실패 시 안전한 재시도 구현 어려움
Delivery semantics for consumers
producer -> broker -> consumer 전달되는 과정에서 메시지가 몇 번 처리될 수 있는가를 나타내는 개념이다.
- At least once (최소 한 번)
- 메시지가 최소 한 번은 전달된다.
- 메시지 중복이 되면 안 되는 경우 컨슈머 메시지 처리 로직에서 중복 처리를 방지하도록 해야 한다.
- 메시지를 처리한 후 오프셋 커밋 -> 장애 시 메시지가 중복 처리될 수 있다.
- 데이터 유실은 없음 -> 중복 가능
- enable.auto.commit=false
- ex)
- 5개의 배치 메시지를 가져온 뒤에 3번째 메시지까지 처리한 후 consumer 재연결이 발생하면 다시 1번째 메시지부터 가져온다. 1, 2, 3 메시지 중복 발생.
- Exactly once (정확히 1번)
- 메시지가 정확히 한 번만 처리된다.
- 메시지 손실과 중복 처리가 방지된다.
- kafka의 트랜잭션 기능을 통해서 구현된다.
- 메시지의 생산, 브로커 저장, 소비자의 커밋이 트랜잭션 단위로 처리된다.
- idempotent 프로듀서 (멱등 생산자)
- 동일한 메시지를 중복으로 전송해도 카프카가 자동으로 중복 메시지를 제거한다.
- enable.idempotence=true
- Transactional consumer
- consumer가 정확히 한번 처리한 메시지만 커밋된다.
- isolation.level=read_committed
- 오프셋 커밋과 메시지 처리를 원자적(atomic)으로 수행
- 구현 방식
- 프로듀서 enable.idempotence=true
- kafka streams or transaction API 사용
- 오프셋 커밋을 트랜잭션에 포함 -> 메시지 처리와 오프셋 저장을 하나의 원자적 연산으로 보장
- At Most Once (최대 1번)
- 메시지가 최대 한 번 전달된다.
- 오프셋을 메시지 처리 전에 커밋 -> 메시지 처리 도중 장애 발생 시 유실
- 데이터 유실 가능성은 있지만 중복 처리는 없음
- ex)
- 5개의 배치 메시지를 가져온 뒤에 오프셋을 커밋한다.
- 3번째 메시지까지 처리한 후 consumer 재연결이 발생하면 6번째 메시지부터 가져온다.
- 4,5번 메시지는 손실된다.
Brokers and Topics
- 토픽 A 3개의 파티션, 토픽 B 2개의 파티션이 있다고 가정하자.
- 토픽과 파티션은 전체 브로커에 대해서 분산된다.
출처) https://learn.conduktor.io/kafka/kafka-brokers/ - 모든 브로커는 bootstrap server라고 부른다.
- 각 브로커는 모든 broker, topics, partitions (metadata) 정보를 알고 있다.
출처) https://learn.conduktor.io/kafka/kafka-brokers/ Replication factor
- 토픽의 각 파티션을 몇 개의 브로커에 복제(replication)할지를 나타내는 값이다.
- 데이터의 고가용성과 내결함성을 보장하는 메커니즘이다.
- replication factor는 브로커의 개수를 넘을 수 없다.
- 브로커가 다운되었을 때 다른 브로커가 서비스 할 수 있도록 한다. (고가용성)
- 토픽 A가 2개의 파티션을 가지고 있고 replication factor 값이 2라고 했을 때 복제는 다음과 같다.
출처) https://learn.conduktor.io/kafka/kafka-topic-replication/ 토픽 내구성
- 토픽에 replication factor가 3으로 지정되어 있고 브로커가 3개인 경우 2개의 브로커가 다운돼도 내구성을 가질 수 있다.
- 즉 replication factor를 N으로 지정한 경우 N-1개의 브로커가 다운되도 복구가 가능하다.
Partition Leader
- 오직 하나의 브로커만 주어진 파티션에 대한 리더가 될 수 있다.
- 프로듀서는 리더 파티션인 브로커에게만 데이터를 전송한다.
- 다른 브로커들은 복제 데이터를 가진다.
- 그러므로 각 파티션은 하나의 리더와 여러 복제가 있게 된다.
출처) https://learn.conduktor.io/kafka/kafka-topic-replication/ ISR (In-Sync Replicas)
ISR은 파티션의 리더 브로커와 최신 상태인 복제본을 의미한다.
리더와 최신 상태가 아닌 복제본은 동기화되지 않은 것이다.프로듀서와 컨슈머 기본동작
- 카프카 프로듀서는 파티션에 대한 리더 브로커에게만 데이터를 전송한다.
- 카프카 컨슈머는 기본적으로 파티션에서 데이터를 리더 브로커에게서 읽는다.
- 만약 리더 브로커가 다운되면 복제본을 가지고 있는 다른 브로커가 파티션 리더가 된다.
출처) https://learn.conduktor.io/kafka/kafka-topic-replication/ 카프카 컨슈머 replica fetch (카프카 2.4+)
- 카프카 2.4부터 컨슈머는 가장 가까운 replica로부터 데이터를 읽을 수 있게 되었다.
- 이를 통해 latency가 개선되고 클라우드를 사용하는 환경이라면 네트워크 비용을 줄일 수 있다.
출처) https://learn.conduktor.io/kafka/kafka-topic-replication/ 프로듀서 acknowledgements
프로듀서는 쓰기 데이터에 대한 응답을 받을지 선택할 수 있다.
- acks=0 : 프로듀서는 acknowledgement에 대한 대기를 하지 않는다. (데이터 손실 가능)
- 데이터 전송이 성공하면 성공으로 간주한다.
- 처리량이 가장 많다.
출처) https://learn.conduktor.io/kafka/kafka-producer-acks-deep-dive/ - acks=1 : 프로듀서는 리더 acknowledgement에 대해서 대기한다. (제한적인 데이터 손실)
- 리플리케이션 파티션에는 데이터가 복사되었는지 보장할 수 없다.
- 카프카 1.0 ~ 2.8까지 이 설정이 기본값이었다.
출처) https://learn.conduktor.io/kafka/kafka-producer-acks-deep-dive/ - acks=all (acks=-1) : 프로듀서는 리더와 replica에 대해서 acknowledgement를 대기한다. (데이터 손실 없음)
- 함께 따라다니는 설정이 있다. min.insync.replicas (기본값 1)
- min.insync.replicas 는 토픽 설정이다.
- min.insync.replicas=1인 경우 리더만 ack를 보낸다. (acks=1과 같은 효과)
- min.insync.replicas=2인 경우 리더와 최소한 1개의 replica가 성공을 하면 응답을 보내야 한다.
- 파티션 리더와 모든 replica의 응답을 기다린다.
- 파티션 리더를 포함하여 replication ISR 모두 데이터를 받았을 때 ack를 보낸다.
- 3개의 브로커로 구성되어 있고 replication factor가 2인 경우 min.insync.replicas는 최소한 2로 지정하는 것이 좋다. 적어도 1개의 replica가 받을 수 있는 걸 보장하니까.
- 위의 케이스에서 min.insync.replicas=3인 경우에는 브로커 하나라도 다운되는 안 되는 상황인데 이는 사용하지 않는 것이 좋다.
- replication factor=N, min.insync.replicas=M 인 경우 N-M 개의 브로커가 다운돼도 안정적으로 운영이 가능하다는 의미다.
출처) https://learn.conduktor.io/kafka/kafka-producer-acks-deep-dive/ - 안정성은 0 < 1 < all, 성능은 0 > 1 > all
Zookeeper
- 브로커를 관리한다. (브로커 리스트 유지)
- 파티션에 대한 리더 선출을 결정하는데 관여한다.
- 변화에 대한 알림을 카프카로 전송한다.
- 토픽 생성, 브로커 다운, 브로커 기동, 토픽 삭제 등
- 카프카 2.X 버전부터 주키퍼 없이 실행할 수 있다.
- 카프카 3.X 버전도 주키퍼 없이 실행할 수 있다. - kafka kraft
- 카프카 4.X는 주키퍼를 완전히 제거할 것이다.
- 주키퍼는 홀수개의 서버로 운영되도록 디자인되었다.
- 주키퍼는 하나의 리더 서버(write)와 나머지 팔로워 서버(read)를 가지고 있다.
출처) https://learn.conduktor.io/kafka/zookeeper-with-kafka/ 주키퍼를 사용해야 하는가?
- 카프카 4.0 버전 이전까지는 주키퍼 없이 프로덕션에서 사용할 수 없다.
- 카프카 클라이언트는 오랜 시간에 걸쳐서 주키퍼 대신에 브로커로 연결하도록 기능들을 이전해 왔다.
- 카프카 0.10 버전부터 컨슈머는 offset을 카프카와 주키퍼에 저장한다. 클라이언트는 주키퍼에 접속하지 않아야 한다.
- 카프카 2.2 버전부터는 kafka-topics.sh CLI 명령은 토픽 관리를 위해서 주키퍼가 아닌 카프카 브로커들을 참조한다. 주키퍼 CLI argument들은 deprecated 되었다.
- 모든 API 명령은 주키퍼에서 카프카를 사용하도록 마이그레이션 되고 있다.
- 카프카 클라이언트로써 절대 주키퍼를 사용하지 않도록 구성하라. 주키퍼는 사라질 테니까.
카프카 KRaft
- 2020년에 아파치 카프카 프로젝트는 주키퍼 제거 작업을 시작했다. KIP-500
- 주키퍼는 클러스터에 100000개 이상의 파티션을 스케일링하는데 문제가 있었다.
- 주키퍼를 제거함으로써 카프카는 수백만 개의 파티션으로 확장할 수 있고 관리하기 쉬워졌다.
카프카 프로듀서
- 카프카 프로듀서의 기본 파티셔너는 스티키 파티셔너다. (partitioner.class=null) - 2.4부터 적용된 파티셔닝 전략
- 스티키 파티셔너는 프로듀서 설정의 batch.size(기본 16KB) 만큼 여러 메시지를 묶어서 전송한다.
- 배치로 전송된 메시지는 동일한 파티션에 기록된다.
- 카프카 멱등 프로듀서 설정을 true로 지정 시 아래 설정이 함께 적용된다. (enable.idempotence=true)
- retries=Integer.MAX_VALUE
- max.in.flight.request.per.connection=1 (kafka == 0.11)
- max.in.flight.request.per.connection=5 (kafka >= 1.0)
- acks=all (-1)
max.in.flight.request.per.connection 설정
- 하나의 연결에서 동시에 처리할 수 있는 batch (record 아님)의 최대 개수를 설정한다.
- 값이 높을수록 높은 처리량을 보여준다.
- 메시지 순서가 뒤바뀔 수 있다.
batch 1, batch 2
batch 1 실패 -> 재시도
batch 2 성공 -> batch 2가 먼저 도착 -> 순서 역전 발생- 디폴트 설정
- kafka 3.0
- acks=all (-1)
- enable.idempotence=true (멱등 프로듀서)
- kafka 2.8
- acks=1
- enable.idempotence=false
- kafka 3.0
가능한 safe producer를 사용할 것을 권장한다.
항상 업그레이드된 kafka client를 사용하기를 권장한다.
안정한 kafka producer 설정
- 카프카 3.0부터는 기본적으로 안전하다. 카프카 3.0 이전 버전의 경우 클라이언트 버전을 업그레이드하거나 아래 설정을 지정한다.
- acks=all (-1)
- ack 응답을 받기 전에 메시지가 모든 파티션에 저장이 되었음을 보장한다.
- min.async.replicas=2 (broker/topic level)
- replication factor가 3인 경우 최소한 하나의 replica 파티션에 데이터가 저장됨을 보장한다.
- enable.idempotence=true
- 데이터 중복 저장을 방지한다.
- retries=MAX_INT
- delivery.timeout.ms 시간이 될 때까지 재시도를 한다.
- delivery.timeout.ms=120000
- 2분 동안 메시지 전송 시도(재시도 포함)를 한다.
- max.inflight.request.per.connection=5
- batch 메시지 간의 순서 보장은 할 수 없다. (순서가 중요하지 않은 경우 사용하면 좋다)
- 메시지 전송 성능 이점이 있다.
- max.inflight.request.per.connection=1, enable.idempotence=true
- 메시지 순서 보장 + 중복 방지 처리
메시지 압축
- 일반적으로 메시지는 텍스트로 전송된다.
- 메시지를 압축해서 전송하는 것은 성능과 공간 측면에서 좋다.
- 메시지 압축은 프로듀서에서 압축하는 방법과 브로커에서 압축하는 방법이 있다.
- 압축 방식은 none(default), gzip, lz4, snappy, zstd (kafka 2.1) 이 지원된다.
- 메시지 압축은 사이즈가 큰 배치 메시지 전송에 효과적이다.
프로듀서에서 메시지 압축
- 프로듀서에서 배치 메시지를 전송할 때 배치 메시지 전체를 압축하여 전송한다.
- 프로듀서에서 메시지를 압축하면 더 작은 크기의 배치 메시지를 전송할 수 있다.
- 그러므로 네트워크 latency가 작아져 더 빠른 전송을 할 수 있다.
- 더 많은 전송을 처리할 수 있고 카프카에서는 더 작은 디스크 공간을 사용한다.
- 프로듀서와 컨슈머에서 압축/해제를 위해 더 많은 cpu 자원을 소모한다.
- linger.ms, batch.size 설정을 통해서 압축률과 처리량을 높일 수 있다.
- linger.ms
- batch가 가득 차지 않았더라도 이 시간(밀리초) 동안 기다리서 더 많은 레코드를 모을지 결정
- 기본값 : 0ms (즉시 전송)
- 값이 클수록 batch 사이즈가 커짐
- linger.ms 가 너무 커지면 지연 시간은 올라가고 실시간성이 낮아짐으로 설정 시 적절한 시간 설정을 해야 한다.
- linger.ms 와 batch.size 설정은 메시지를 모아 하나의 배치를 생성하는데 영향을 미치는 설정이다.
- batch.size는 프로듀서가 메시지를 모아 하나의 배치를 생성할 수 있는 최대 사이즈를 지정한다.
- 크기가 작으면 배치를 자주 생성하여 네트워크 요청이 증가될 수 있다.
- 크기가 크면 더 많은 메시지를 하나의 배치로 묶어 전송하므로 더 효율적이다.
- linger.ms는 배치 크기만큼 데이터를 모으기 위해서 기다리는 시간을 의미한다.
- linger.ms
브로커 / 토픽 레벨 압축
- log.compression.type=producer(default)는 프로듀서에서 압축한 메시지를 브로커에서 수신하면 재압축 없이 그대로 로그를 기록한다.
- log.compression.type=none은 모든 배치들을 브로커에서 압축 해제하며 이는 비효율적이다.
- log.compression.type=lz4 (압축 포맷 지정)
- compression 설정이 프로듀서 설정과 일치하면 데이터는 그대로 저장된다.
- compression 설정이 프로듀서 설정과 다르면 배치들은 브로커에서 압축 해제되고 지정된 압축 알고리즘으로 다시 재압축된다.
- 권장 설정은 프로듀서에서 데이터를 압축하고 브로커의 디폴트 설정 log.compression.type=producer를 유지하는 것이다.
프로듀서 default partitioner when key != null
- 전송 데이터에 키를 지정하면 키해싱을 통해서 동일 키에 대해서는 동일한 파티션에 메시지가 저장된다.
- 키 해싱 알고리즘은 murmur2 알고리즘을 사용하는데 식은 다음과 같다.
- target partition = Math.abs(Utils.murmur2(keyBytes)) % (numPartitions)
- 토픽에 파티션이 추가되면 파티션 수로 모듈러 연산을 수행하므로 동일키라고 하더라도 이전 데이터가 저장되는 파티션 위치와 달라질 수 있다.
- 위와 같은 상황에서는 파티션을 추가하는 것보다 새로운 토픽을 만드는 것이 좋다.
프로듀서 default partitioner when key == null
- 카프카 2.3부터 이전 버전은 라운드 로빈 방식으로 메시지를 파티션에 할당한다.
- 순서대로 파티션에 메시지가 할당되는 방식
- 배치당 파티션이므로 배치당 메시지가 하나다.
- 전송량이 많아지므로 네트워크 부하와 함께 성능적으로 불리하다.
- 카프카 2.4부터는 스티키 파티셔너가 사용된다.
- 스티키 파티셔너는 성능 개선과 특히 key가 null 일 때 높은 처리량을 보여준다.
- 배치가 다 차거나 linger.ms 시간이 되면 여러 메시지를 하나의 배치로 묶어 하나의 파티션에 전송한다.
- 그러므로 기본적으로 스티키 파티셔너를 사용하는 것이 좋고 이를 위해서는 카프카 클라이언트 버전을 2.4 이상 버전으로 사용하는 것이 좋다.
max.block.ms, buffer.memory
- buffer.memory=3355432 (32MB)
- 프로듀서가 브로커에게 메시지를 전송할 때 저장하는 버퍼 크기
- 브로커가 받을 수 없는 상황이 되면 메시지 버퍼에 메시지가 쌓이게 된다.
- 버퍼가 모두 찬 경우 send()를 호출하면 블록이 된다.
- max.block.ms=60000
- send() 호출 시 블록 타임아웃 설정이다.
- 버퍼가 풀 상태일 때 send()를 호출하여 block 상태가 된 경우 지정된 시간(ms) 동안 블록이 해제되지 않으면 예외가 발생한다.
- 예외가 발생했다는 의미는 브로커가 더 이상 메시지를 받을 수 없는 상태라고 본다.
카프카 컨슈머
카프카 컨슈머 그룹과 파티션 rebalancing
- 여러 컨슈머 사이에서 구독하는 파티션이 동적으로 이동하는 것을 rebalnace라고 한다.
- 그룹 내의 컨슈머가 제거되거나 추가되는 경우 파티션 재조정이 일어난다.
- 토픽에 새로운 파티션이 추가되는 경우에도 파티션 재조정이 일어난다.
Eager Rebalance (적극적 rebalancing)
- rebalance가 발생할 때 모든 컨슈머는 중지되고 각 컨슈머의 파티션 할당은 해제된다.
- 컨슈머 그룹은 rejoin 되고 무작위로 새롭게 파티션 할당이 된다.
- 컨슈머가 원래 할당된 파티션으로 할당된다는 보장이 없다.
Cooperative Rebalance (점진적 rebalancing)
- 파티션을 여러 subset으로 나눠서 한 컨슈머에서 다른 컨슈머로 subset을 할당한다.
- 컨슈머 중지 없이 일어난다.
- 컨슈머가 할당된 파티션이 다른 컨슈머로 이동할 수 있다.
컨슈머 관련 설정
partition.assignment.strategy
- 컨슈머 그룹 내의 각 컨슈머가 구독한 토픽의 파티션을 어떤 방식으로 할당할지를 정의한다.
- RangeAssignor (적극적) : 파티션을 범위 기반으로 연속적으로 할당하는 방식
- RoundRobin(적극적) : 파티션을 라운드로빈 방식으로 순차적으로 할당하는 방식
- StickyAssignor(적극적)
- 파티션 할당의 안정성을 중시하는 전략으로 파티션 할당이 가능한 변경되지 않도록 함.
- 컨슈머 그룹 구성이 변경되더라도 기존 할당이 유지되도록 설계됨.
- CooperativeStickyAssignor (점진적)
- StikcyAssignor과 비슷하지만 점진적으로 파티션을 재할당하여 그룹 안정성을 더욱 강화한 전략
- 기존 파티션을 유지하면서 새로운 컨슈머가 추가될 때 필요한 최소한의 파티션만 재할당
컨슈머 오프셋 reset 동작
컨슈머 오프셋은 consumer가 특정 토픽의 파티션에서 어느 위치의 메시지까지 읽었는지를 나타낸다. 카프카는 컨슈머가 토픽 데이터를 읽고 처리하는 동안 오프셋을 관리하며, 이 오프셋은 컨슈머 그룹 단위로 유지된다.
오프셋은 컨슈머가 실패하거나 재시작될 때 데이터를 어디서부터 다시 읽을지 결정하는데 중요한 역할을 한다. 오프셋 reset 방식을 통해서 오프셋을 재설정할 수 있다.
오프셋 reset 재설정 상황
1. 컨슈머 그룹 추기 실행
1. 새로 생성된 컨슈머 그룹이 토픽에 처음 연결될 때 오프셋이 지정되지 않았으므로 카프카는 오프셋 초기값을 결정해야 한다.
2. 저장된 오프셋이 없는 경우
1. 토픽의 오프셋 데이터가 브로커의 보존 정책(retention policy)에 의해서 삭제된 경우
3. 오프셋이 유효하지 않은 경우
1. 토픽의 메시지가 삭제되어 저장된 오프셋이 유효하지 않게 된 경우
- auto.offset.reset=latest
- 로그의 마지막 다음 메시지부터 읽는다.
- auto.offset.reset=earliest
- 로그의 처음부터 메시지를 읽는다. 가장 처음부터 일 수도 있고 이전에 컨슈머가 읽은 offset부터 읽는다.
- 주요 설정
auto.offset.reset=earliest #offset 재설정 정책 enable.auto.commit=true #자동 커밋 활성화 auto.commit.interval=5000 #자동 커밋 주기 (5초) retention.ms=604800000 #메시지 보존 기간 (7일)
컨슈머 liveliness (컨슈머 생존성)
카프카에서 컨슈머 liveliness는 컨슈머 그룹의 각 컨슈머가 정상적으로 작동하고 있는지 브로커가 판단하는 메커니즘을 의미한다.
- 컨슈머가 카프카에서 alive 상태로 간주되기 위해서
- 주기적으로 heartbeat를 전송해야 한다.
- poll 호출로 메시지를 지속적으로 읽기 요청을 해야 한다.
- 설정된 주기동안 heartbeat가 전송되지 않거나 poll을 호출하지 않으면 카프카는 해당 컨슈머를 비활성 상태로 판단하여 파티션 rebalancing 이 일어나게 된다.
동작방식
카프카는 컨슈머 그룹 coodinator를 통해 컨슈머의 생존성을 관리한다.
- heartbeat
- 컨슈머는 주기적으로 coordinator에게 heartbeat를 전송하여 활성 상태임을 알린다.
- heartbeat는 다음 두 가지를 통해서 전송된다.
- poll 메서드 호출 시.
- 별도의 heartbeat 전송 스레드를 통해.
- 세션 타임아웃
- 컨슈머가 일정 시간 동안 heartbeat를 전송하지 않으면 비활성 상태로 간주한다.
- session.timeout.ms
- 컨슈머가 일정 시간 동안 heartbeat를 전송하지 않으면 비활성 상태로 간주한다.
- rebalancing
- 컨슈머가 비활성 상태로 간주되면 카프카는 해당 컨슈머에 할당된 파티션을 다른 활성 컨슈머로 재할당 한다.
컨슈머 주요 설정
- heartbeat.interval.ms=3000 (default) : 컨슈머가 heartbeat을 전송하는 주기
- heartbeat.interval.ms는 항상 session.timeout.ms 보다 작아야 한다.
- 보통 session.timeout.ms 설정 시간의 1/3 정도로 설정한다.
- session.timeout.ms=10000 : 지정된 시간 동안 heartbeat가 전송되지 않으면 비활성으로 간주(10초)
- 카프카 3.0 이상 버전부터는 디폴트 값은 45초. 그 이전은 10초.
- max.poll.interval.ms=300000 : 지정된 시간 동안 컨슈머가 메시지를 polling 하지 않으면 비활성으로 간주
- max.poll.records=500 : 카프카가 한 번의 poll()로 가져올 수 있는 최대 메시지 수
- fetch.min.bytes=1 : 컨슈머가 데이터를 요청할 때 브로커는 fetch.min.bytes 에 설정된 크기 이상의 데이터가 준비될 때까지 대기한다.
- fetch.min.wait.ms=500 : fetch.min.bytes 만큼의 데이터가 준비되지 않았더라도 이 설정 시간이 지나면 메시지를 전송한다.
- max.partition.fetch.bytes=1048576 (1MB) : 컨슈머가 하나의 파티션에서 한 번에 가져올 수 있는 최대 바이트 크기를 정의하는 설정이다. 컨슈머가 가져올 데이터의 크기를 제한하여 메모리 사용량을 제어하고 대용량 데이터를 처리하는 애플리케이션의 안정성을 높이는데 유용하다.
- 한 번에 가져올 수 있는 메시지 개수가 max.poll.records 설정값 보다 많은 경우 max.poll.records 설정 값만큼 가져온다.
- fetch.max.bytes=50MB : 컨슈머가 한 번의 poll 호출로 가져올 수 있는 데이터의 전체 크기를 정의하는 설정이다.
- 컨슈머가 처리할 수 있는 데이터의 크기를 초과하지 않도록 보장한다.
- fetch.max.bytes >= max.partition.fetch.bytes * <파티션 수>의 관계를 유지해야 한다.
- 추천 설정
# 실시간 처리 max.partition.fetch.bytes=512KB max.poll.records=100 # 배치 처리 max.partition.fetch.bytes=4BM max.poll.records=1000
- 카프카 라이브러리를 사용하는 경우 heartbeat는 내부적으로 전송처리를 한다.
- 컨슈머가 주기적으로 폴링을 하는 것은 애플리케이션 코드 내에서 구현을 해야 한다.
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record: records) { log.info("Key: " + record.key() + ", Value: " + record.value()); log.info("Partition: " + record.partition() + ", Offset: " + record.offset()); } }
컨슈머 파티션 리더 기본 동작
- 기본적으로 컨슈머는 파티션 리더로부터 데이터를 읽는다.
- 파티션 replica로부터 데이터를 읽을 수도 있다. (2.4+) - 컨슈머 렉 인식
컨슈머 렉 인식
컨슈머가 데이터를 읽을 때 데이터의 물리적 위치(렉 정보)를 고려하여 가장 가까운 레플리카에서 데이터를 읽도록 구성하는 기능이다. 네트워크 효율성을 높이고 데이터의 읽기 시의 latency를 줄이며 클러스터의 안정성을 향상할 수 있다.
- 카프카 클러스터는 브로커가 물리적으로 다른 렉이나 데이터 센터에 배포될 수 있도록 구성할 수 있다.
- Rack Awareness는 데이터의 복제와 소비 시 렉 또는 지역 간 네트워크 비용을 최소하 하기 위한 정책이다.
- 프로듀서 : 데이터 복제를 다양한 렉에 분산하여 고가용성을 보장
- 컨슈머 : 특정 렉에서 가장 가까운 복제본에서 데이터를 읽도록 설정
- Rack Awareness를 활성화하면 Consumer는 동일한 렉(데이터 센터)에 있는 복제본에서 데이터를 읽는다.
- 컨슈머는 아래 우선순위로 데이터를 읽는다.
- 동일 렉의 리더 복제본
- 동일 렉의 팔로워 복제본
- 다른 렉의 리더 복제본
- 설정
- 컨슈머의 랙 정보와 브로커의 랙 정보가 구성되어 있어야 한다.
- 브로커와 컨슈머가 동일한 랙에 있는 경우 네트워크 오버헤드 없이 데이터를 소비할 수 있다.
AWS AZ(Available Zone) ID를 예로 들면 (usw2-az1) 각 브로커에 랙 정보를 설정한다. (2.4+)
rack.id=usw2-az1 replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicataSelector
컨슈머가 어느 렉에 있는지 카프카에게 알리기 위해 렉 정보를 설정한다.
client.rack=usw2-az1
Static Group Membership
- 기존 카프카의 기본 그룹 멤버십 관리 방식은 dynamic group membership.
- dynamic group membership은 컨슈머가 재시작하거나 세션이 만료되면 컨슈머 그룹에서 제거되고, 그룹 재조정이 발생한다. 이로 인해 파티션 할당이 변경되고 처리 지연이나 중복 작업의 문제가 발생할 수 있다.
- static group membership은 이를 개선하여 컨슈머가 동일한 멤버 ID를 유지하도록 함으로써 불필요한 그룹 재조정을 방지한다.
- 관련설정
- group.instance.id
- 컨슈머의 고유 ID를 정의하며, 컨슈머가 동일한 ID로 그룹에 가입할 수 있도록 보장한다.
- 컨슈머가 재시작되거나 세션이 만료되더라도, 기존 group.instance.id를 기반으로 동일한 멤버 ID로 재가입한다.
- 각각의 컨슈머에 서로 다른 ID를 지정한다.
- 재연결시 동일한 group.instance.id로 재가입하면 kafka는 해당 컨슈머가 기존 멤버임을 인식하고 파티션 할당을 유지한다.
동일 topic, groupid를 가진 컨슈머가 동작하는 애플리케이션 인스턴스가 여러 개 있을 때 자동으로 각 애플리케이션의 컨슈머와 파티션은 rebalancing 된다.
예를 들어 파티션 3개가 있는 토픽을 구독하고 있는 컨슈머 1개짜리 컨슈머 그룹을 동작시키는 A 애플리케이션 인스턴스가 단독으로 실행되고 있을 때는 해당 컨슈머 그룹이 3개의 파티션을 모두 구독하고 있다.
동일 B 애플리케이션 인스턴스가 새롭게 실행될 때 A, B 인스턴스의 각 컨슈머 그룹이 구독하는 파티션은 재조정된다.
ex)
A 단독 실행
A 인스턴스 컨슈머 그룹 : 파티션 1, 2, 3 모두 할당
B 추가 실행
A 인스턴스 컨슈머 그룹 : 파티션 1, 2 할당
B 인스턴스 컨슈머 그룹 : 파티션 3 할당
C 추가 실행
A 인스턴스 컨슈머 그룹 : 파티션 1 할당
B 인스턴스 컨슈머 그룹 : 파티션 2 할당
C 인스턴스 컨슈머 그룹 : 파티션 3 할당
위와 같이 컨슈머 그룹에 대한 rebalancing 은 동적으로 수행된다.
정리된 내용을 두서없이 정리해 보았다.
끝.
반응형다음글이전글이전 글이 없습니다.댓글