Kubernetes

[스터디] DOIK - Kafka & Strimzi Operator 배포 및 테스트

mokpolar 2022. 6. 25. 18:38
반응형

지난 번에 이어 쿠버네티스 데이터베이스 오퍼레이터 스터디에서 진행한 학습의 실습한 내용을 기록한다. 

이번 내용은 Strimzi Operator를 통해 Kafka를 쿠버네티스 상에 배포하고 테스트한 내용이다. 

 

Strimzi 설명

Stirimzi Operator의 기능

Strimzi는 쿠버네티스 환경에서 Kafka를 운영하기 위해서 만들어진 Operator이다. 

Operator로서 동작하기 때문에 Kafka의 많은 동작을 쿠버네티스에서 사용할 수 있도록 해준다. 

 

간략하게 카프카 클러스터/구성요소 배포 및 관리, 카프카 접속 설정, 카프카 업그레이드, 브로커 brokers 관리, 토픽 topic 과 유저 user 생성 및 관리를 한다고 할 수 있다. 

 

상세한 목록은 다음과 같다. 

  • Manages the Kafka Cluster - Deploys and manages all of the components of this complex application, including dependencies like Apache ZooKeeper® that are traditionally hard to administer.
  • Includes Kafka Connect - Allows for configuration of common data sources and sinks to move data into and out of the Kafka cluster.
  • Topic Management - Creates and manages Kafka Topics within the cluster.
  • User Management - Creates and manages Kafka Users within the cluster.
  • Connector Management - Creates and manages Kafka Connect connectors.
  • Includes Kafka Mirror Maker 1 and 2 - Allows for mirroring data between different Apache Kafka® clusters.
  • Includes HTTP Kafka Bridge - Allows clients to send and receive messages through an Apache Kafka® cluster via the HTTP protocol.
  • Includes Cruise Control - Automates the process of balancing partitions across an Apache Kafka® cluster.
  • Prometheus monitoring - Built-in support for monitoring using Prometheus.

 

Strimzi Architecture

Strimzi Operator의 구성요소는 아래와 같으며

  • Cluster Operator : Deploys and manages Apache Kafka clusters, Kafka Connect, Kafka MirrorMaker, Kafka Bridge, Kafka Exporter, Cruise Control, and the Entity Operator
  • Entity Operator : Comprises the Topic Operator and User Operator
  • Topic Operator : Manages Kafka topics
  • User Operator : Manages Kafka users

그 구조는 아래와 같다. 

 

Strimzi Operator 설치 및 테스트 수행

Strimzi Operator 설치

Helm을 이용해서 Strimzi 오퍼레이터를 설치해 준다. 

# 네임스페이스 생성
kubectl create namespace kafka
namespace/kafka created

# strimzi operator Repo 추가
helm repo add strimzi https://strimzi.io/charts/
###"strimzi" has been added to your repositories###

# strimzi operator Helm 정보 확인
helm show values strimzi/strimzi-kafka-operator

# 차트 설치 : 마스터노드에 오퍼레이터 파드 설치
printf 'tolerations: [{key: node-role.kubernetes.io/master, operator: Exists, effect: NoSchedule}]\n' | \
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.29.0 --namespace kafka \
  --set nodeSelector."kubernetes\.io/hostname"=k8s-m --values /dev/stdin
  
### 0.29.0 버전 설치 완료 ###

# 배포한 리소스 확인 : Operator 디플로이먼트(파드)
kubectl get deploy,pod -n kafka

### 아래와 같이 정보 확인 ###
NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/strimzi-cluster-operator   0/1     1            0           39s

NAME                                            READY   STATUS    RESTARTS   AGE
pod/strimzi-cluster-operator-555b78d767-cq4d4   0/1     Running   0          39s

# 오퍼레이터가 지원하는 카프카 버전 확인 : 3.0.0 ~ 3.2.0
kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A4

### 아래와 같이 카프카 버전 확인 가능 ###
      STRIMZI_KAFKA_IMAGES:                               3.0.0=quay.io/strimzi/kafka:0.29.0-kafka-3.0.0
                                                          3.0.1=quay.io/strimzi/kafka:0.29.0-kafka-3.0.1
                                                          3.1.0=quay.io/strimzi/kafka:0.29.0-kafka-3.1.0
                                                          3.1.1=quay.io/strimzi/kafka:0.29.0-kafka-3.1.1
                                                          3.2.0=quay.io/strimzi/kafka:0.29.0-kafka-3.2.0

# 배포한 리소스 확인 : CRDs - 각각이 제공 기능으로 봐도됨!
kubectl get crd

### 아래와 같이 CRD 확인 가능 ###
NAME                                  CREATED AT
kafkabridges.kafka.strimzi.io         2022-06-25T09:36:22Z
kafkaconnectors.kafka.strimzi.io      2022-06-25T09:36:22Z
kafkaconnects.kafka.strimzi.io        2022-06-25T09:36:22Z
kafkamirrormaker2s.kafka.strimzi.io   2022-06-25T09:36:22Z
kafkamirrormakers.kafka.strimzi.io    2022-06-25T09:36:22Z
kafkarebalances.kafka.strimzi.io      2022-06-25T09:36:22Z
kafkas.kafka.strimzi.io               2022-06-25T09:36:21Z
kafkatopics.kafka.strimzi.io          2022-06-25T09:36:22Z
kafkausers.kafka.strimzi.io           2022-06-25T09:36:22Z
strimzipodsets.core.strimzi.io        2022-06-25T09:36:22Z

 

이제 카프카 클러스터를 생성해 주어야 한다. 3.2.0 버전이다. 

 

배포하는 yaml 내용을 살펴보겠다. 

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: app.kubernetes.io/name
                      operator: In
                      values:
                        - kafka
                topologyKey: "kubernetes.io/hostname"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: app.kubernetes.io/name
                      operator: In
                      values:
                        - zookeeper
                topologyKey: "kubernetes.io/hostname"
  entityOperator:
    topicOperator: {}
    userOperator: {}

 

 

저번 처럼 gif로 만들지는 않았지만 잘 배포가 되고 있는 모습을 볼 수 있다. 

 

 

배포내용을 확인해보자. 

kubectl get kafka -n kafka
NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster   3                        3                     True

kubectl get sts -n kafka -owide
NAME                   READY   AGE   CONTAINERS   IMAGES
my-cluster-kafka       3/3     10m   kafka        quay.io/strimzi/kafka:0.29.0-kafka-3.2.0
my-cluster-zookeeper   3/3     11m   zookeeper    quay.io/strimzi/kafka:0.29.0-kafka-3.2.0

 

아래처럼 listener정보를 확인할 수 있다. 

각 포트는 9092 평문,  9093 TLS, 30356 NodePort 외부접속용이다.

kubectl get kafka -n kafka my-cluster -o jsonpath={.status} | jq -r ".listeners"
[
  {
    "addresses": [
      {
        "host": "my-cluster-kafka-bootstrap.kafka.svc",
        "port": 9092
      }
    ],
    "bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9092",
    "name": "plain",
    "type": "plain"
  },
  {
    "addresses": [
      {
        "host": "my-cluster-kafka-bootstrap.kafka.svc",
        "port": 9093
      }
    ],
    "bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9093",
    "name": "tls",
    "type": "tls"
  },
  {
    "addresses": [
      {
        "host": "192.168.20.103",
        "port": 31781
      },
      {
        "host": "192.168.10.101",
        "port": 31781
      },
      {
        "host": "192.168.10.102",
        "port": 31781
      }
    ],
    "bootstrapServers": "192.168.20.103:31781,192.168.10.101:31781,192.168.10.102:31781",
    "name": "external",
    "type": "external"
  }
]

 

 

Operator로 설치한 Kafka 테스트

 

이제 테스트용 파드를 생성해보겠다. 

파드를 통해서 kafka 동작을 테스트한다. 

 

아래 manifest로 테스트용 파드를 만든다.

apiVersion: v1
kind: Pod
metadata:
  name: ${PODNAME}
  labels:
    app: myclient
spec:
  nodeName: k8s-m
  containers:
  - name: ${PODNAME}
    image: bitnami/kafka:3.2
    command: ["tail"]
    args: ["-f", "/dev/null"]

 

해당 bitnami/kafka:3.2 이라는 이미지 안에는 Kafka Client 에서 제공하는 kafka 관련 도구들이 들어있다.

kubectl exec -it myclient1 -- ls /opt/bitnami/kafka/bin
connect-distributed.sh	      kafka-consumer-perf-test.sh  kafka-producer-perf-test.sh	       kafka-verifiable-consumer.sh
connect-mirror-maker.sh       kafka-delegation-tokens.sh   kafka-reassign-partitions.sh        kafka-verifiable-producer.sh
connect-standalone.sh	      kafka-delete-records.sh	   kafka-replica-verification.sh       trogdor.sh
kafka-acls.sh		      kafka-dump-log.sh		   kafka-run-class.sh		       windows
kafka-broker-api-versions.sh  kafka-features.sh		   kafka-server-start.sh	       zookeeper-security-migration.sh
kafka-cluster.sh	      kafka-get-offsets.sh	   kafka-server-stop.sh		       zookeeper-server-start.sh
kafka-configs.sh	      kafka-leader-election.sh	   kafka-storage.sh		       zookeeper-server-stop.sh
kafka-console-consumer.sh     kafka-log-dirs.sh		   kafka-streams-application-reset.sh  zookeeper-shell.sh
kafka-console-producer.sh     kafka-metadata-shell.sh	   kafka-topics.sh
kafka-consumer-groups.sh      kafka-mirror-maker.sh	   kafka-transactions.sh

 

 

Kafka의 Service는 my-cluster-kafka-bootstrap.kafka.svc 이렇게 생겼으니 

변수에 지정해주고 브로커의 정보를 확인한다.

SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092

kubectl exec -it myclient1 -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS
my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null) -> (
	Produce(0): 0 to 9 [usable: 9],
	Fetch(1): 0 to 13 [usable: 13],
	ListOffsets(2): 0 to 7 [usable: 7],
...
	DescribeTransactions(65): 0 [usable: 0],
	ListTransactions(66): 0 [usable: 0],
	AllocateProducerIds(67): 0 [usable: 0]
)
my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 1 rack: null) -> (
	Produce(0): 0 to 9 [usable: 9],
	Fetch(1): 0 to 13 [usable: 13],
	ListOffsets(2): 0 to 7 [usable: 7],
...
	DescribeTransactions(65): 0 [usable: 0],
	ListTransactions(66): 0 [usable: 0],
	AllocateProducerIds(67): 0 [usable: 0]
)
my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null) -> (
	Produce(0): 0 to 9 [usable: 9],
	Fetch(1): 0 to 13 [usable: 13],
	ListOffsets(2): 0 to 7 [usable: 7],
...
	ListTransactions(66): 0 [usable: 0],
	AllocateProducerIds(67): 0 [usable: 0]
)

 

이제 아래와 같이 토픽 리스트를 확인할 수 있다.

물론 Operator로 설치되어 Kafka의 동작들을 쿠버네티스로 할 수 있기 때문에 kafkatopic이라는 CR로 토픽을 확인하는 일도 가능해진다. 

kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --list
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic


# Operator가 제공하는 CRD로도 토픽정보를 확인할 수 있다.
kubectl get kafkatopics -n kafka
NAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           3                    True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            3                    True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            3                    True

 

이제  (테스트용 파드에 들어있던) kafka-console-producer.sh 와 kafka-console-consumer.sh 라는 도구를 들여다보자. 

###kafka-console-producer.sh###

#!/bin/bash

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

###kafka-console-consumer.sh###

#!/bin/bash

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

 

마찬가지로 KafkaTopic이라는 이미 정의된 CRD를 통해서 kubectl 로 카프카의 토픽을 생성할 수 있다. 

역시 이 부분이 Operator 혹은 CRD로 사용하는 kafka의 장점이라고 할 수 있다. 

# mytopic.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: ${TOPICNAME}
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 1
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
    min.insync.replicas: 2%

KafkaTopic CR을 통해서 카프카 토픽을 생성해보자.

그러면 마찬가지로 토픽이 생성된 것을 kubectl로 확인할 수 있다.  

TOPICNAME=mytopic1 envsubst < mytopic.yaml | kubectl apply -f - -n kafka

kubectl get kafkatopics -n kafka                                                            k8s-m: Sat Jun 25 21:52:38 2022

NAME                                                                                               CLUSTER      PARTITIONS   REPLICATIO
N FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           3
           True
mytopic1                                                                                           my-cluster   1            3
           True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            3
           True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            3
           True

 

결국 아래와 같이 producer 도구를 통해 토픽에 데이터를 넣고 consumer 도구를 통해 그것을 확인할 수 있다. 

kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1
>hello world!!

kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning
hello world!!

 

 

카프카캣 Kafkacat

 

이번에 알게 된 kafkacat은 카프카를 쉽게 테스트하고 디버깅하는 도구이다. 

kafkacat 명령어로 데이터를 넣고 빼는 등의 동작을 할 수 있고 메타데이터를 확인할 수 있다.

 

카프카캣을 설치하자.

apt install kafkacat -y

 

아래처럼 노드포트를 지정하고 kafkacat을 실행해볼 수 있다.

NODEPORT=$(kubectl get svc -n kafka my-cluster-kafka-external-bootstrap -o jsonpath={.spec.ports[0].nodePort})

kafkacat -L -b 127.0.0.1:$NODEPORT

Metadata for all topics (from broker -1: 127.0.0.1:31781/bootstrap):
 3 brokers:
  broker 0 at 192.168.20.103:32520 (controller)
  broker 2 at 192.168.10.101:30512
  broker 1 at 192.168.10.102:30398
 4 topics:
  topic "mytopic1" with 1 partitions:
    partition 0, leader 1, replicas: 1,0,2, isrs: 1,0,2
  topic "__strimzi_store_topic" with 1 partitions:
    partition 0, leader 2, replicas: 2,1,0, isrs: 2,1,0
  topic "__strimzi-topic-operator-kstreams-topic-store-changelog" with 1 partitions:
    partition 0, leader 2, replicas: 2,1,0, isrs: 2,1,0
  topic "__consumer_offsets" with 50 partitions:
    partition 0, leader 1, replicas: 1,2,0, isrs: 1,2,0
    partition 1, leader 0, replicas: 0,1,2, isrs: 0,1,2
    partition 2, leader 2, replicas: 2,0,1, isrs: 2,0,1
    partition 3, leader 1, replicas: 1,0,2, isrs: 1,0,2
...

 

이제 아까 토픽에 넣었던 데이터를 카프카캣을 통해 확인해보자. 

kafkacat -b 127.0.0.1:$NODEPORT -t mytopic1 -C
hello world!!

 

마찬가지로 카프카캣을 통해 데이터를 넣을 수도 있다.

kafkacat -b 127.0.0.1:$NODEPORT -t mytopic1 -P
hi!! mokpolar!!!!


kafkacat -b 127.0.0.1:$NODEPORT -t mytopic1 -C
hello world!!
hi!! mokpolar!!!!!

 

카프카는 메시지 키를 통해서 정해진 파티션에 메시지를 전달하는 것을 보장한다.

Operator를 통해 설치한 Kubernetes 위의 카프카가 마찬가지로 파티션 단위로 잘 동작하는지 확인해보자. 

 

이를 위해 먼저 새롭게 mytopic2를 생성하고, 파티션 수를 2개로 늘리자. 

# kafka-topics.sh
#!/bin/bash

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"



kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2

 

그러면 파티션 수가 늘어난 것을 볼 수 있다.

kafkacat -L -b 127.0.0.1:$NODEPORT -t mytopic2
Metadata for mytopic2 (from broker -1: 127.0.0.1:31781/bootstrap):
 3 brokers:
  broker 0 at 192.168.20.103:32520 (controller)
  broker 2 at 192.168.10.101:30512
  broker 1 at 192.168.10.102:30398
 1 topics:
  topic "mytopic2" with 2 partitions:
    partition 0, leader 0, replicas: 0,1,2, isrs: 0,1,2
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,2,0

이제 각각의 파티션에 메시지 키를 통해 값을 넣고 파티션들이 잘 분산해서 갖고가는지를 확인해보자. 

# 파티션 1에서 값을 갖고 가는지 확인
kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --partition 0 --property print.key=true --property key.separator=":"

# 파티션 2에서 값을 갖고 가는지 확인
kubectl exec -it myclient3 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --partition 1 --property print.key=true --property key.separator=":" 

# 메시지 키를 통해 데이터를 집어넣기
kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 --property "parse.key=true" --property "key.separator=:"

 

마무리

카프카를 쿠버네티스 상에서 Strimzi를 통해 Operator 형태로 설치하고 간단하게 동작 테스트를 해보았다.

현재 카프카를 업무로 사용하고 있지는 않지만 이렇게 Operator 방식으로 사용하면 접근성이 굉장히 낮아지고 관리가 쉬울 것이라는 생각이 들었다.



다만 역시 회사에서, 그리고 프로덕션 레벨에서 사용하기 위한 부하 테스트 등은 추가로 해봐야겠다. 

 

Reference

https://strimzi.io/docs/operators/in-development/overview.html#overview-components_str

https://www.confluent.io/blog/best-kafka-tools-that-boost-developer-productivity/#kafkacat

 

반응형