[ 설치환경 ]
- Microsoft Windows 10 Pro(10.0.19041 N/A 빌드 19041)
- WSL 2(Ubuntu 20.04.3 LTS)
- java : 11.0.3 ( 버전확인 : java -version)
- Kafka : 3.0.0 ( 버전확인 : kafka-topics.sh --version )
- Zookeeper :
이 문서서는 로컬 머신에 Kafka 클러스터를 설치 및 실행하는 방법을 보여주고 Kafka 아키텍처에 대한 몇 가지 중요한 개념을 설명합니다.
Apache Kafka 는 분산 스트리밍 플랫폼입니다. 분산 메시지 브로커에서 데이터 스트림 처리를 위한 플랫폼으로 사용할 수 있습니다.
카프카 설치
먼저 Kafka 실행 파일을 실행하려면 Java가 설치되어 있어야 합니다. 본 문서에서는 openjdk-11 을 설치했습니다.
sudo apt-get install openjdk-11-jdk
공식 다운로드 페이지에서 Kafka의 바이너리를 다운로드하고 원하는 위치에 tar 파일의 압축을 풉니다.
tar -xvzf kafka_2.13-3.0.0.tgz
kafka_2.13-3.0.0이라는 폴더가 생성되며
kafka_2.13-3.0.0 하위 디렉토리에 bin, config, libs, license, site-docs 디렉토리가 생성됩니다.
시스템 아키텍처
클러스터를 실행하기 위해 아래와 같은 프로세스들을 시작해야 합니다.
- Zookeeper : 클러스터 노드 간의 상태를 유지하기 위해 Kafka에서 사용
- Kafka brokers : 데이터를 저장하고 내보내는 파이프라인의 "파이프"
- Producers : 클러스터에 데이터를 입력
- Consumers : 클러스터로부터 데이터를 조회
이 다이어그램의 각 블록은 네트워크의 다른 시스템에 구성할 수도 있습니다.
주키퍼 시작
Zookeeper 는 coordination을 위해 서버 상태를 저장하는 데, 키-값 형태로 저장하는 저장소입니다.
Kafka를 실행하려면 Zookeeper 서버가 필요하므로 먼저 Zookeeper 인스턴스를 시작해야 합니다.
kafka_2.13-3.0.0 안에 몇 가지 유용한 파일이 있습니다.
- bin/zookeeper-server-start.sh : 서버 실행 쉘
- config/zookeeper.properties : Zookeeper 서버가 실행할 구성환경 파일
아래 명령어로 주키퍼 서버를 실행합니다. kafka_2.13-3.0.0/ 디렉토리에서 실행해야 합니다.
bin/zookeeper-server-start.sh config/zookeeper.properties
주키퍼 서버를 백그라운드로 실행하려면 -daemon 옵션을 추가하여 기동합니다.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
설치 중간에 Zookeeper 아스키아트 문자도 보입니다.
foreground로 실행되니 최종 "Creating new log file" 보이면 다음 단계를 진행합니다.
[2022-01-01 00:34:06,106] INFO ______ _ (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO |___ / | | (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO / / ___ ___ | | __ ___ ___ _ __ ___ _ __ (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO / / / _ \ / _ \ | |/ / / _ \ / _ \ | '_ \ / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO / /__ | (_) | | (_) | | < | __/ | __/ | |_) | | __/ | | (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO /_____| \___/ \___/ |_|\_\ \___| \___| | .__/ \___| |_| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO | | (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,110] INFO |_| (org.apache.zoo
..... .....
[2022-01-01 00:34:10,369] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
[2022-01-01 00:34:10,373] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
[2022-01-01 01:08:12,522] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2022-01-01 01:08:14,923] WARN fsync-ing the write ahead log in SyncThread:0 took 2389ms which will adversely effect operation latency.File size is 67108880 bytes. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog)
config/zookeeper.properties 파일을 보면 clientPort 속성이 2181로 설정된 것을 볼 수 있습니다. 이 속성은 Zookeeper 서버가 현재 수신 대기 중인 포트입니다.
# File: kafka_2.13-3.0.0/config/zookeeper.properties
# the port at which the clients will connect
clientPort=2181
Kafka Broker 실행하기
Kafka 브로커는 클러스터의 핵심이며 데이터가 저장되고 배포되는 파이프라인 역할을 합니다.
Zookeeper를 시작한 방법과 유사하게,
브로커 서버를 시작하고(bin/kafka-server-start.sh) 구성(config/server.properties)하기 위한 두 개의 파일이 있습니다.
kafka의 분산환경 구성을 해야하기 때문에 다이어그램과 같이 3개의 브로커를 시작하겠습니다.
config/server.properties 파일을 열면 여러 구성 옵션이 표시됩니다(지금은 대부분 무시할 수 있음).
그러나 각 브로커 인스턴스에 대해 고유해야 하는 세 가지 속성이 있습니다.
File: kafka_2.13-3.0.0/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The address the socket server listens on. It will get the value returned from
listeners=PLAINTEXT://:9092
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
* 주키퍼 서버들 멀티로 구성한 경우에는 zookeeper.connect 에 나열해주면 됩니다.
예시) zookeeper.connect=localhost:2181, localhost:2182, localhost:2183
서버가 3개이므로 각 서버에 대해 각각의 구성 파일을 만들겠습니다.
config/server.properties 파일을 복사하고 각 서버 인스턴스에 대해 각각의 property 파일을 만듭니다.
cp config/server.properties config/server.1.properties
cp config/server.properties config/server.2.properties
cp config/server.properties config/server.3.properties
파일의 각 복사본에 대해 3가지 속성을 각각 고유하게 변경합니다.
server.1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1
server.2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs2
server.3.properties
broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs3
log 디렉토리를 생성합니다.
mkdir /tmp/kafka-logs1
mkdir /tmp/kafka-logs2
mkdir /tmp/kafka-logs3
이제 broker instances를 실행할 수 있습니다.
서로 다른 터미널 세션에서 아래 명령을 각각 실행합니다. 본 문서의 모든 명령어는 kafka_2.13-3.0.0/ 디렉토리에서 수행해야 합니다. foreground로 실행되니 프로세스를 중지하면 안됩니다.
bin/kafka-server-start.sh config/server.1.properties
bin/kafka-server-start.sh config/server.2.properties
bin/kafka-server-start.sh config/server.3.properties
백그라운드로 실행하려면 -daemon 옵션을 추가합니다.
bin/kafka-server-start.sh -daemon config/server.1.properties
bin/kafka-server-start.sh -daemon config/server.2.properties
bin/kafka-server-start.sh -daemon config/server.3.properties
브로커가 성공적으로 시작되면 'started' 메시지가 표시되어야 합니다.
.....
[2022-01-01 01:08:20,944] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,944] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,945] INFO Kafka startTimeMs: 1640966900932 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,948] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
.....
Topics 생성
클러스터에 데이터를 입력하기 전에 데이터가 속할 topic을 생성해야 합니다. 새로운 세션창에서 다음 명령을 실행합니다.
'my-kafka-topic' Topic 이 생성되었다는 메시지가 출력됩니다.
bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 3
Created topic my-kafka-topic
여기에서 몇 가지 옵션을 설명하겠습니다.
- 파티션을 사용하면 데이터를 분할할 브로커 수를 결정할 수 있습니다. 일반적으로 브로커 수와 동일하게 설정합니다. 3개의 브로커를 설정했으므로 이 옵션을 3으로 설정합니다.
- replication-factor는 원하는 데이터 복사본의 수를 나타냅니다(브로커 중 하나가 다운되는 경우에도 다른 브로커에 데이터가 남아 있음). 이 값을 3로 설정했으므로 데이터는 브로커에 복사본 두 개를 더 갖습니다.
그림으로 표시하면 아래와 같습니다. 색깔이 있는 파티션은 leader이고 색이없는 파티션은 follower입니다.
bootstrap-server는 활성 Kafka 브로커 중 하나의 주소를 가리킵니다. 모든 브로커는 Zookeeper를 통해 서로에 대해 알고 있으므로 어느 브로커를 선택하든 상관 없습니다.
Kafka 버전 2.x.x 이하를 사용하는 경우 --bootstrap-server localhost:9093 대신
--zookeeper localhost:2181(Zookeeper 인스턴스의 주소)을 사용해야 합니다.
Topics 조회
사용 가능한 모든 주제(Topics)를 나열하려면 bin/kafka-topics.sh 을 실행하면 됩니다. 이 경우 이전 섹션에서 만든 하나의 주제를 출력합니다.
bin/kafka-topics.sh --list --bootstrap-server localhost:9093
my-kafka-topic
주제에 대한 자세한 내용을 보려면 동일한 명령과 함께 --describe 인수를 사용할 수 있습니다.
bin/kafka-topics.sh --describe --topic my-kafka-topic --bootstrap-server localhost:9093
주제의 파티션 및 복제본에 대한 세부 정보를 출력합니다. Partition, Leader/follower, Replicas, Isr(In Sync Replica) 정보를 보여줍니다. 여기서 ISR은 kafka 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 나타냅니다. 만일, 브로커 중 1대의 서버가 중지된 상태라면 Isr 은 2개 만 표시됩니다. 3번 브로커 서버가 중지되었다면 Leader는 1 또는 2가 되고 Isr 은 1,2 가 됩니다.
Topic: my-kafka-topic TopicId: 2c7cvTC1QGKy2bu18revoA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: my-kafka-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: my-kafka-topic Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: my-kafka-topic Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topics 삭제
이전버전 :
이전 버전에서는 kafka server.properties파일에 설정을 하고 kafka broker를 재기동해야 topic을 삭제할 수 있습니다.
# config/server.properties파일
delete.topic.enable = true
server.properties에 delete.topic.enable=true 설정을 하고 kafka borker를 재기동 한 다음, kafka-topics.sh로 삭제를 해주면 됩니다.
$ bin/kafka-topics.sh --delete --topic my-kafka-topic --bootstrap-server localhost:9093
최신버전 : (3.0 version)
KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다.
브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)
elete.topic.enable 기본 설정은 true입니다.
$ bin/kafka-topics.sh --delete --topic my-kafka-topic --bootstrap-server localhost:9093
동적 파라미터 설정
topic config 확인 :
bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type topics --entity-name movie --describe
broker config 확인 :
bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 1 --describe
동적 파라미터 변경 및 topic 삭제 :
- bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-default --alter --add-config delete.topic.enable=true
- bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-kafka-topic
- bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-default --alter --delete-config delete.topic.enable
==> 오류(apache kafka 에는 동적으로 변경된다고 설명되어 있는데 오류 발생
1.
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=BROKER, name=''): Cannot update these configs dynamically: Set(delete.topic.enable)
2.
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
3.
Invalid config(s): delete.topic.enable
아직 동적 config 설정이 정상작동 하지 않는 것 같다. 패치버전이나 더 상위 버전이 나오는 다시 확인해 본다.
Zookeeper 쉘을 이용한 방법
* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.
zookeeper-shell.sh 를 이용하여서 znode를 삭제할 수 있습니다.
rmr명령은 더이상 사용되진 않는다. delete나 deleteall로 삭제합니다. delete 를 사용할 경우 message가 남아있는 경우 삭제가 되지 않습니다. 이럴 경우, deleteall로 삭제할 수 있습니다.
$ bin/zookeeper-shell.sh localhost:2181
ls /brokers/topics
## 실행결과
[__consumer_offsets, movie, music, my-kafka-topic]
rmr /brokers/topics/my-kafka-topic
## 실행결과. rmr은 사용할 수 없다.
Command not found: Command not found rmr
## The zkCli provides the rmr (deprecated) or deleteall command for this purpose
delete /brokers/topics/my-kafka-topic
## 실행결과
Node not empty: /brokers/topics/my-kafka-topic
deleteall /brokers/topics/my-kafka-topic
ls /brokers/topics
[__consumer_offsets, movie, music]
quit
## 삭제된 topic을 다시 생성해본다.
$ bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2
## 오류가 발생한다. topic을 kafka-topics.sh 로 조회하면 목록이 보인다.
## zookeeper와 kafka broker 간이 연동이 잘 안되는 것 같다.
Error while executing topic command : Topic 'my-kafka-topic' already exists
* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.
Producers: Topic에 메시지 보내기
"생산자(Producer)"는 데이터를 Kafka 클러스터에 넣는 프로세스입니다.
bin 디렉토리의 명령어는 콘솔에 텍스트를 입력할 때마다 클러스터에 데이터를 입력하는 콘솔 생성자(Producer)를 제공합니다.
콘솔 생산자(Producer)를 시작하려면 다음 명령을 실행합니다.
bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-kafka-topic
- Broker-list는 생산자가 방금 프로비저닝한 브로커의 주소를 가리킵니다.
- topic은 데이터가 입력하려는 주제(topic)를 지정합니다.
이제 데이터를 입력하고 Enter 키를 치면, Kafka 클러스터에 텍스트를 입력할 수 있는 명령 프롬프트가 표시됩니다.
- 컨슈머가 데이터를 가져가도 Topic 데이터는 삭제되지 않습니다.
>first insert
>두번째 입력
프로듀서가 데이터를 보낼 때 '파티션키'를 지정해서 보낼 수 있습니다.
파티션키를 지정하지 않으면, 라운드로빈 방식으로 파티션에 저장하고,
파티션키를 지정하면, 파티셔너가 파티션키의 HASH 값을 구해서 특정 파티션에 할당합니다.
카프카에서 kafka-console-producer.sh로 Consumer에게 메세지를 보낼 때 기본적으로 key값이 null로 설정되어 있습니다. 이럴 때 설정에서 parse.key 및 key.separator 속성을 설정하면 key와 value가 포함된 메시지를 보낼 수 있습니다.
$ /bin/kafka-console-producer.sh \
--broker-list localhost:9093 \
--topic my-kafka-topic \
--property "parse.key=true" \
--property "key.separator=:"
- parse.key : key와 value 파싱 활성화 여부
- key.separator : key와 value를 구분해주는 구분자
- print.key : 화면에 key 를 보여줄 지 여부를 지정
- print.value : 화면에 value 를 보여줄 지 여부를 지정
파일로 메시지 보내기
kafka-console-producer --broker-list localhost:9092 --topic test_topic < file.log
Consumers
Kafka consumers 를 사용하여 클러스터에서 데이터를 읽을 수 있습니다. 이 경우 이전 섹션에서 생성한 데이터를 읽습니다.
consumer를 시작하려면 다음 명령을 실행합니다. 아래와 같이 위에서 pruducer가 입력한 데이터가 출력됩니다.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning
first insert
두번째 입력
- bootstrap-server는 클러스터의 브로커 중 하나일 수 있습니다.
- Topic은 생산자가 클러스터에 데이터를 입력한 Topic(주제)과 동일해야 합니다.
- from-beginning은 클러스터에 현재 가지고 있는 모든 메시지를 원한다고 클러스터에 알립니다.
- 컨슈머 그룹이 다른 새로운 컨슈머가 auto.offset.reset=earliest 설정으로 데이터를 0번부터 가져갈 수 있습니다. 설정하지 않으면 새롭게 토픽에 생성된 메세지만 읽어옵니다.
위의 명령을 실행하면 콘솔에 로그온한 생산자가 입력한 모든 메시지가 즉시 표시됩니다.
소비자가 실행되는 동안 생산자가 메시지를 입력하면 실시간으로 콘솔에 출력되는 것을 볼 수 있습니다.
이런 식으로 Kafka는 지속적인 메시지 대기열처럼 작동하여 소비자가 아직 읽지 않은 메시지를 저장하고 소비자가 실행되는 동안 새 메시지가 오는 대로 전달합니다.
카프카에서는 consumer 그룹이라는 개념이 있는데 --consumer-property group.id=group-01 형식으로 consumer 그룹을 지정할 수 있습니다. 카프카 브로커는 컨슈머 그룹 중 하나의 컨슈머에게만 이벤트를 전달합니다. 동일한 이벤트 처리를 하는 컨슈머를 clustering 한 경우에 컨슈머 그룹으로 지정하면 클러스터링된 컨슈머 중 하나의 서버가 데이터를 수신합니다.
key와 value를 콘솔창에 표시하기 위해서는 --property print.key=true --property key.separator=: 를 설정합니다.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning \
--property print.key=true --property key.separator=:
"key":"second key value"
"secone message":null
key3:third
Consumer Group
기존의 Message Queue 솔루션에서는 컨슈머가 메시지를 가져가면, 해당 메세지를 큐에서 삭제된다. 즉, 하나의 큐에 대하여 여러 컨슈머가 붙어서 같은 메세지를 컨슈밍할 수 없다. 하지만 Kafka는, 컨슈머가 메세지를 가져가도 큐에서 즉시 삭제되지 않으며, 하나의 토픽에 여러 컨슈머 그룹이 붙어 메세지를 가져갈 수 있다.
또한 각 consumer group마다 해당 topic의 partition에 대한 별도의 offset을 관리하고, group에 컨슈머가 추가/제거 될 때마다 rebalancing을 하여 group 내의 consumer에 partition을 할당하게 된다. 이는 컨슈머의 확장/축소를 용이하게 하고, 하나의 MQ(Message Queue)를 컨슈머 별로 다른 용도로 사용할 수 있는 확장성을 제공한다.
아래 명령어로 컨슈머 그룹을 지정합니다.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --group [그룹 이름]
- 그룹에 속하는 컨슈머가 여러개이면 로드밸런싱을 통해 자동으로 메세지를 분배
2. 그룹 정보 확인
kafka-console-groups.sh --bootstrap-server localhost:9092 --describe --group [그룹 이름]
- 위에 대한 명령어로 아래의 정보들을 얻을 수 있다.
-- CURRENT-OFFSET : Consumer Group이 Kafka에서 읽은 offset
- LOG-END-OFFSET : 해당 topic, partition의 마지막 offset
- LAG : LOG-END-OFFSET 과 CURRENT-OFFSET의 차이
- LAG의 경우 topic의 partition단위로 읽어야 할 남은 데이터 수를 의미한다
Offsets
오프셋 리셋
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [그룹 이름] --reset-offsets --to-earlist --execute --topic [토픽 이름]
- 오프셋을 리셋하여 데이터를 다시 읽어드릴 때 사용
- reset-offsets의 옵션으로 --to-earlist , --to-datetime, --from-file, --to-current, --shift-by 등이 있다
- --shift-by -2 를 하면 오프셋이 2칸 전으로 이동한다 - execute의 옵션으로 --all-topics 와 --topic [그룹 이름] 이 있다
복제 테스트 : 브로커가 Offline 되면 어떤 일이?
이제 시스템에 Kafka 클러스터가 설정되었으므로 데이터 복제를 테스트해 보겠습니다.
실행한 3개의 브로커 중 첫 번째 브로커를 Control + C 키로 종료합니다.
# 첫번째 브로커 중지
[2022-01-01 02:09:13,095] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-01-01 02:09:13,097] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
[2022-01-01 02:09:13,098] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 02:09:13,099] INFO [KafkaServer id=1] shut down completed (kafka.server.KafkaServer)
실행한 3개의 브로커 중 하나를 종료해도 클러스터가 클러스터가 제대로 실행되는지 아래 명령어를 실행합니다.
다른 터미널 창에서 다른 소비자를 시작해 보겠습니다.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic my-kafka-topic --from-beginning --group group2
real-time data
두번째 입력
awesome!!!
first insert
여기에 추가한 것은 이 소비자를 다른 소비자와 구별하는 그룹 옵션입니다. 명령을 실행하면 처음부터 입력된 모든 메시지가 표시됩니다. 동일한 소비자인 경우에는 이미 데이터를 읽었기 때문에 데이터가 보지이 않습니다.
중개인 중 하나가 중지되었지만 데이터는 손실되지 않았습니다. 이는 이전에 설정한 복제 계수(replication-factor)를 2로 설정하여 데이터 사본이 여러 브로커에 존재하도록 했기 때문입니다.
? 의문점 : 위에서 보는 것처럼 입력한 순서대로 출력되지 않았습니다. 한글때문인지 아니면 원래 순서가 없는 것인지 정확한 이유는 파악하지 못했습니다. 추후 확인이 필요한 사안입니다.
다른 브로커를 중단하면 어떻게 될까요?
본 문서에서는 두번 째 브로커도 중지하고 테스트 해보겠습니다.
이 경우에는 데이터 조회가 되지 않네요? 브로커를 3개로 구성한 경우 2개의 브로커가 장애가 발생한 경우 정상 작동하지 않습니다. 즉, 브로커의 과반수 이상에 장애가 발생하면 정상 작동하지 않습니다.
Kafka 생산자 및 소비자 구현
클러스터를 실행하고 나면 애플리케이션 코드에서 생산자와 소비자를 구현할 수 있습니다.
Golang 또는 Node.js에서 Kafka 생산자 및 소비자를 구현하는 방법은 링크된 문서를 참고하시면 됩니다.
<참고사이트>
https://www.sohamkamani.com/install-and-run-kafka-locally/
<참고사이
[ 참고 자료 ]
https://medium.com/@kiranps11/kafka-and-zookeeper-multinode-cluster-setup-3511aef4a505
'Apache Kafka' 카테고리의 다른 글
카프카 프로듀서 파티셔너 종류 및 정리(2.5.0 기준) (0) | 2022.01.07 |
---|---|
카프카 컨슈머 파티셔너 종류 및 정리(2.5.0 기준) (0) | 2022.01.07 |
카프카-파이썬 연동 (0) | 2022.01.07 |
아파치 카프카 시작하기 (0) | 2021.12.31 |
아파치 카프카 (0) | 2021.12.31 |