본문 바로가기!

K8S Operator

[K8S Operator] Kafka & Strimzi Operator

728x90
반응형

1. Kafka 란?

  • 대용량의 데이터 스트림을 처리하는 분산형 스트리밍
  • 기능
    • 매우 높은 처리량과 짧은 대기 시간으로 데이터를 공유하는 마이크로서비스 및 기타 애플리케이션
    • 메시지 순서 보장
    • 애플리케이션 상태를 재구성하기 위해 데이터 저장소에서 메시지 되감기/재생
    • 키-값 로그를 사용할 때 오래된 레코드를 제거하기 위한 메시지 압축
    • 클러스터 구성의 수평적 확장성
    • 내결함성을 제어하기 위한 데이터 복제
    • 즉각적인 액세스를 위해 대용량 데이터 보존

 

  • 스트림 데이터
    • Kafka는 스트림 데이터 처리에 강점을 가지고 있음
    • 스트림 데이터는 데이터의 연속적인 흐름을 말하며, 이는 실시간으로 발생하거나 쌓여가는 데이터를 의미
    • 기존 데이터와의 차이는 주로 데이터의 처리 방식에 있으며 스트림 데이터는 순차적으로 처리

 

  • 카프카 주요 요소
    • 주키퍼 ZooKeeper : 카프카의 메타데이터 관리  브로커의 정상 상태 점검 health check 을 담당
      • IMPORTANT : KRaft mode is not ready for production in Apache Kafka or in Strimzi - Link
    • 카프카 Kafka 또는 카프카 클러스터 Kafka cluster : 여러 대의 브로커를 구성한 클러스터를 의미
    • 브로커 broker : 카프카 애플리케이션이 설치된 서버 또는 노드를 말함
    • 프로듀서 producer : 카프카로 메시지를 보내는 역할을 하는 클라이언트를 총칭
    • 컨슈머 consumer : 카프카에서 메시지를 꺼내가는 역할을 하는 클라이언트를 총칭
    • 토픽 topic : 카프카는 메시지 피드들을 토픽으로 구분하고, 각 토픽의 이름은 카프카 내에서 고유함
    • 파티션 partition : 병렬 처리 및 고성능을 얻기 위해 하나의 토픽을 여러 개로 나눈 것을 말함
    • 세그먼트 segment : 프로듀서가 전송한 실제 메시지가 브로커의 로컬 디스크에 저장되는 파일
    • 메시지 message 또는 레코드 record : 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각을 말함

 

  • 카프카는 데이터를 받아서 전달하는 데이터 버스 data bus 의 역할
  • 카프카에 데이터 (메시지) 를 만들어서 주는 쪽은 프로듀서 producer 라 부르고, 데이터를 빼내서 소비하는 쪽은 컨슈머 consumer 라 함
  • 주키퍼는 카프카의 정상 동작을 보장하기 위해 메타데이터 metadata (브로커들의 노드 관리 등) 를 관리하는 코디네이터 coordinator 임
  • 프로듀서와 컨슈머는 클라이언트이며, 애플리케이션은 카프카와 주키퍼임
  • 카프카는 프로듀서와 컨슈머 중앙에 위치하여, 프로듀서로부터 전달 받은 메시지들을 저장하고 컨슈머에 메시지를 전달함
  • 컨슈머는 카프카에 저장된 메시지를 꺼내오는 역할을 함
  • 브로커 broker 는 애플리케이션이 설치된 서버 또는 노드를 의미
  • 리플리케이션 replication : 각 메시지들을 여러 개로 복제해서 카프카 클러스터 내 브로커들에 분산시키는 동작을 의미
    • 하나의 브로커가 종료되더라도 카프카는 안정성을 유지할 수 있음
    • 토픽 생성 명령어 중 replication-factor 3 이면, 원본을 포함한 리플리케이션(토픽의 파티션)이 총 3개를 의미함
      • 원본(토픽의 파티션)을 리더 reader, 리플리케이션을 팔로워 follower
      • 리더는 프로듀서, 컨슈머로부터 오는 모든 읽기/쓰기를 처리, 팔로워는 리더로부터 복제 유지
  • 파티션 partition : 토픽 하나를 여러 개로 나눠 병렬 처리가 가능하게 만들 것을 의미
    • 나뉜 파티션의 수 만큼 컨슈머를 연결 할 수 있어서, 병렬 처리가 가능함
    • 파티션 수는 초기 생성 시 언제든지 늘릴 수 있지만 늘린 파티션은 줄일 수 없음, 컨슈머 LAG 모니터링으로 판단 할 것
    • 컨슈머 LAG (지연) = ‘프로듀서가 보낸 메시지 갯수(카프카에 남아 있는 메시지 갯수)’ - 컨슈머가 가져간 메시지 갯수’
  • 세그먼트 segment : 토픽의 파티션에 저장된 메시지들이 세그먼트라는 로그 파일의 형태로 브로커의 로컬 디스크에 저장됨
  • 오프셋 offset : 파티션에 메시지가 저장되는 위치, 오프셋은 순차적으로 증가(0, 1, 2...)

 

1-1) 메시지 브로커(RabbitMQ) 와 이벤트 브로커(Kafka)차이

  • 메시지 브로커
    • 프로듀서가 전달한 메시지를 큐에 저장하고, 저장된 데이터를 컨슈머가 가져갈 수 있도록 해주는 브로커로 보통 서로 다른 시스템사이에서 데이터를 비동기 형태로 처리하기 위해 사용
    • ex) RabbitMQ, SQS(AWS)
    • 컨슈머가 큐에서 데이터를 가져가면 즉시 큐에서 데이터가 삭제되거나, 짧은 시간내에 큐에서 데이터가 삭제되는 특징이 있음
  • 이벤트 브로커
    • 이벤트 브로커는 기본적으로 메시지 브로커의 큐 기능을 가지고 있음
    • 프로듀서가 전달한 이벤트를 처리 후에 바로 삭제하지 않고 저장하여, 컨슈머가 특정 시점부터 이벤트를 다시 컨슘(consume) 할 수 있음 
      • ex) 장애가 일어난 시점에서 그 이후 이벤트 재처리 가능
    • ex) Kafka, Kinesis(AWS)
    • 대용량 처리에 있어 메시지 브로커보다 많은 양의 데이터 처리 가능
RabbitMQ Kafka
경량 메시지 큐 시스템으로 메시지 전송과 큐 관리에 중점 대규모 데이터 스트리밍 플랫폼으로 이벤트 브로커 역할도 수행
주로 메시지 큐를 통한 통신에 사용 대용량의 데이터 스트림을 처리하고, 분산 시스템에서의 활용이 주목받고 있음

 

 

2. Strimzi Operator란?

https://strimzi.io/docs/operators/latest/overview.html

  • Strimzi Operator는 Kubernetes 클러스터에서 Apache Kafka를 운영, 배포 및 관리하기 위한 오퍼레이터
  • 주요 기능
    • Kafka 클러스터 배포:
      • Strimzi Operator는 Kafka 클러스터를 배포하기 위한 커스텀 리소스 정의(CRD)를 제공하며 이를 통해 클러스터의 크기, 구성 등을 설정가능.
    • 토픽 및 파티션 관리:
      • Kafka 토픽과 파티션의 생성, 확장, 축소, 삭제 등과 같은 작업
    • Broker 및 Zookeeper 구성:
      • Kafka 브로커 및 Zookeeper 구성과 관련된 파라미터를 변경하고 관리
    • 접근 제어 및 보안:
      • TLS 및 인증과 같은 보안 설정을 간편하게 구성
    • 스케일링:
      • Strimzi Operator는 Kafka 클러스터의 스케일링을 지원하며, 브로커 또는 파티션 수를 동적으로 조절
    • Monitoring 및 Logging:
      • Prometheus와 Grafana를 통한 모니터링 설정과 함께 로깅 설정을 쉽게 구성
    • MirrorMaker 및 Connect 설정:
      • MirrorMaker와 Kafka Connect를 설정하고 관리하여 데이터의 복제 및 외부 시스템과의 연결을 지원

 

3. Strimzi Operator 설치 (Helm chart)

# 네임스페이스 생성
kubectl create namespace kafka

# helm repo 추가
helm repo add strimzi https://strimzi.io/charts/
helm show values strimzi/strimzi-kafka-operator

# helm chart install 
helm install kafka-operator strimzi/strimzi-kafka-operator \
--version 0.38.0 \
--namespace kafka

# 배포된 helm chart 리소스 확인
kubectl get deploy,pod -n kafka
kubectl get-all -n kafka

# 오퍼레이터가 지원하는 카프카 버전 확인
kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3
STRIMZI_KAFKA_IMAGES:                               3.5.0=quay.io/strimzi/kafka:0.38.0-kafka-3.5.0
                                                    3.5.1=quay.io/strimzi/kafka:0.38.0-kafka-3.5.1
                                                    3.6.0=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0

# 배포한 리소스 확인 : CRDs - 각각이 제공 기능으로 봐도됨!
kubectl get crd | grep strimzi
kafkabridges.kafka.strimzi.io                2023-11-17T09:24:35Z
kafkaconnectors.kafka.strimzi.io             2023-11-17T09:24:35Z
kafkaconnects.kafka.strimzi.io               2023-11-17T09:24:35Z
kafkamirrormaker2s.kafka.strimzi.io          2023-11-17T09:24:35Z
kafkamirrormakers.kafka.strimzi.io           2023-11-17T09:24:35Z
kafkanodepools.kafka.strimzi.io              2023-11-17T09:24:35Z
kafkarebalances.kafka.strimzi.io             2023-11-17T09:24:35Z
kafkas.kafka.strimzi.io                      2023-11-17T09:24:35Z
kafkatopics.kafka.strimzi.io                 2023-11-17T09:24:35Z
kafkausers.kafka.strimzi.io                  2023-11-17T09:24:35Z
strimzipodsets.core.strimzi.io               2023-11-17T09:24:35Z

# (참고) CRD 상세 정보 확인
kubectl describe crd kafkas.kafka.strimzi.io
kubectl describe crd kafkatopics.kafka.strimzi.io

 

 

3-1) 카프카 클러스터 생성 (Zookeeper) : 기존 Statefulsets 대신 → StrimziPodSets 이 기본 설정

  • StrimziPodSets(sps) 특징 : sts 중간 파드 바로 삭제 불가, 파드 Spec 동일해야됨(볼륨, CPU, Memory), 아키텍처 구성 용이
# 새로운 터미널에 해당 명령어를 통한 모니터링 진행
watch kubectl get kafka,strimzipodsets,pod,svc,endpointslice,pvc -n kafka
kubectl logs deployment/strimzi-cluster-operator -n kafka -f

# 카프카 클러스터 YAML 파일 다운로드
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-1.yaml
cat kafka-1.yaml | yh

# 카프카 클러스터 배포 : 카프카(브로커 3개), 주키퍼(3개), entityOperator 디플로이먼트
## 배포 시 requiredDuringSchedulingIgnoredDuringExecution 지원 >> preferredDuringSchedulingIgnoredDuringExecution 미지원...
kubectl apply -f kafka-1.yaml -n kafka

# 배포된 리소스들 확인
kubectl get-all -n kafka

# 배포된 리소스 확인 : 주키퍼 설치 완료 후 >> 카프카 브로커 설치됨
kubectl get kafka -n kafka
kubectl get cm,secret -n kafka

# 배포된 리소스 확인 : 카프카/주키퍼 strimzipodsets 생성 확인 >> sts 스테이트풀렛 사용 X
kubectl get strimzipodsets -n kafka

# 노드 정보 확인
kubectl describe node | more
kubectl get node --label-columns=topology.ebs.csi.aws.com/zone
kubectl describe pv | grep 'Node Affinity:' -A2

# 배포된 리소스 확인 : 배포된 파드 생성 확인
kubectl get pod -n kafka -l app.kubernetes.io/name=kafka
kubectl get pod -n kafka -l app.kubernetes.io/name=zookeeper
kubectl get pod -n kafka -l app.kubernetes.io/instance=my-cluster

# 배포된 리소스 확인 : 서비스 Service(Headless) 등 생성 확인 - listeners(3개)
kubectl get svc,endpointslice -n kafka

# 배포된 리소스 확인 : 카프카/주키퍼 파드 저장소 확인
kubectl get pvc,pv -n kafka
kubectl df-pv

# 배포된 리소스 확인 : 컨피그맵 확인
kubectl get cm -n kafka

# 컨피그맵 상세 확인
kubectl describe cm -n kafka strimzi-cluster-operator
kubectl describe cm -n kafka my-cluster-zookeeper-config
kubectl describe cm -n kafka my-cluster-entity-topic-operator-config
kubectl describe cm -n kafka my-cluster-entity-user-operator-config
kubectl describe cm -n kafka my-cluster-kafka-0
kubectl describe cm -n kafka my-cluster-kafka-1
kubectl describe cm -n kafka my-cluster-kafka-2

 

 

# kafka 클러스터 Listeners 정보 확인 : 각각 9092 평문, 9093 TLS, 세번째 정보는 External 접속 시 NodePort 정보
kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq

[
  {
    "addresses": [
      {
        "host": "my-cluster-kafka-bootstrap.kafka.svc",
        "port": 9092
      }
    ],
    "bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9092",
    "name": "plain"
  },
  {
    "addresses": [
      {
        "host": "my-cluster-kafka-bootstrap.kafka.svc",
        "port": 9093
      }
    ],
    "bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9093",
    "name": "tls"
  },
  {
    "addresses": [
      {
        "host": "ip-192-168-3-190.ap-northeast-2.compute.internal",
        "port": 32709
      },
      {
        "host": "ip-192-168-2-210.ap-northeast-2.compute.internal",
        "port": 32709
      },
      {
        "host": "ip-192-168-1-27.ap-northeast-2.compute.internal",
        "port": 32709
      }
    ],
    "bootstrapServers": "ip-192-168-1-27.ap-northeast-2.compute.internal:32709,ip-192-168-2-210.ap-northeast-2.compute.internal:32709,ip-192-168-3-190.ap-northeast-2.compute.internal:32709",
    "name": "external"
  }
]

 

 

3-2) 테스트용 파드 생성 후 카프카 클러스터 정보 확인

# 파일 다운로드
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml
cat myclient.yaml | yh

# 데몬셋으로 myclient 파드 배포 : 어떤 네임스페이스에 배포되는가?
VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -
kubectl get pod -l name=kafkaclient -owide

# Kafka client 에서 제공되는 kafka 관련 도구들 확인
kubectl exec -it ds/myclient -- ls /opt/bitnami/kafka/bin

# 카프카 파드의 SVC 도메인이름을 변수에 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
echo "export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092" >> /etc/profile

# 브로커 정보
kubectl exec -it ds/myclient -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS

# 브로커에 설정된 각종 기본값 확인 : --broker --all --describe 로 조회
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 1 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 2 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe

# 토픽 리스트 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list

# 토픽 리스트 확인 (kubectl native) : PARTITIONS, REPLICATION FACTOR
kubectl get kafkatopics -n kafka

 

 

 

728x90
반응형