UI for Apache Kafka is a free, open-source web UI
to monitor and manage Apache Kafka clusters.

 

Apache Kafka용 UI는 Apache Kafka 클러스터를 모니터링하고 관리하기 위한 무료 오픈 소스 웹 UI입니다.
대시보드를 사용하면 브로커, 주제, 파티션, consumer와 같은 Kafka 클러스터의 메트릭을 추적할 수 있습니다.
Apache Kafka용 UI는 Kafka 데이터를 시각화합니다. 로컬 또는 클라우드에서 실행할 수 있습니다.

 

특징

  • 다중 클러스터 관리 — 모든 클러스터를 한 곳에서 모니터링 및 관리
  • 메트릭 대시보드를 통한 성능 모니터링 — 대시보드로 주요 Kafka 메트릭 추적
  • Kafka Broker 보기 — 주제 및 파티션 할당, 컨트롤러 상태 보기
  • Kafka 주제 보기 — 파티션 수, 복제 상태 및 사용자 정의 구성 보기
  • 소비자 그룹 보기 — 파티션별 고정 오프셋, 결합 및 파티션별 지연 보기
  • 메시지 찾아보기 — JSON, 일반 텍스트 및 Avro 인코딩으로 메시지 찾아보기
  • 동적 주제 구성 — 동적 구성으로 새 주제 생성 및 구성
  • 구성 가능한 인증 — Github/Gitlab/Google OAuth 2.0(옵션)으로 설치 보안

시작하기

Apache Kafka용 UI를 실행하려면 사전 빌드된 Docker 이미지를 사용하거나 로컬에 빌드할 수 있습니다.

 

[ 수행환경 ]

- Microsoft Windows 10 Pro(10.0.19041 N/A 빌드 19041)

- WSL 2

- zookeeper : 3.6.3

- kafka broker : 

 

 

 

Docker Image 실행

Apache Kafka용 UI의 공식 Docker 이미지는 다음 위치에서 호스팅됩니다.hub.docker.com/r/provectuslabs/kafka-ui.
백그라운드에서 Docker 컨테이너를 시작합니다.

docker run -p 8080:8080 \
	-e KAFKA_CLUSTERS_0_NAME=local \
	-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 \
	-d provectuslabs/kafka-ui:latest


http://localhost:8080.에서 웹 UI에 접속합니다.
환경 변수를 사용한 추가 구성은 환경 변수 참조합니다.- see environment variables

 

위와 같이 환경변수를 설정했을 때, 카프카 UI 접속 시 UI 화면은 보이지만, borker, topic 등에 대한 정보가 정상적으로 보이지 않았습니다. 그래서 2가지를 수정했는데, 수정 후 재기동 하니 정상적으로 보였습니다.

 

첫번 째 수정한 부분

카프카 브로커의 server.properties 에서 listeners 내용을 수정했습니다. 

기존 0.0.0.0 에서 WSL 의 IP Address로 변경했습니다. 수정 후 카프카 브로커를 재기동합니다.

# listeners=PLAINTEXT://0.0.0.0:9092

## 아래처럼 WSL IP Address로 수정
listeners=PLAINTEXT://172.24.121.239:9092

 

두번 째 수정한 부분

카프카 UI 의 환경변수 중 KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=0.0.0.0:9092 부분을 KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=172.24.118.82:9093 로 수정했습니다. 여기서의 IP는 WSL 의 IP 이자 카프카 브로커에서 설정한 listeners IP와 동일한 IP 입니다. 

수정 후 실행된 컨테이너가 있으면 중지시키고 카프카 UI 컨테이너를 실행합니다.

여기에서 8080 port가 사용 중이라고 나올 경우, 카프카 브로커에서 admin port를 사용할 가능성이 크므로 카프카 브로커의 admin port를 변경하던지, UI 컨테이너의 앞 쪽 port를 변경합니다. 필자의 경우에 port 충돌이 발생해서 카프카 브로커의 admin port를 disable 시켰습니다.

docker run -p 8080:8080 \
	-e KAFKA_CLUSTERS_0_NAME=local \
	-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=172.24.118.82:9092 \
	-d provectuslabs/kafka-ui:latest

[ 참고 ]

필자의 경우에는 zookeeper cluster와도 연동되어 있고 podman을 사용해서 아래와 같이 수행했습니다. 주키퍼를 사용하는 경우 참고하면 될 것 같습니다.

podman run -p 8080:8080 --name ui_kafka --rm \
	-e KAFKA_CLUSTERS_0_NAME=local \
	-e KAFKA_CLUSTERS_0_ZOOKEEPER=172.24.118.82:2181,172.24.118.82:2182,172.24.118.82:2183 \
	-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=172.24.118.82:9093,172.24.118.82:9094,172.24.118.82:9095 \
	-d provectuslabs/kafka-ui:latest

 

 

 

위 사항을 수정한 후 UI for Apache Kafka 에 접속한 화면입니다. clusters, Brokers, Topics 등에 대한 정보가 잘 표시됩니다.

Topic을 하나 생성한 후 카프카 브로커에서 조회해 봅니다. topic 이름은 ui_topic 이름으로 설정하고 partitions은 3, Replication Factor도 3으로 설정한 후 생성했습니다.

카프카 브로커 명령어로 topic을 조회해 봅니다. 3대 모두 정상적으로 조회가 되고 있습니다.

$ bin/kafka-topics.sh --list --bootstrap-server 172.24.118.82:9092
ui_topic
$ bin/kafka-topics.sh --list --bootstrap-server 172.24.118.82:9093
ui_topic
$ bin/kafka-topics.sh --list --bootstrap-server 172.24.118.82:9094
ui_topic

카프카 브로커 명령어로 topic을 생성한 후 UI 화면으로 조회해 봅니다.

$ bin/kafka-topics.sh create --topic my-kafka-topic --bootstrap-server 172.24.118.82:9092 --part
itions 3 --replication-factor 3

Created topic my-kafka-topic.

아래 화면처럼 정상적으로 조회됩니다.

 

참고 : UI for Apache kafka 환경변수

yml 파일의 각 변수를 환경 변수로 설정할 수 있습니다. 예를 들어 환경 변수를 사용하여 name 매개변수를 설정하려면 KAFKA_CLUSTERS_2_NAME과 같이 작성할 수 있습니다.

SERVER_SERVLET_CONTEXT_PATH URI basePath
LOGGING_LEVEL_ROOT Setting log level (trace, debug, info, warn, error). Default: info
LOGGING_LEVEL_COM_PROVECTUS Setting log level (trace, debug, info, warn, error). Default: debug
SERVER_PORT Port for the embedded server. Default: 8080
KAFKA_ADMIN-CLIENT-TIMEOUT Kafka API timeout in ms. Default: 30000
KAFKA_CLUSTERS_0_NAME Cluster name
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS Address where to connect
KAFKA_CLUSTERS_0_ZOOKEEPER Zookeeper service address
KAFKA_CLUSTERS_0_KSQLDBSERVER KSQL DB server address
KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable
KAFKA_CLUSTERS_0_SCHEMAREGISTRY SchemaRegistry's address
KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME SchemaRegistry's basic authentication username
KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD SchemaRegistry's basic authentication password
KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE How keys are saved to schemaRegistry
KAFKA_CLUSTERS_0_JMXPORT Open jmxPosrts of a broker
KAFKA_CLUSTERS_0_READONLY Enable read-only mode. Default: false
KAFKA_CLUSTERS_0_DISABLELOGDIRSCOLLECTION Disable collecting segments information. It should be true for confluent cloud. Default: false
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME Given name for the Kafka Connect cluster
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS Address of the Kafka Connect service endpoint
KAFKA_CLUSTERS_0_JMXSSL Enable SSL for JMX? true or false. For advanced setup, see kafka-ui-jmx-secured.yml
KAFKA_CLUSTERS_0_JMXUSERNAME Username for JMX authentication
KAFKA_CLUSTERS_0_JMXPASSWORD Password for JMX authentication

[ 설치환경 ]

- Microsoft Windows 10 Pro(10.0.19041 N/A 빌드 19041)

- WSL 2(Ubuntu 20.04.3 LTS)

- java :  11.0.3 ( 버전확인 : java -version)

- Kafka :  3.0.0 ( 버전확인 : kafka-topics.sh --version )

- Zookeeper : 

 

 

이 문서서는 로컬 머신에 Kafka 클러스터를 설치 및 실행하는 방법을 보여주고 Kafka 아키텍처에 대한 몇 가지 중요한 개념을 설명합니다.

Apache Kafka 는 분산 스트리밍 플랫폼입니다. 분산 메시지 브로커에서 데이터 스트림 처리를 위한 플랫폼으로 사용할 수 있습니다.

카프카 설치

먼저 Kafka 실행 파일을 실행하려면 Java가 설치되어 있어야 합니다. 본 문서에서는 openjdk-11 을 설치했습니다.

sudo apt-get install openjdk-11-jdk

 


공식 다운로드 페이지에서 Kafka의 바이너리를 다운로드하고 원하는 위치에 tar 파일의 압축을 풉니다.

tar -xvzf kafka_2.13-3.0.0.tgz

kafka_2.13-3.0.0이라는 폴더가 생성되며

kafka_2.13-3.0.0 하위 디렉토리에 bin, config, libs, license, site-docs 디렉토리가 생성됩니다.

 

시스템 아키텍처

클러스터를 실행하기 위해 아래와 같은 프로세스들을 시작해야 합니다.

  1. Zookeeper : 클러스터 노드 간의 상태를 유지하기 위해 Kafka에서 사용
  2. Kafka brokers : 데이터를 저장하고 내보내는 파이프라인의 "파이프"
  3. Producers : 클러스터에 데이터를 입력
  4. Consumers : 클러스터로부터 데이터를 조회

이 다이어그램의 각 블록은 네트워크의 다른 시스템에 구성할 수도 있습니다.

 

주키퍼 시작

Zookeeper 는 coordination을 위해 서버 상태를 저장하는 데, 키-값 형태로 저장하는 저장소입니다.

Kafka를 실행하려면 Zookeeper 서버가 필요하므로 먼저  Zookeeper 인스턴스를 시작해야 합니다.

kafka_2.13-3.0.0 안에 몇 가지 유용한 파일이 있습니다.

  • bin/zookeeper-server-start.sh : 서버 실행 쉘
  • config/zookeeper.properties : Zookeeper 서버가 실행할 구성환경 파일

 

아래 명령어로 주키퍼 서버를 실행합니다. kafka_2.13-3.0.0/ 디렉토리에서 실행해야 합니다.

bin/zookeeper-server-start.sh config/zookeeper.properties

 

주키퍼 서버를 백그라운드로 실행하려면 -daemon 옵션을 추가하여 기동합니다.

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

 

설치 중간에 Zookeeper 아스키아트 문자도 보입니다.

foreground로 실행되니 최종 "Creating new log file" 보이면 다음 단계를 진행합니다.

[2022-01-01 00:34:06,106] INFO   ______                  _                                           (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO  |___  /                 | |                                          (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __    (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |     (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO                                               | |                      (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,110] INFO                                               |_|                      (org.apache.zoo
.....   .....
[2022-01-01 00:34:10,369] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
[2022-01-01 00:34:10,373] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
[2022-01-01 01:08:12,522] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2022-01-01 01:08:14,923] WARN fsync-ing the write ahead log in SyncThread:0 took 2389ms which will adversely effect operation latency.File size is 67108880 bytes. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog)

config/zookeeper.properties 파일을 보면 clientPort 속성이 2181로 설정된 것을 볼 수 있습니다. 이 속성은 Zookeeper 서버가 현재 수신 대기 중인 포트입니다.

# File: kafka_2.13-3.0.0/config/zookeeper.properties

# the port at which the clients will connect
clientPort=2181

 

Kafka Broker 실행하기

Kafka 브로커는 클러스터의 핵심이며 데이터가 저장되고 배포되는 파이프라인 역할을 합니다.
Zookeeper를 시작한 방법과 유사하게,

브로커 서버를 시작하고(bin/kafka-server-start.sh) 구성(config/server.properties)하기 위한 두 개의 파일이 있습니다.
kafka의 분산환경 구성을 해야하기 때문에 다이어그램과 같이 3개의 브로커를 시작하겠습니다.
config/server.properties 파일을 열면 여러 구성 옵션이 표시됩니다(지금은 대부분 무시할 수 있음).
그러나 각 브로커 인스턴스에 대해 고유해야 하는 세 가지 속성이 있습니다.

File: kafka_2.13-3.0.0/config/server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# The address the socket server listens on. It will get the value returned from 
listeners=PLAINTEXT://:9092

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

* 주키퍼 서버들 멀티로 구성한 경우에는 zookeeper.connect 에 나열해주면 됩니다.

예시) zookeeper.connect=localhost:2181, localhost:2182, localhost:2183

서버가 3개이므로 각 서버에 대해 각각의 구성 파일을 만들겠습니다.

config/server.properties 파일을 복사하고 각 서버 인스턴스에 대해 각각의 property 파일을 만듭니다.

cp config/server.properties config/server.1.properties
cp config/server.properties config/server.2.properties
cp config/server.properties config/server.3.properties

 

파일의 각 복사본에 대해 3가지 속성을 각각 고유하게 변경합니다.

server.1.properties

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1

server.2.properties

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs2

server.3.properties

broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs3

log 디렉토리를 생성합니다.

mkdir /tmp/kafka-logs1
mkdir /tmp/kafka-logs2
mkdir /tmp/kafka-logs3

이제 broker instances를 실행할 수 있습니다. 

서로 다른 터미널 세션에서 아래 명령을 각각 실행합니다. 본 문서의 모든 명령어는 kafka_2.13-3.0.0/ 디렉토리에서 수행해야 합니다. foreground로 실행되니 프로세스를 중지하면 안됩니다.

bin/kafka-server-start.sh config/server.1.properties
bin/kafka-server-start.sh config/server.2.properties
bin/kafka-server-start.sh config/server.3.properties
 

백그라운드로 실행하려면 -daemon 옵션을 추가합니다.

bin/kafka-server-start.sh -daemon config/server.1.properties
bin/kafka-server-start.sh -daemon config/server.2.properties
bin/kafka-server-start.sh -daemon config/server.3.properties

 

브로커가 성공적으로 시작되면 'started' 메시지가 표시되어야 합니다.

.....
[2022-01-01 01:08:20,944] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,944] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,945] INFO Kafka startTimeMs: 1640966900932 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,948] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
.....

Topics 생성

클러스터에 데이터를 입력하기 전에 데이터가 속할 topic을 생성해야 합니다. 새로운 세션창에서 다음 명령을 실행합니다.

'my-kafka-topic' Topic 이 생성되었다는 메시지가 출력됩니다.

bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 3
Created topic my-kafka-topic

여기에서 몇 가지 옵션을 설명하겠습니다.

  • 파티션을 사용하면 데이터를 분할할 브로커 수를 결정할 수 있습니다. 일반적으로 브로커 수와 동일하게 설정합니다. 3개의 브로커를 설정했으므로 이 옵션을 3으로 설정합니다.
  • replication-factor는 원하는 데이터 복사본의 수를 나타냅니다(브로커 중 하나가 다운되는 경우에도 다른 브로커에 데이터가 남아 있음). 이 값을 3로 설정했으므로 데이터는 브로커에 복사본 두 개를 더 갖습니다. 

그림으로 표시하면 아래와 같습니다. 색깔이 있는 파티션은 leader이고 색이없는 파티션은 follower입니다.

bootstrap-server는 활성 Kafka 브로커 중 하나의 주소를 가리킵니다. 모든 브로커는 Zookeeper를 통해 서로에 대해 알고 있으므로 어느 브로커를 선택하든 상관 없습니다.

 

Kafka 버전 2.x.x 이하를 사용하는 경우 --bootstrap-server localhost:9093 대신
--zookeeper localhost:2181(Zookeeper 인스턴스의 주소)을 사용해야 합니다.

 

Topics 조회

사용 가능한 모든 주제(Topics)를 나열하려면 bin/kafka-topics.sh 을 실행하면 됩니다. 이 경우 이전 섹션에서 만든 하나의 주제를 출력합니다.

bin/kafka-topics.sh --list --bootstrap-server localhost:9093

my-kafka-topic

 

주제에 대한 자세한 내용을 보려면 동일한 명령과 함께 --describe 인수를 사용할 수 있습니다.

bin/kafka-topics.sh --describe --topic my-kafka-topic --bootstrap-server localhost:9093

 

주제의 파티션 및 복제본에 대한 세부 정보를 출력합니다. Partition, Leader/follower, Replicas, Isr(In Sync Replica) 정보를 보여줍니다. 여기서 ISR은 kafka 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 나타냅니다. 만일, 브로커 중 1대의 서버가 중지된 상태라면 Isr 은 2개 만 표시됩니다. 3번 브로커 서버가 중지되었다면 Leader는 1 또는 2가 되고 Isr 은 1,2 가 됩니다.

Topic: my-kafka-topic   TopicId: 2c7cvTC1QGKy2bu18revoA PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: my-kafka-topic   Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: my-kafka-topic   Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: my-kafka-topic   Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

 

Topics 삭제

이전버전 : 

이전 버전에서는 kafka server.properties파일에 설정을 하고 kafka broker를 재기동해야 topic을 삭제할 수 있습니다.

# config/server.properties파일
delete.topic.enable = true

server.properties에 delete.topic.enable=true 설정을 하고 kafka borker를 재기동 한 다음, kafka-topics.sh로 삭제를 해주면 됩니다. 

$  bin/kafka-topics.sh --delete --topic my-kafka-topic --bootstrap-server localhost:9093

최신버전 : (3.0 version)

KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다. 
브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)

elete.topic.enable 기본 설정은 true입니다.

$  bin/kafka-topics.sh --delete --topic my-kafka-topic --bootstrap-server localhost:9093

 

동적 파라미터 설정

 

topic config 확인 :

bin/kafka-configs.sh --bootstrap-server localhost:9093  --entity-type topics --entity-name movie --describe


broker config 확인 :

bin/kafka-configs.sh --bootstrap-server localhost:9093  --entity-type brokers --entity-name 1 --describe

 

동적 파라미터 변경 및 topic 삭제 :

  1. bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-default --alter --add-config delete.topic.enable=true
  2. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-kafka-topic
  3. bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-default --alter --delete-config delete.topic.enable

==> 오류(apache kafka 에는 동적으로 변경된다고 설명되어 있는데 오류 발생

1.

Caused by: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=BROKER, name=''): Cannot update these configs dynamically: Set(delete.topic.enable)

2.

Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)

3.

Invalid config(s): delete.topic.enable

 

아직 동적 config 설정이 정상작동 하지 않는 것 같다. 패치버전이나 더 상위 버전이 나오는 다시 확인해 본다.

 

Zookeeper 쉘을 이용한 방법

* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.

zookeeper-shell.sh 를 이용하여서 znode를 삭제할 수 있습니다.

rmr명령은 더이상 사용되진 않는다. delete나 deleteall로 삭제합니다. delete 를 사용할 경우 message가 남아있는 경우 삭제가 되지 않습니다. 이럴 경우, deleteall로 삭제할 수 있습니다.

$ bin/zookeeper-shell.sh localhost:2181
ls /brokers/topics 
## 실행결과
[__consumer_offsets, movie, music, my-kafka-topic]

rmr /brokers/topics/my-kafka-topic
## 실행결과. rmr은 사용할 수 없다.
Command not found: Command not found rmr 

## The zkCli provides the rmr (deprecated) or deleteall command for this purpose


delete /brokers/topics/my-kafka-topic
## 실행결과
Node not empty: /brokers/topics/my-kafka-topic

deleteall /brokers/topics/my-kafka-topic

ls /brokers/topics 
[__consumer_offsets, movie, music]

quit

## 삭제된 topic을 다시 생성해본다.

$ bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2
## 오류가 발생한다. topic을 kafka-topics.sh 로 조회하면 목록이 보인다.
## zookeeper와 kafka broker 간이 연동이 잘 안되는 것 같다.
Error while executing topic command : Topic 'my-kafka-topic' already exists

* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.

 

Producers: Topic에 메시지 보내기

"생산자(Producer)"는 데이터를 Kafka 클러스터에 넣는 프로세스입니다.

bin 디렉토리의 명령어는 콘솔에 텍스트를 입력할 때마다 클러스터에 데이터를 입력하는 콘솔 생성자(Producer)를 제공합니다.

콘솔 생산자(Producer)를 시작하려면 다음 명령을 실행합니다.

bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-kafka-topic
  • Broker-list는 생산자가 방금 프로비저닝한 브로커의 주소를 가리킵니다.
  • topic은 데이터가 입력하려는 주제(topic)를 지정합니다.

이제 데이터를 입력하고 Enter 키를 치면, Kafka 클러스터에 텍스트를 입력할 수 있는 명령 프롬프트가 표시됩니다.

  • 컨슈머가 데이터를 가져가도 Topic 데이터는 삭제되지 않습니다.
>first insert
>두번째 입력

프로듀서가 데이터를 보낼 때 '파티션키'를 지정해서 보낼 수 있습니다.

파티션키를 지정하지 않으면, 라운드로빈 방식으로 파티션에 저장하고,

파티션키를 지정하면, 파티셔너가 파티션키의 HASH 값을 구해서 특정 파티션에 할당합니다.

 

카프카에서 kafka-console-producer.sh로 Consumer에게 메세지를 보낼 때 기본적으로 key값이 null로 설정되어 있습니다. 이럴 때 설정에서 parse.key 및 key.separator 속성을 설정하면 key와 value가 포함된 메시지를 보낼 수 있습니다.

$ /bin/kafka-console-producer.sh \
  --broker-list localhost:9093 \
  --topic my-kafka-topic \
  --property "parse.key=true" \
  --property "key.separator=:"
  • parse.key : key와 value 파싱 활성화 여부 
  • key.separator : key와 value를 구분해주는 구분자
  • print.key : 화면에 key 를 보여줄 지 여부를 지정
  • print.value : 화면에 value 를 보여줄 지 여부를 지정

파일로 메시지 보내기

kafka-console-producer --broker-list localhost:9092 --topic test_topic < file.log

 

Consumers

Kafka consumers 를 사용하여 클러스터에서 데이터를 읽을 수 있습니다. 이 경우 이전 섹션에서 생성한 데이터를 읽습니다.
consumer를  시작하려면 다음 명령을 실행합니다. 아래와 같이 위에서 pruducer가 입력한 데이터가 출력됩니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning

first insert
두번째 입력
  • bootstrap-server는 클러스터의 브로커 중 하나일 수 있습니다.
  • Topic은 생산자가 클러스터에 데이터를 입력한 Topic(주제)과 동일해야 합니다.
  • from-beginning은 클러스터에 현재 가지고 있는 모든 메시지를 원한다고 클러스터에 알립니다.
  • 컨슈머 그룹이 다른 새로운 컨슈머가 auto.offset.reset=earliest 설정으로 데이터를 0번부터 가져갈 수 있습니다. 설정하지 않으면 새롭게 토픽에 생성된 메세지만 읽어옵니다.

위의 명령을 실행하면 콘솔에 로그온한 생산자가 입력한 모든 메시지가 즉시 표시됩니다.
소비자가 실행되는 동안 생산자가 메시지를 입력하면 실시간으로 콘솔에 출력되는 것을 볼 수 있습니다.

 

이런 식으로 Kafka는 지속적인 메시지 대기열처럼 작동하여 소비자가 아직 읽지 않은 메시지를 저장하고 소비자가 실행되는 동안 새 메시지가 오는 대로 전달합니다.

 

카프카에서는 consumer 그룹이라는 개념이 있는데 --consumer-property group.id=group-01 형식으로 consumer 그룹을 지정할 수 있습니다. 카프카 브로커는 컨슈머 그룹 중 하나의 컨슈머에게만 이벤트를 전달합니다. 동일한 이벤트 처리를 하는 컨슈머를 clustering 한 경우에 컨슈머 그룹으로 지정하면 클러스터링된 컨슈머 중 하나의 서버가 데이터를 수신합니다.

 

key와 value를 콘솔창에 표시하기 위해서는 --property print.key=true --property key.separator=: 를 설정합니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning \
   --property print.key=true --property key.separator=:

"key":"second key value"
"secone message":null
key3:third

 

Consumer Group

기존의 Message Queue 솔루션에서는 컨슈머가 메시지를 가져가면, 해당 메세지를 큐에서 삭제된다. 즉, 하나의 큐에 대하여 여러 컨슈머가 붙어서 같은 메세지를 컨슈밍할 수 없다. 하지만 Kafka는, 컨슈머가 메세지를 가져가도 큐에서 즉시 삭제되지 않으며, 하나의 토픽에 여러 컨슈머 그룹이 붙어 메세지를 가져갈 수 있다.

또한 각 consumer group마다 해당 topic의 partition에 대한 별도의 offset을 관리하고, group에 컨슈머가 추가/제거 될 때마다 rebalancing을 하여 group 내의 consumer에 partition을 할당하게 된다. 이는 컨슈머의 확장/축소를 용이하게 하고, 하나의 MQ(Message Queue)를 컨슈머 별로 다른 용도로 사용할 수 있는 확장성을 제공한다.

 

아래 명령어로 컨슈머 그룹을 지정합니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --group [그룹 이름]
  • 그룹에 속하는 컨슈머가 여러개이면 로드밸런싱을 통해 자동으로 메세지를 분배

2. 그룹 정보 확인

kafka-console-groups.sh --bootstrap-server localhost:9092 --describe --group [그룹 이름]
  • 위에 대한 명령어로 아래의 정보들을 얻을 수 있다.
    -
    • CURRENT-OFFSET : Consumer Group이 Kafka에서 읽은 offset
    • LOG-END-OFFSET : 해당 topic, partition의 마지막 offset
    • LAG : LOG-END-OFFSET 과 CURRENT-OFFSET의 차이
  • LAG의 경우 topic의 partition단위로 읽어야 할 남은 데이터 수를 의미한다

 

Offsets

오프셋 리셋

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [그룹 이름] --reset-offsets --to-earlist --execute --topic [토픽 이름]
  • 오프셋을 리셋하여 데이터를 다시 읽어드릴 때 사용
  • reset-offsets의 옵션으로 --to-earlist , --to-datetime, --from-file, --to-current, --shift-by 등이 있다
    --shift-by -2 를 하면 오프셋이 2칸 전으로 이동한다
  • execute의 옵션으로 --all-topics 와 --topic [그룹 이름] 이 있다

 

복제 테스트 : 브로커가 Offline 되면 어떤 일이?

이제 시스템에 Kafka 클러스터가 설정되었으므로 데이터 복제를 테스트해 보겠습니다.

실행한 3개의 브로커 중 첫 번째 브로커를 Control + C 키로 종료합니다. 

# 첫번째 브로커 중지

[2022-01-01 02:09:13,095] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-01-01 02:09:13,097] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
[2022-01-01 02:09:13,098] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 02:09:13,099] INFO [KafkaServer id=1] shut down completed (kafka.server.KafkaServer)


실행한 3개의 브로커 중 하나를 종료해도 클러스터가 클러스터가 제대로 실행되는지 아래 명령어를 실행합니다.
다른 터미널 창에서 다른 소비자를 시작해 보겠습니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic my-kafka-topic --from-beginning --group group2

real-time data
두번째 입력
awesome!!!
first insert

 

여기에 추가한 것은 이 소비자를 다른 소비자와 구별하는 그룹 옵션입니다. 명령을 실행하면 처음부터 입력된 모든 메시지가 표시됩니다. 동일한 소비자인 경우에는 이미 데이터를 읽었기 때문에 데이터가 보지이 않습니다.


중개인 중 하나가 중지되었지만 데이터는 손실되지 않았습니다. 이는 이전에 설정한 복제 계수(replication-factor)를 2로 설정하여 데이터 사본이 여러 브로커에 존재하도록 했기 때문입니다.

 

? 의문점 : 위에서 보는 것처럼 입력한 순서대로 출력되지 않았습니다. 한글때문인지 아니면 원래 순서가 없는 것인지 정확한 이유는 파악하지 못했습니다. 추후 확인이 필요한 사안입니다.

 

 

다른 브로커를 중단하면 어떻게 될까요? 

본 문서에서는 두번 째 브로커도 중지하고 테스트 해보겠습니다. 

이 경우에는 데이터 조회가 되지 않네요? 브로커를 3개로 구성한 경우 2개의 브로커가 장애가 발생한 경우 정상 작동하지 않습니다. 즉, 브로커의 과반수 이상에 장애가 발생하면 정상 작동하지 않습니다.

Kafka 생산자 및 소비자 구현

클러스터를 실행하고 나면 애플리케이션 코드에서 생산자와 소비자를 구현할 수 있습니다.
Golang 또는 Node.js에서 Kafka 생산자 및 소비자를 구현하는 방법은 링크된 문서를 참고하시면 됩니다.

 

<참고사이트>

https://www.sohamkamani.com/install-and-run-kafka-locally/

 

 

 

<참고사이

 

[ 참고 자료 ]

https://medium.com/@kiranps11/kafka-and-zookeeper-multinode-cluster-setup-3511aef4a505

https://www.instaclustr.com/blog/the-power-of-kafka-partitions-how-to-get-the-most-out-of-your-kafka-cluster/

 

+ Recent posts