파티셔너(Partitioner)란?

프로듀서가 데이터를 보내면 파티셔너를 통해서 카프카 브로커로 데이터가 전송된다.

파티셔너는 데이터를 어떤 파티션에 넣을지를 결정하는 역할을 합니다.

레코드에 포함된 메시지키 값에 따라서 파티션의 위치가 결정됩니다.

프로듀서를 사용할 때 파티셔너를 따로 설정하지 않으면, UniformStickyPartitioner 로 설정됩니다.

 

카프카 프로듀서는 레코드를 전송하기 위해 파티셔너를 제공합니다.



프로듀서 파티셔너 인터페이스에 대한 설명은 아래 링크에서 확인할 수 있습니다.

https://kafka.apache.org/25/javadoc/?org/apache/kafka/clients/producer/Partitioner.html

 

kafka 2.5.0 API

 

kafka.apache.org

 

메시지 키가 있을 때
메시지 키가 있을 경우에는 파티셔너의 종류와 관계없이 동일하게 동작합니다. 메시지 키의 해쉬값을 구해서 해당 해쉬값과 파티션을 매칭하여 적재합니다. 이로 인해 동일한 메시지 키를 가진 레코드는 동일한 파티션에 들어가게 됩니다. 다만 주의할 점은 파티션 개수가 늘어날 때 입니다. 파티션 개수가 늘어나면 메시지 키 해시값과 파티션 매칭이 틀어지게 되어 파티션 증가 전후로 동일한 메시지 키더라도 다른 파티션에 레코드가 들어갈 수 있으므로 주의해야 합니다.

 

UniformStickyPartitioner

메시지 키의 존재 유무에 따라 다르게 동작한다.

메시지 키가 있는 경우

  1. 메시지 키를 가진 레코드는 파티셔너에 의해서 특정한 해시값이 생성된다.
  2. 이 해시값을 기준으로 어느 파티션으로 들어갈지 정해진다.
  3.  
  4.  
  1. 토픽에 파티션이 2개가 있는 경우, 파티셔너의 해시로직에 의해서 키값에 따라 다른 파티션에 들어가게 된다.
    • 동일한 메시지 키값을 가진 레코드는 동일한 해시값을 만들기 때문에 항상 동일한 파티션에 들어가는 것을 보장한다.
    • 이로 인해, “순서를 지켜서 처리할 수 있다”는 장점이 있다.

​ 파티션 한 개의 내부에서는 큐처럼 동작하기 때문에 순서를 지킬 수 있는 것이다.

UniformStickyPartitioner

유니폼 스티키 파티션은 2.4.0 부터 기본 설정으로 사용되는 파티셔너입니다. 이 파티셔너는 스티키 파티셔너라고도 부릅니다. 스티키 파티션은 라운드-로빈 파티셔너와 다르게 프로듀서 내부동작에 특화되어 있습니다. 특히 배치전송에 특화되어 있습니다.

프로듀서는 파티션에 데이터를 전송하기 전에 Accumulator에 데이터를 버퍼로 쌓아 놓고 발송합니다. 스티키 파티셔너를 사용할 경우 Accumulator의 버퍼를 채워서 보내기 때문에 성능향상에서 유리합니다.

https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

KIP-480 implements a new partitioner, which chooses the sticky partition that changes when the batch is full if no partition or key is present. Using the sticky partitioner helps improve message batching, decrease latency, and reduce the load for the broker. Some of the benchmarks which Justine Olshan discusses on the KIP show up to a 50% reduction in latency and 5–15% reduction in CPU utilization.

RoundRobinPartitioner

메시지 키가 없을 경우, 라운드-로빈 방식으로 데이터가 들어오는대로 파티션을 순회하면서 레코드를 넣습니다. 파티션 개수가 늘어날때도 마찬가지로 순회하면서 지속적으로 데이터를 분배하면서 넣습니다.

https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

 

 

사용자 정의 파티셔너

커스텀 파티셔너를 사용자가 만들면 메시지키, 메시지값, 토픽이름에 따라 어떤 파티션으로 보낼 지 정할 수 있습니다.

 

원문참조 : https://blog.voidmainvoid.net/360

 

 

카프카 컨슈머는 토픽의 파티션과 매칭하여 레코드를 가져옵니다. 파티션을 매칭하는 기준은 컨슈머 파티션 어사이너의 기준을 따릅니다. 

컨슈머 파티션어사이너 인터페이스는 아래 링크에서 확인할 수 있습니다. https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html

 

RangeAssignor

기본 설정되는 파티션 어사이너입니다. 토픽의 파티션을 숫자기준으로 나열하고, 컨슈머의 이름을 사전순으로 나열한 뒤에 배정하는 정확히 반으로 나누어 배정합니다. 만약 딱 반으로 안나뉘어지는 홀수개의 파티션을 나눌 경우에는 앞쪽 순서의 컨슈머가 파티션을 더 많이 할당합니다.

예를들어 파티션이 3개인 토픽(p0, p1, p2)과 컨슈머가 2개(c0, c1)가 있다고 가정해 봅시다. 파티션 3개를 컨슈머에 분배하려면 2개와 1개를 배분해야 합니다. 앞쪽 순서의 컨슈머는 파티션을 2개 가지게 됩니다. 이로 인해 아래와 같이 매칭됩니다.

RoundRobinAssignor

라운드-로빈 어사이너는 파티션을 컨슈머에 번갈아가며 할당하는 방식입니다. 

예를들어 파티션이 3개인 토픽(p0, p1, p2)과 컨슈머 2개(c0, c1)가 있다고 가정해 봅시다. 파티션 3개를 컨슈머에 분배하는데 파티션 순서를 번갈아 가면서 할당하게 되어 아래와 같이 매칭됩니다.

라운드-로빈 어사이너를 사용할 경우 특정 상황에서는 일부 컨슈머에 파티션이 몰릴 수도 있습니다. 예를들어 파티션이 1개인 토픽, 2개인 토픽, 3개인 토픽이 있고 3개의 컨슈머가 각각 토픽 1번, 토픽 2번, 토픽 3번을 매칭하면 아래와 같이 라운드-로빈 로직으로 인해 일부 컨슈머에 파티션이 몰릴 수 있으므로 주의해야 합니다.

StickyAssignor

스티키 어사이너는 두가지 목적으로 사용됩니다. 첫번째는 최대한 파티션을 균등하게 매칭하기 위함입니다.  균등하게 매칭이라고 뜻하는것은 아래와 같은 정책을 내포합니다.

- 컨슈머에 매칭된 파티션의 개수가 최대 1개 이상을 넘지 않도록 합니다.
- each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.

두번째는 리밸런싱이 일어날 경우 최대한 파티션의 이동을 줄이는데 목적이 있습니다. 파티션 매칭 이동을 줄이면 리밸런싱 발생시 오버헤드를 줄일 수 있습니다. 

스티키 어사이너를 사용할 경우 최대한 균등하게 파티션을 컨슈머에 나눕니다. 라운드로빈과 비슷하다고 생각할수도 있지만 실제로 동작은 그렇지 않습니다. 그리고 리밸런싱 발생시에 동작이 다름을 확인할 수 있습니다.

예를 들어 3개의 컨슈머가 있고 3개의 토픽(각각 1개, 2개, 3개 파티션 보유)이 있다고 가정해봅시다. 라운드 로빈 어사이너로 분배했을 경우에는 이전 예시와 같이 몰리는 현상이 발생할 수 있지만 스티키 파티셔너의 경우 다르게 동작하여 아래와 같이 매칭되는 것을 확인할 수 있습니다.

추가적으로 스티키 어사이너를 사용할 경우 이미할당된 파티션의 대부분이 남아있다는 특징을 리밸런스 리스너에서 활용할 수 있습니다. 관련 내용은 아래 링크에서 확인할 수 있습니다.

 

관련 내용은 아래 링크에서 확인할 수 있습니다. https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html

 

원문참조 : https://blog.voidmainvoid.net/361

 

카프카(Kafka)에서는 다양한 언어로 데이터를 주고 받는 기능을 제공하는데 본 포스팅은 파이썬(Python)으로 구현하는 프로듀서(producer)/컨슈머(consumer) 즉 데이터를 보내고 받는 방법을 설명한다.

 

파이썬으로 카프카를 호출하는 방법이 대표적으로 2가지 방법이 존재하는 것 같다. 하나는 카프카를 만든 제이 크렙스(Jay Kreps)가 만든 회사인 confluent가 제공하는 라이브러리이고, 다른 하나는 kafka-python이라는 라이브러리를 사용하는 방법이다. 후자인 kafka-python을 범용적으로 많이 사용하는데 성능은 컨플루언트의 라이브러리가 더 좋다.



후자를 사용하는 이유는 단하나 confluent는 c로 만든 라이브러리를 호출하여 사용하는 방식이라 별도의 설치과정이 존재하기 때문이다. 즉 성능을 중시 여긴다면 c로 만든 confluent를 사용하면 될 것이고, 쉽게 사용하는 것을 목적으로 한다면 후자를 사용해도 상관이 없을 것이다.

 

kafka-pyhon 라이브러리 설치

pip install kafka-python

kafka-python 설치 과정

Producer 구현

라이브러리를 설치했으면 아래와 같은 코드를 작성하고 실행해본다.

 

from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

start = time.time()
for i in range(10000):
    data = {'str' : 'result'+str(i)}
    producer.send('test', value=data)
    producer.flush()
print("elapsed :", time.time() - start)

위 코드는 json 형태로 data를 생성하는데 "result"라는 문자열에 + i값을 만번까지 증가시켜서 test라는 토픽으로 보내는 내용이다. 실행을 하게 되면 최종적으로는 만번 전달하는데 걸린 시간이 출력이 되는 것인데 필자는 다음과 같은 속도가 나왔다.

 

프로듀서 옵션

KafkaProducer를 선언할 때 acks가 0으로 확인 없이 전송하며, compression_type이 gzip으로 gzip형태로 압축하여 전송을 한다. 그리고 카프카 서버가 여러대일 경우 bootstrap_servers에 브로커 리스트를 배열 값으로 지정을 하면 되며 자세한 옵션 정보는 아래와 같다.

 

프로듀서 옵션 정보

 

Produce 결과

console로 확인하는 producer 실시간 전송내역들

C:\Users\user\anaconda3\python.exe D:/rainbow/application/producer.py
elapsed : 5.04697060585022

Process finished with exit code 0

만번 전송하는데 5.04초가 걸렸으니 초당 약 2천건 정도 보내졌다는 것을 알 수 있다.

 

 

 

Consumer 구현

이제 데이터를 전송했으면 받는 부분인 consumer를 구현해보도록 한다.

 

from kafka import KafkaConsumer
from json import loads

# topic, broker list
consumer = KafkaConsumer(
    'test',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')),
     consumer_timeout_ms=1000
)

# consumer list를 가져온다
print('[begin] get consumer list')
for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
        message.topic, message.partition, message.offset, message.key, message.value
    ))
print('[end] get consumer list')

Consumer를 선언할 때, test라는 topic을 기준으로 카프카 서버는 localhost를 지정하였다. 

 

Consumer 옵션

옵션 설명
bootstrap_servers 카프카 클러스터들의 호스트와 포트 정보 리스트
auto_offset_reset earliest : 가장 초기 오프셋값
latest : 가장 마지막 오프셋값
none : 이전 오프셋값을 찾지 못할 경우 에러
enable_auto_commit 주기적으로 offset을 auto commit
group_id 컨슈머 그룹을 식별하기 위한 용도
value_deserializer producer에서 value를 serializer를 했기 때문에 사용
consumer_timeout_ms 이 설정을 넣지 않으면 데이터가 없어도 오랜기간 connection한 상태가 된다. 데이터가 없을 때 빠르게 종료시키려면 timeout 설정을 넣는다.

 

Consumer 결과

C:\Users\user\anaconda3\python.exe D:/rainbow/application/consumer.py
[begin] get consumer list
Topic: test, Partition: 0, Offset: 0, Key: None, Value: {'str': 'result0'}
Topic: test, Partition: 0, Offset: 1, Key: None, Value: {'str': 'result1'}
Topic: test, Partition: 0, Offset: 2, Key: None, Value: {'str': 'result2'}
Topic: test, Partition: 0, Offset: 3, Key: None, Value: {'str': 'result3'}
Topic: test, Partition: 0, Offset: 4, Key: None, Value: {'str': 'result4'}
...
Topic: test, Partition: 0, Offset: 9996, Key: None, Value: {'str': 'result9996'}
Topic: test, Partition: 0, Offset: 9997, Key: None, Value: {'str': 'result9997'}
Topic: test, Partition: 0, Offset: 9998, Key: None, Value: {'str': 'result9998'}
Topic: test, Partition: 0, Offset: 9999, Key: None, Value: {'str': 'result9999'}
[end] get consumer list

Process finished with exit code 0

 

위 예제는 간단히 확인해보는 예제로 로그성 데이터가 아니라 중요한 데이터를 가져와야 되는 경우 설정이 중요해질 수 있기 때문에 github에 있는 예제들을 확인하는 것이 좋을 것이다.

 

python-kafka github

https://github.com/dpkp/kafka-python




[ 설치환경 ]

- Microsoft Windows 10 Pro(10.0.19041 N/A 빌드 19041)

- WSL 2(Ubuntu 20.04.3 LTS)

- java :  11.0.3 ( 버전확인 : java -version)

- Kafka :  3.0.0 ( 버전확인 : kafka-topics.sh --version )

- Zookeeper : 

 

 

이 문서서는 로컬 머신에 Kafka 클러스터를 설치 및 실행하는 방법을 보여주고 Kafka 아키텍처에 대한 몇 가지 중요한 개념을 설명합니다.

Apache Kafka 는 분산 스트리밍 플랫폼입니다. 분산 메시지 브로커에서 데이터 스트림 처리를 위한 플랫폼으로 사용할 수 있습니다.

카프카 설치

먼저 Kafka 실행 파일을 실행하려면 Java가 설치되어 있어야 합니다. 본 문서에서는 openjdk-11 을 설치했습니다.

sudo apt-get install openjdk-11-jdk

 


공식 다운로드 페이지에서 Kafka의 바이너리를 다운로드하고 원하는 위치에 tar 파일의 압축을 풉니다.

tar -xvzf kafka_2.13-3.0.0.tgz

kafka_2.13-3.0.0이라는 폴더가 생성되며

kafka_2.13-3.0.0 하위 디렉토리에 bin, config, libs, license, site-docs 디렉토리가 생성됩니다.

 

시스템 아키텍처

클러스터를 실행하기 위해 아래와 같은 프로세스들을 시작해야 합니다.

  1. Zookeeper : 클러스터 노드 간의 상태를 유지하기 위해 Kafka에서 사용
  2. Kafka brokers : 데이터를 저장하고 내보내는 파이프라인의 "파이프"
  3. Producers : 클러스터에 데이터를 입력
  4. Consumers : 클러스터로부터 데이터를 조회

이 다이어그램의 각 블록은 네트워크의 다른 시스템에 구성할 수도 있습니다.

 

주키퍼 시작

Zookeeper 는 coordination을 위해 서버 상태를 저장하는 데, 키-값 형태로 저장하는 저장소입니다.

Kafka를 실행하려면 Zookeeper 서버가 필요하므로 먼저  Zookeeper 인스턴스를 시작해야 합니다.

kafka_2.13-3.0.0 안에 몇 가지 유용한 파일이 있습니다.

  • bin/zookeeper-server-start.sh : 서버 실행 쉘
  • config/zookeeper.properties : Zookeeper 서버가 실행할 구성환경 파일

 

아래 명령어로 주키퍼 서버를 실행합니다. kafka_2.13-3.0.0/ 디렉토리에서 실행해야 합니다.

bin/zookeeper-server-start.sh config/zookeeper.properties

 

주키퍼 서버를 백그라운드로 실행하려면 -daemon 옵션을 추가하여 기동합니다.

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

 

설치 중간에 Zookeeper 아스키아트 문자도 보입니다.

foreground로 실행되니 최종 "Creating new log file" 보이면 다음 단계를 진행합니다.

[2022-01-01 00:34:06,106] INFO   ______                  _                                           (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO  |___  /                 | |                                          (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __    (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |     (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO                                               | |                      (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,110] INFO                                               |_|                      (org.apache.zoo
.....   .....
[2022-01-01 00:34:10,369] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
[2022-01-01 00:34:10,373] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
[2022-01-01 01:08:12,522] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2022-01-01 01:08:14,923] WARN fsync-ing the write ahead log in SyncThread:0 took 2389ms which will adversely effect operation latency.File size is 67108880 bytes. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog)

config/zookeeper.properties 파일을 보면 clientPort 속성이 2181로 설정된 것을 볼 수 있습니다. 이 속성은 Zookeeper 서버가 현재 수신 대기 중인 포트입니다.

# File: kafka_2.13-3.0.0/config/zookeeper.properties

# the port at which the clients will connect
clientPort=2181

 

Kafka Broker 실행하기

Kafka 브로커는 클러스터의 핵심이며 데이터가 저장되고 배포되는 파이프라인 역할을 합니다.
Zookeeper를 시작한 방법과 유사하게,

브로커 서버를 시작하고(bin/kafka-server-start.sh) 구성(config/server.properties)하기 위한 두 개의 파일이 있습니다.
kafka의 분산환경 구성을 해야하기 때문에 다이어그램과 같이 3개의 브로커를 시작하겠습니다.
config/server.properties 파일을 열면 여러 구성 옵션이 표시됩니다(지금은 대부분 무시할 수 있음).
그러나 각 브로커 인스턴스에 대해 고유해야 하는 세 가지 속성이 있습니다.

File: kafka_2.13-3.0.0/config/server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# The address the socket server listens on. It will get the value returned from 
listeners=PLAINTEXT://:9092

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

* 주키퍼 서버들 멀티로 구성한 경우에는 zookeeper.connect 에 나열해주면 됩니다.

예시) zookeeper.connect=localhost:2181, localhost:2182, localhost:2183

서버가 3개이므로 각 서버에 대해 각각의 구성 파일을 만들겠습니다.

config/server.properties 파일을 복사하고 각 서버 인스턴스에 대해 각각의 property 파일을 만듭니다.

cp config/server.properties config/server.1.properties
cp config/server.properties config/server.2.properties
cp config/server.properties config/server.3.properties

 

파일의 각 복사본에 대해 3가지 속성을 각각 고유하게 변경합니다.

server.1.properties

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1

server.2.properties

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs2

server.3.properties

broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs3

log 디렉토리를 생성합니다.

mkdir /tmp/kafka-logs1
mkdir /tmp/kafka-logs2
mkdir /tmp/kafka-logs3

이제 broker instances를 실행할 수 있습니다. 

서로 다른 터미널 세션에서 아래 명령을 각각 실행합니다. 본 문서의 모든 명령어는 kafka_2.13-3.0.0/ 디렉토리에서 수행해야 합니다. foreground로 실행되니 프로세스를 중지하면 안됩니다.

bin/kafka-server-start.sh config/server.1.properties
bin/kafka-server-start.sh config/server.2.properties
bin/kafka-server-start.sh config/server.3.properties
 

백그라운드로 실행하려면 -daemon 옵션을 추가합니다.

bin/kafka-server-start.sh -daemon config/server.1.properties
bin/kafka-server-start.sh -daemon config/server.2.properties
bin/kafka-server-start.sh -daemon config/server.3.properties

 

브로커가 성공적으로 시작되면 'started' 메시지가 표시되어야 합니다.

.....
[2022-01-01 01:08:20,944] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,944] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,945] INFO Kafka startTimeMs: 1640966900932 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,948] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
.....

Topics 생성

클러스터에 데이터를 입력하기 전에 데이터가 속할 topic을 생성해야 합니다. 새로운 세션창에서 다음 명령을 실행합니다.

'my-kafka-topic' Topic 이 생성되었다는 메시지가 출력됩니다.

bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 3
Created topic my-kafka-topic

여기에서 몇 가지 옵션을 설명하겠습니다.

  • 파티션을 사용하면 데이터를 분할할 브로커 수를 결정할 수 있습니다. 일반적으로 브로커 수와 동일하게 설정합니다. 3개의 브로커를 설정했으므로 이 옵션을 3으로 설정합니다.
  • replication-factor는 원하는 데이터 복사본의 수를 나타냅니다(브로커 중 하나가 다운되는 경우에도 다른 브로커에 데이터가 남아 있음). 이 값을 3로 설정했으므로 데이터는 브로커에 복사본 두 개를 더 갖습니다. 

그림으로 표시하면 아래와 같습니다. 색깔이 있는 파티션은 leader이고 색이없는 파티션은 follower입니다.

bootstrap-server는 활성 Kafka 브로커 중 하나의 주소를 가리킵니다. 모든 브로커는 Zookeeper를 통해 서로에 대해 알고 있으므로 어느 브로커를 선택하든 상관 없습니다.

 

Kafka 버전 2.x.x 이하를 사용하는 경우 --bootstrap-server localhost:9093 대신
--zookeeper localhost:2181(Zookeeper 인스턴스의 주소)을 사용해야 합니다.

 

Topics 조회

사용 가능한 모든 주제(Topics)를 나열하려면 bin/kafka-topics.sh 을 실행하면 됩니다. 이 경우 이전 섹션에서 만든 하나의 주제를 출력합니다.

bin/kafka-topics.sh --list --bootstrap-server localhost:9093

my-kafka-topic

 

주제에 대한 자세한 내용을 보려면 동일한 명령과 함께 --describe 인수를 사용할 수 있습니다.

bin/kafka-topics.sh --describe --topic my-kafka-topic --bootstrap-server localhost:9093

 

주제의 파티션 및 복제본에 대한 세부 정보를 출력합니다. Partition, Leader/follower, Replicas, Isr(In Sync Replica) 정보를 보여줍니다. 여기서 ISR은 kafka 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 나타냅니다. 만일, 브로커 중 1대의 서버가 중지된 상태라면 Isr 은 2개 만 표시됩니다. 3번 브로커 서버가 중지되었다면 Leader는 1 또는 2가 되고 Isr 은 1,2 가 됩니다.

Topic: my-kafka-topic   TopicId: 2c7cvTC1QGKy2bu18revoA PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: my-kafka-topic   Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: my-kafka-topic   Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: my-kafka-topic   Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

 

Topics 삭제

이전버전 : 

이전 버전에서는 kafka server.properties파일에 설정을 하고 kafka broker를 재기동해야 topic을 삭제할 수 있습니다.

# config/server.properties파일
delete.topic.enable = true

server.properties에 delete.topic.enable=true 설정을 하고 kafka borker를 재기동 한 다음, kafka-topics.sh로 삭제를 해주면 됩니다. 

$  bin/kafka-topics.sh --delete --topic my-kafka-topic --bootstrap-server localhost:9093

최신버전 : (3.0 version)

KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다. 
브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)

elete.topic.enable 기본 설정은 true입니다.

$  bin/kafka-topics.sh --delete --topic my-kafka-topic --bootstrap-server localhost:9093

 

동적 파라미터 설정

 

topic config 확인 :

bin/kafka-configs.sh --bootstrap-server localhost:9093  --entity-type topics --entity-name movie --describe


broker config 확인 :

bin/kafka-configs.sh --bootstrap-server localhost:9093  --entity-type brokers --entity-name 1 --describe

 

동적 파라미터 변경 및 topic 삭제 :

  1. bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-default --alter --add-config delete.topic.enable=true
  2. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-kafka-topic
  3. bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-default --alter --delete-config delete.topic.enable

==> 오류(apache kafka 에는 동적으로 변경된다고 설명되어 있는데 오류 발생

1.

Caused by: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=BROKER, name=''): Cannot update these configs dynamically: Set(delete.topic.enable)

2.

Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)

3.

Invalid config(s): delete.topic.enable

 

아직 동적 config 설정이 정상작동 하지 않는 것 같다. 패치버전이나 더 상위 버전이 나오는 다시 확인해 본다.

 

Zookeeper 쉘을 이용한 방법

* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.

zookeeper-shell.sh 를 이용하여서 znode를 삭제할 수 있습니다.

rmr명령은 더이상 사용되진 않는다. delete나 deleteall로 삭제합니다. delete 를 사용할 경우 message가 남아있는 경우 삭제가 되지 않습니다. 이럴 경우, deleteall로 삭제할 수 있습니다.

$ bin/zookeeper-shell.sh localhost:2181
ls /brokers/topics 
## 실행결과
[__consumer_offsets, movie, music, my-kafka-topic]

rmr /brokers/topics/my-kafka-topic
## 실행결과. rmr은 사용할 수 없다.
Command not found: Command not found rmr 

## The zkCli provides the rmr (deprecated) or deleteall command for this purpose


delete /brokers/topics/my-kafka-topic
## 실행결과
Node not empty: /brokers/topics/my-kafka-topic

deleteall /brokers/topics/my-kafka-topic

ls /brokers/topics 
[__consumer_offsets, movie, music]

quit

## 삭제된 topic을 다시 생성해본다.

$ bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2
## 오류가 발생한다. topic을 kafka-topics.sh 로 조회하면 목록이 보인다.
## zookeeper와 kafka broker 간이 연동이 잘 안되는 것 같다.
Error while executing topic command : Topic 'my-kafka-topic' already exists

* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.

 

Producers: Topic에 메시지 보내기

"생산자(Producer)"는 데이터를 Kafka 클러스터에 넣는 프로세스입니다.

bin 디렉토리의 명령어는 콘솔에 텍스트를 입력할 때마다 클러스터에 데이터를 입력하는 콘솔 생성자(Producer)를 제공합니다.

콘솔 생산자(Producer)를 시작하려면 다음 명령을 실행합니다.

bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-kafka-topic
  • Broker-list는 생산자가 방금 프로비저닝한 브로커의 주소를 가리킵니다.
  • topic은 데이터가 입력하려는 주제(topic)를 지정합니다.

이제 데이터를 입력하고 Enter 키를 치면, Kafka 클러스터에 텍스트를 입력할 수 있는 명령 프롬프트가 표시됩니다.

  • 컨슈머가 데이터를 가져가도 Topic 데이터는 삭제되지 않습니다.
>first insert
>두번째 입력

프로듀서가 데이터를 보낼 때 '파티션키'를 지정해서 보낼 수 있습니다.

파티션키를 지정하지 않으면, 라운드로빈 방식으로 파티션에 저장하고,

파티션키를 지정하면, 파티셔너가 파티션키의 HASH 값을 구해서 특정 파티션에 할당합니다.

 

카프카에서 kafka-console-producer.sh로 Consumer에게 메세지를 보낼 때 기본적으로 key값이 null로 설정되어 있습니다. 이럴 때 설정에서 parse.key 및 key.separator 속성을 설정하면 key와 value가 포함된 메시지를 보낼 수 있습니다.

$ /bin/kafka-console-producer.sh \
  --broker-list localhost:9093 \
  --topic my-kafka-topic \
  --property "parse.key=true" \
  --property "key.separator=:"
  • parse.key : key와 value 파싱 활성화 여부 
  • key.separator : key와 value를 구분해주는 구분자
  • print.key : 화면에 key 를 보여줄 지 여부를 지정
  • print.value : 화면에 value 를 보여줄 지 여부를 지정

파일로 메시지 보내기

kafka-console-producer --broker-list localhost:9092 --topic test_topic < file.log

 

Consumers

Kafka consumers 를 사용하여 클러스터에서 데이터를 읽을 수 있습니다. 이 경우 이전 섹션에서 생성한 데이터를 읽습니다.
consumer를  시작하려면 다음 명령을 실행합니다. 아래와 같이 위에서 pruducer가 입력한 데이터가 출력됩니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning

first insert
두번째 입력
  • bootstrap-server는 클러스터의 브로커 중 하나일 수 있습니다.
  • Topic은 생산자가 클러스터에 데이터를 입력한 Topic(주제)과 동일해야 합니다.
  • from-beginning은 클러스터에 현재 가지고 있는 모든 메시지를 원한다고 클러스터에 알립니다.
  • 컨슈머 그룹이 다른 새로운 컨슈머가 auto.offset.reset=earliest 설정으로 데이터를 0번부터 가져갈 수 있습니다. 설정하지 않으면 새롭게 토픽에 생성된 메세지만 읽어옵니다.

위의 명령을 실행하면 콘솔에 로그온한 생산자가 입력한 모든 메시지가 즉시 표시됩니다.
소비자가 실행되는 동안 생산자가 메시지를 입력하면 실시간으로 콘솔에 출력되는 것을 볼 수 있습니다.

 

이런 식으로 Kafka는 지속적인 메시지 대기열처럼 작동하여 소비자가 아직 읽지 않은 메시지를 저장하고 소비자가 실행되는 동안 새 메시지가 오는 대로 전달합니다.

 

카프카에서는 consumer 그룹이라는 개념이 있는데 --consumer-property group.id=group-01 형식으로 consumer 그룹을 지정할 수 있습니다. 카프카 브로커는 컨슈머 그룹 중 하나의 컨슈머에게만 이벤트를 전달합니다. 동일한 이벤트 처리를 하는 컨슈머를 clustering 한 경우에 컨슈머 그룹으로 지정하면 클러스터링된 컨슈머 중 하나의 서버가 데이터를 수신합니다.

 

key와 value를 콘솔창에 표시하기 위해서는 --property print.key=true --property key.separator=: 를 설정합니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning \
   --property print.key=true --property key.separator=:

"key":"second key value"
"secone message":null
key3:third

 

Consumer Group

기존의 Message Queue 솔루션에서는 컨슈머가 메시지를 가져가면, 해당 메세지를 큐에서 삭제된다. 즉, 하나의 큐에 대하여 여러 컨슈머가 붙어서 같은 메세지를 컨슈밍할 수 없다. 하지만 Kafka는, 컨슈머가 메세지를 가져가도 큐에서 즉시 삭제되지 않으며, 하나의 토픽에 여러 컨슈머 그룹이 붙어 메세지를 가져갈 수 있다.

또한 각 consumer group마다 해당 topic의 partition에 대한 별도의 offset을 관리하고, group에 컨슈머가 추가/제거 될 때마다 rebalancing을 하여 group 내의 consumer에 partition을 할당하게 된다. 이는 컨슈머의 확장/축소를 용이하게 하고, 하나의 MQ(Message Queue)를 컨슈머 별로 다른 용도로 사용할 수 있는 확장성을 제공한다.

 

아래 명령어로 컨슈머 그룹을 지정합니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --group [그룹 이름]
  • 그룹에 속하는 컨슈머가 여러개이면 로드밸런싱을 통해 자동으로 메세지를 분배

2. 그룹 정보 확인

kafka-console-groups.sh --bootstrap-server localhost:9092 --describe --group [그룹 이름]
  • 위에 대한 명령어로 아래의 정보들을 얻을 수 있다.
    -
    • CURRENT-OFFSET : Consumer Group이 Kafka에서 읽은 offset
    • LOG-END-OFFSET : 해당 topic, partition의 마지막 offset
    • LAG : LOG-END-OFFSET 과 CURRENT-OFFSET의 차이
  • LAG의 경우 topic의 partition단위로 읽어야 할 남은 데이터 수를 의미한다

 

Offsets

오프셋 리셋

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [그룹 이름] --reset-offsets --to-earlist --execute --topic [토픽 이름]
  • 오프셋을 리셋하여 데이터를 다시 읽어드릴 때 사용
  • reset-offsets의 옵션으로 --to-earlist , --to-datetime, --from-file, --to-current, --shift-by 등이 있다
    --shift-by -2 를 하면 오프셋이 2칸 전으로 이동한다
  • execute의 옵션으로 --all-topics 와 --topic [그룹 이름] 이 있다

 

복제 테스트 : 브로커가 Offline 되면 어떤 일이?

이제 시스템에 Kafka 클러스터가 설정되었으므로 데이터 복제를 테스트해 보겠습니다.

실행한 3개의 브로커 중 첫 번째 브로커를 Control + C 키로 종료합니다. 

# 첫번째 브로커 중지

[2022-01-01 02:09:13,095] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-01-01 02:09:13,097] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
[2022-01-01 02:09:13,098] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 02:09:13,099] INFO [KafkaServer id=1] shut down completed (kafka.server.KafkaServer)


실행한 3개의 브로커 중 하나를 종료해도 클러스터가 클러스터가 제대로 실행되는지 아래 명령어를 실행합니다.
다른 터미널 창에서 다른 소비자를 시작해 보겠습니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic my-kafka-topic --from-beginning --group group2

real-time data
두번째 입력
awesome!!!
first insert

 

여기에 추가한 것은 이 소비자를 다른 소비자와 구별하는 그룹 옵션입니다. 명령을 실행하면 처음부터 입력된 모든 메시지가 표시됩니다. 동일한 소비자인 경우에는 이미 데이터를 읽었기 때문에 데이터가 보지이 않습니다.


중개인 중 하나가 중지되었지만 데이터는 손실되지 않았습니다. 이는 이전에 설정한 복제 계수(replication-factor)를 2로 설정하여 데이터 사본이 여러 브로커에 존재하도록 했기 때문입니다.

 

? 의문점 : 위에서 보는 것처럼 입력한 순서대로 출력되지 않았습니다. 한글때문인지 아니면 원래 순서가 없는 것인지 정확한 이유는 파악하지 못했습니다. 추후 확인이 필요한 사안입니다.

 

 

다른 브로커를 중단하면 어떻게 될까요? 

본 문서에서는 두번 째 브로커도 중지하고 테스트 해보겠습니다. 

이 경우에는 데이터 조회가 되지 않네요? 브로커를 3개로 구성한 경우 2개의 브로커가 장애가 발생한 경우 정상 작동하지 않습니다. 즉, 브로커의 과반수 이상에 장애가 발생하면 정상 작동하지 않습니다.

Kafka 생산자 및 소비자 구현

클러스터를 실행하고 나면 애플리케이션 코드에서 생산자와 소비자를 구현할 수 있습니다.
Golang 또는 Node.js에서 Kafka 생산자 및 소비자를 구현하는 방법은 링크된 문서를 참고하시면 됩니다.

 

<참고사이트>

https://www.sohamkamani.com/install-and-run-kafka-locally/

 

 

 

<참고사이

 

[ 참고 자료 ]

https://medium.com/@kiranps11/kafka-and-zookeeper-multinode-cluster-setup-3511aef4a505

https://www.instaclustr.com/blog/the-power-of-kafka-partitions-how-to-get-the-most-out-of-your-kafka-cluster/

 

1.3 Quick Start

STEP 1: GET KAFKA

최신 Kafka 릴리스를 다운로드하고 압축을 풉니다.

$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0

STEP 2: START THE KAFKA ENVIRONMENT

로컬 환경에 Java 8 이상이 설치되어 있어야 하고, zookeeper 가 설치 및 실행되고 있어야 합니다.
zookeeper가 설치되어 있을 경우, zookeeper 를 시작합니다.

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

다른 터미널 세션을 열고 kafka 서버를 실행합니다.

$ bin/kafka-server-start.sh config/server.properties

모든 서비스가 성공적으로 시작되면 기본 Kafka 환경이 실행되고 사용할 준비가 된 것입니다.

STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS

Kafka는 여러 시스템에서 이벤트(문서에서 레코드 또는 메시지라고도 함)를 읽고, 쓰고, 저장하고, 처리할 수 있는 분산 이벤트 스트리밍 플랫폼입니다.
이벤트의 예로는 결제 거래, 휴대폰의 지리적 위치 업데이트, 배송 주문, IoT 장치 또는 의료 장비의 센서 측정 등이 있습니다. 이러한 이벤트는 주제별로 구성 및 저장됩니다. 매우 단순화된 주제는 파일 시스템의 폴더와 유사하고 이벤트는 해당 폴더의 파일로 생각하면 이해가 쉽습니다.
이벤트를 작성하기 전에 topic을 작성해야 합니다. 다른 터미널 세션을 열고 다음을 실행하여 topic을 생성합니다.

$ bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092

Kafka의 모든 명령줄 도구에는 추가 옵션이 있습니다. 인수 없이 kafka-topics.sh 명령을 실행하여 사용 정보를 표시합니다. 예를 들어 topic의 파티션 수와 같은 세부 정보를 표시합니다.

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: 1ejyYwvJQF65aAk_5N-lgg PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: quickstart-events        Partition: 0    Leader: 3       Replicas: 3     Isr: 3

STEP 4: WRITE SOME EVENTS INTO THE TOPIC

Kafka 클라이언트는 이벤트 쓰기(또는 읽기)를 위해 네트워크를 통해 Kafka 브로커와 통신합니다. 일단 수신되면 브로커는 필요한 기간 동안(또는영원히) 내구성 및 내결함성 방식으로 이벤트를 저장합니다.
콘솔 생산자 클라이언트를 실행하여 주제에 이벤트를 작성합니다. 입력하는 각 행은 topic에 이벤트로 작성됩니다.

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

Ctrl-C를 사용하여 생산자 클라이언트를 중지할 수 있습니다.

STEP 5: READ THE EVENTS

다른 터미널 세션을 열고 콘솔 소비자 클라이언트를 실행하여 방금 생성한 이벤트를 읽습니다. 최초 수행 시 "internal topic __consumer_offsets" 이 자동 생성됩니다.

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl-C를 사용하여 언제든지 소비자 클라이언트를 중지할 수 있습니다.
이벤트는 Kafka에 영구적으로 저장되기 때문에 원하는 만큼 많은 소비자가 이벤트를 읽을 수 있습니다. 또 다른 터미널 세션을 열고 이전 명령을 다시 실행하여 이를 쉽게 확인할 수 있습니다.

 

Topics 삭제

이전버전 : 

이전 버전에서는 kafka server.properties파일에 설정을 하고 kafka broker를 재기동해야 topic을 삭제할 수 있습니다. 단, 이벤트를 읽고 있는 consumber는 에러(Error while fetching metadata with correlation)가 발생합니다.  

# config/server.properties파일
delete.topic.enable = true

server.properties에 delete.topic.enable=true 설정을 하고 kafka borker를 재기동 한 다음, kafka-topics.sh로 삭제를 해주면 됩니다. 

$  bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

최신버전 : (3.0 version)

KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다. 
브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)

elete.topic.enable 기본 설정은 true입니다.

$  bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

 

동적 파라미터 설정

STEP 6: IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT

관계형 데이터베이스 또는 기존 메시징 시스템과 같은 기존 시스템과 이미 이러한 시스템을 사용하는 많은 애플리케이션에 많은 데이터가 있을 수 있습니다. Kafka Connect를 사용하면 외부 시스템에서 Kafka로 또는 그 반대로 데이터를 지속적으로 수집할 수 있습니다. 이 프로세스를 더욱 쉽게 만들기 위해 수백 개의 커넥터를 사용할 수 있습니다.
Kafka에서 데이터를 지속적으로 가져오거나 내보내는 방법에 대해 자세히 알아보려면 Kafka Connect section 섹션을 살펴보세요.

STEP 7: PROCESS YOUR EVENTS WITH KAFKA STREAMS

데이터가 Kafka에 이벤트로 저장되면 Java/Scala용 Kafka Streams 클라이언트 라이브러리를 사용하여 데이터를 처리할 수 있습니다. 이를 통해 입력 및/또는 출력 데이터가 Kafka 주제에 저장되는 실시간 애플리케이션 및 마이크로서비스를 구현할 수 있습니다. Kafka Streams는 클라이언트 측에서 표준 Java 및 Scala 애플리케이션을 작성하고 배포하는 단순성과 Kafka의 서버 측 클러스터 기술의 이점을 결합하여 이러한 애플리케이션을 확장성, 탄력성, 내결함성 및 분산성을 높입니다. 라이브러리는 정확히 한 번 처리, 상태 저장 작업 및 집계, 윈도우, 조인, 이벤트 시간 기반 처리 등을 지원합니다.

Kafka Streams WordCount 알고리즘을 구현하는 방법은 다음과 같습니다.

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

상세 내용은 Kafka Streams demoapp development tutorial 을 참고합니다.

STEP 8: TERMINATE THE KAFKA ENVIRONMENT

  1. Ctrl-C를 사용하여 생산자 및 소비자 클라이언트 중지
  2. Ctrl-C를 사용하여 Kafka 브로커 중지.
  3. Ctrl-C를 사용하여 Kafka 브로커 중지

생성한 이벤트를 포함하여 로컬 Kafka 환경의 데이터도 삭제하려면 다음 명령을 실행합니다.

$ rm -rf /tmp/kafka-logs /tmp/zookeeper

 

APACHE KAFKA

Apache Kafka란 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위한

오픈 소스 분산 이벤트 스트리밍 플랫폼입니다.

CORE CAPABILITIES

  • HIGH THROUGHPUT
  • SCALABLE
  • PERMANENT STORAGE
  • HIGH AVAILABILITY

ECOSYSTEM

  • BUILT-IN STREAM PROCESSING
  • CONNECT TO ALMOST ANYTHING
  • CLIENT LIBRARIES
  • LARGE ECOSYSTEM OPEN SOURCE TOOLS

TRUST & EASE OF USE

  • MISSION CRITICAL
  • TRUSTED BY THOUSANDS OF ORGS
  • VAST USER COMMUNITY
  • RICH ONLINE RESOURCES

WHY KAFKA?

이벤트 스트리밍이란?

이벤트 스트리밍은 인체의 디지털 중추 신경계라고 할 수 있으며  '상시 가동' 세상을 위한 기반 기술입니다.
기술적으로 말하면 이벤트 스트리밍은 데이터베이스, 센서, 모바일 장치, 클라우드 서비스 및 소프트웨어 애플리케이션과 같은 이벤트 소스에서 이벤트 스트림의 형태로 실시간으로 데이터를 캡처하는 방식입니다.

 

나중에 검색할 수 있도록 이러한 이벤트 스트림을 영구적으로 저장합니다. 소급적으로나 실시간으로 이벤트 스트림을 조작, 처리 및 반응합니다. 필요에 따라 이벤트 스트림을 다른 목적지 기술로 라우팅하는 단계를 포함합니다. 이벤트 스트리밍은 올바른 정보가 적시에 적절한 위치에 있도록 데이터의 지속적인 흐름과 해석을 보장합니다.

 

이벤트 스트리밍은 무엇에 사용할 수 있습니까?

이벤트 스트리밍은 수많은 산업 및 조직의 다양한 사용 사례에 적용됩니다.

  • 증권 거래소, 은행 및 보험과 같은 실시간으로 지불 및 금융 거래를 처리합니다.
  • 물류 및 자동차 산업과 같이 자동차, 트럭, 차량 및 선적을 실시간으로 추적하고 모니터링합니다.
  • 공장 및 풍력 발전 단지와 같은 IoT 장치 또는 기타 장비의 센서 데이터를 지속적으로 캡처하고 분석합니다.
  • 소매, 호텔 및 여행 산업, 모바일 애플리케이션과 같은 고객 상호 작용 및 주문을 수집하고 즉시 대응합니다.
  • 병원에서 치료 중인 환자를 모니터링하고 상태 변화를 예측하여 응급 상황에서 시기 적절한 치료를 보장합니다.
  • 회사의 여러 부서에서 생성된 데이터를 연결, 저장 및 사용 가능하게 만드는 것.
  • 데이터 플랫폼, 이벤트 중심 아키텍처 및 마이크로서비스의 기반 역할을 합니다.

 

Apache Kafka 는 이벤트 스트리밍 플랫폼

Kafka는 다음 세 가지 주요 기능의 단일 솔루션으로 엔드투엔드 이벤트 스트리밍 사용 사례를 구현할 수 있습니다.

  1. 다른 시스템에서 데이터를 import/export를 포함하여 이벤트 스트림을 게시(쓰기) 및 구독(읽기)
  2. 이벤트 스트림을 원하는 만큼 내구성 있고 안정적으로 저장
  3. 이벤트 스트림을 발생 시 또는 소급하여 처리

이 모든 기능은 분산, 확장성, 탄력성, 내결함성이 있으며 안전한 방식으로 제공됩니다.

Kafka는 베어메탈 하드웨어, 가상 머신, 컨테이너, 온프레미스 및 클라우드에 배포할 수 있습니다.

Kafka 환경을 자가 관리하거나 다양한 공급업체에서 제공하는 완전 관리형 서비스를 사용할 수 있습니다.

 

Kafka는 어떻게 작동하나?

Kafka는 고성능 TCP 네트워크 프로토콜을 통해 통신하는 서버와 클라이언트로 구성된 분산 시스템입니다. 

베어메탈 하드웨어, 가상 머신, 온프레미스 및 클라우드 환경의 컨테이너에 배포할 수 있습니다.

서버:

Kafka는 여러 데이터 센터 또는 클라우드 region들에 있는 다수의 서버 클러스터로 실행됩니다.

이러한 서버 중 일부는 브로커라고 하는 스토리지 계층을 구성합니다.

다른 서버는 Kafka Connect를 실행하여  Kafka를 관계형 데이터베이스 및 기타 Kafka 클러스터와 같은 기존 시스템과 통합하기 위해, 데이터를 이벤트 스트림으로 지속적으로 가져오고 내보냅니다.

미션 크리티컬한 사용 사례를 구현할 수 있도록 Kafka 클러스터는 확장성이 뛰어나고 내결함성이 있습니다. 서버 중 하나에 장애가 발생하면 다른 서버가 작업을 인계받아 데이터 손실 없이 지속적인 운영을 보장합니다.

 

클라이언트: 

네트워크 문제나 기계 장애가 발생한 경우에도 이벤트 스트림을 병렬로 대규모로 내결함성 방식으로 읽고, 쓰고, 처리하는 분산 애플리케이션 및 마이크로서비스를 만들 수 있습니다.

클라이언트는 고수준 Kafka Streams 라이브러리를 포함하여 Java 및 Scala에서 사용할 수 있으며 Go, Python, C/C++ 및 기타 여러 프로그래밍 언어 및 REST API 에서 사용할 수 있습니다.

 

Kafka 주요 개념과 용어

이벤트는 세상 또는 비즈니스에서 "무언가 발생"했다는 사실을 기록합니다.

문서에서는 기록 또는 메시지라고도 합니다. Kafka에서 데이터를 읽거나 쓸 때 이벤트 형식으로 이 작업을 수행합니다.

개념적으로 이벤트에는 키, 값, 타임스탬프 및 선택적 메타데이터 헤더가 있습니다.

 

다음은 이벤트의 예입니다.

  • Event key: "Alice"
  • Event value: "Made a payment of $200 to Bob"
  • Event timestamp: "Jun. 25, 2020 at 2:06 p.m."

생산자(Producers)는

Kafka에 이벤트를 게시(쓰기)하는 클라이언트 응용 프로그램이고

소비자(consumers)는

이러한 이벤트를 구독(읽기 및 처리)하는 응용 프로그램입니다.

Kafka에서 생산자와 소비자는 완전히 분리되고 서로 알 수 없으며, 이는 Kafka가 높은 확장성을 달성하기 위한 핵심 설계 요소입니다. 예를 들어 생산자는 소비자를 기다릴 필요가 없습니다. Kafka는 이벤트를 정확히 한 번 처리하는 기능과 같은 다양한 보장성을 제공합니다.

 

이벤트는 주제(topics)로 구성되고 영구적으로 저장됩니다. 간단하게 주제는 파일 시스템의 폴더와 유사하고 이벤트는 해당 폴더의 파일입니다. 예로써 주제 이름은 "지불"일 수 있습니다.

Kafka의 토픽은 항상 다중 생성자 및 다중 구독자입니다. 주제에는 이벤트를 작성하는 0개, 1개 또는 많은 생성자와 이러한 이벤트를 구독하는 0개, 1개 또는 많은 소비자가 있을 수 있습니다. 주제의 이벤트는 필요한 만큼 자주 읽을 수 있습니다. 기존 메시징 시스템과 달리 이벤트는 소비 후 삭제되지 않습니다. 대신 주제별 구성 설정을 통해 Kafka가 이벤트를 유지해야 하는 기간을 정의할 수 있고, 이보다 오래된 이벤트가 삭제됩니다.

Kafka의 성능은 데이터 크기와 관련하여 사실상 일정하므로 데이터를 장기간 저장하는 것이 좋습니다.

 

주제는 분할(partitioned)되어 있으며, 이는 다른 Kafka 브로커에 있는 여러 "버킷(buckets)"에 분산되어 있음을 의미합니다.

데이터의 분산 배치는 클라이언트 애플리케이션이 동시에 많은 브로커에서 데이터를 읽고 쓸 수 있도록 하기 때문에 확장성에 매우 중요합니다. 새 이벤트가 주제에 게시되면 실제로 해당 주제의 파티션 중 하나에 추가됩니다.

동일한 이벤트 키(예: 고객 또는 차량 ID)가 있는 이벤트는 동일한 파티션에 기록되고 Kafka는 지정된 파티션의 모든 소비자가 항상 기록된 것과 정확히 동일한 순서로 해당 파티션의 이벤트를 읽도록 보장합니다.

Figure: 

이 예제 주제에는 4개의 파티션 P1~P4가 있습니다. 두 개의 서로 다른 생산자가 네트워크를 통해 서로 독립적으로 새 이벤트를 주제에 게시하고 있습니다. 동일한 키를 사용하는 이벤트(그림에서 색상으로 표시)는 동일한 파티션에 기록됩니다. 두 생산자 모두 동일한 파티션에 쓸 수도 있습니다.

데이터 내결함성 및 고가용성을 만들기 위해 모든 주제를 여러 지리적 지역이나 데이터 센터에 걸쳐 복제할 수 있습니다. 일이 잘못될 경우에 대비하여 데이터 복사본이 있는 여러 브로커가 있도록 합니다. 일반적인 프로덕션 설정은 복제 3팩터입니다. 즉, 항상 3개의 데이터 복사본이 있습니다. 이 복제는 주제 파티션 수준에서 수행됩니다.

 

We build for multiple versions of Scala

: 여러 버전의 Scala로 빌드되었다. 즉 Kafka는 Scala로 만들었다.

 

What is Scala?

스칼라는 함수형 객체지향 프로그래밍 언어 입니다. 스칼라는 자바의 복잡한 단점을 해결하기 위해 만들어 졌습니다. 스칼라는 자바 바이트 코드를 사용하기 때문에 JVM위에서 실행 시킬 수 있습니다. 또한 자바의 클래스들을 바로 사용할 수도 있고, 자바에서도 스칼라 코드들을 호출할 수 있습니다. 

 

 

 

+ Recent posts