메시지 키가 있을 때 메시지 키가 있을 경우에는 파티셔너의 종류와 관계없이 동일하게 동작합니다. 메시지 키의 해쉬값을 구해서 해당 해쉬값과 파티션을 매칭하여 적재합니다. 이로 인해 동일한 메시지 키를 가진 레코드는 동일한 파티션에 들어가게 됩니다. 다만 주의할 점은 파티션 개수가 늘어날 때 입니다. 파티션 개수가 늘어나면 메시지 키 해시값과 파티션 매칭이 틀어지게 되어 파티션 증가 전후로 동일한 메시지 키더라도 다른 파티션에 레코드가 들어갈 수 있으므로 주의해야 합니다.
UniformStickyPartitioner
메시지 키의 존재 유무에 따라 다르게 동작한다.
메시지 키가 있는 경우
메시지 키를 가진 레코드는 파티셔너에 의해서 특정한 해시값이 생성된다.
이 해시값을 기준으로 어느 파티션으로 들어갈지 정해진다.
토픽에 파티션이 2개가 있는 경우, 파티셔너의 해시로직에 의해서 키값에 따라 다른 파티션에 들어가게 된다.
동일한 메시지 키값을 가진 레코드는동일한 해시값을 만들기 때문에항상 동일한 파티션에 들어가는 것을 보장한다.
이로 인해,“순서를 지켜서 처리할 수 있다”는 장점이 있다.
파티션 한 개의 내부에서는 큐처럼 동작하기 때문에 순서를 지킬 수 있는 것이다.
UniformStickyPartitioner
유니폼 스티키 파티션은 2.4.0 부터 기본 설정으로 사용되는 파티셔너입니다. 이 파티셔너는 스티키 파티셔너라고도 부릅니다. 스티키 파티션은 라운드-로빈 파티셔너와 다르게 프로듀서 내부동작에 특화되어 있습니다. 특히 배치전송에 특화되어 있습니다.
프로듀서는 파티션에 데이터를 전송하기 전에 Accumulator에 데이터를 버퍼로 쌓아 놓고 발송합니다. 스티키 파티셔너를 사용할 경우 Accumulator의 버퍼를 채워서 보내기 때문에 성능향상에서 유리합니다.
KIP-480 implements a new partitioner, which chooses the sticky partition that changes when the batch is full if no partition or key is present. Using the sticky partitioner helps improve message batching, decrease latency, and reduce the load for the broker. Some of the benchmarks which Justine Olshan discusses on the KIP show up to a 50% reduction in latency and 5–15% reduction in CPU utilization.
RoundRobinPartitioner
메시지 키가 없을 경우, 라운드-로빈 방식으로 데이터가 들어오는대로 파티션을 순회하면서 레코드를 넣습니다. 파티션 개수가 늘어날때도 마찬가지로 순회하면서 지속적으로 데이터를 분배하면서 넣습니다.
기본 설정되는 파티션 어사이너입니다. 토픽의 파티션을 숫자기준으로 나열하고, 컨슈머의 이름을 사전순으로 나열한 뒤에 배정하는 정확히 반으로 나누어 배정합니다. 만약 딱 반으로 안나뉘어지는 홀수개의 파티션을 나눌 경우에는 앞쪽 순서의 컨슈머가 파티션을 더 많이 할당합니다.
예를들어 파티션이 3개인 토픽(p0, p1, p2)과 컨슈머가 2개(c0, c1)가 있다고 가정해 봅시다. 파티션 3개를 컨슈머에 분배하려면 2개와 1개를 배분해야 합니다. 앞쪽 순서의 컨슈머는 파티션을 2개 가지게 됩니다. 이로 인해 아래와 같이 매칭됩니다.
RoundRobinAssignor
라운드-로빈 어사이너는 파티션을 컨슈머에 번갈아가며 할당하는 방식입니다.
예를들어 파티션이 3개인 토픽(p0, p1, p2)과 컨슈머 2개(c0, c1)가 있다고 가정해 봅시다. 파티션 3개를 컨슈머에 분배하는데 파티션 순서를 번갈아 가면서 할당하게 되어 아래와 같이 매칭됩니다.
라운드-로빈 어사이너를 사용할 경우 특정 상황에서는 일부 컨슈머에 파티션이 몰릴 수도 있습니다. 예를들어 파티션이 1개인 토픽, 2개인 토픽, 3개인 토픽이 있고 3개의 컨슈머가 각각 토픽 1번, 토픽 2번, 토픽 3번을 매칭하면 아래와 같이 라운드-로빈 로직으로 인해 일부 컨슈머에 파티션이 몰릴 수 있으므로 주의해야 합니다.
StickyAssignor
스티키 어사이너는 두가지 목적으로 사용됩니다. 첫번째는 최대한 파티션을 균등하게 매칭하기 위함입니다. 균등하게 매칭이라고 뜻하는것은 아래와 같은 정책을 내포합니다.
- 컨슈머에 매칭된 파티션의 개수가 최대 1개 이상을 넘지 않도록 합니다. - each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.
두번째는 리밸런싱이 일어날 경우 최대한 파티션의 이동을 줄이는데 목적이 있습니다. 파티션 매칭 이동을 줄이면 리밸런싱 발생시 오버헤드를 줄일 수 있습니다.
스티키 어사이너를 사용할 경우 최대한 균등하게 파티션을 컨슈머에 나눕니다. 라운드로빈과 비슷하다고 생각할수도 있지만 실제로 동작은 그렇지 않습니다. 그리고 리밸런싱 발생시에 동작이 다름을 확인할 수 있습니다.
예를 들어 3개의 컨슈머가 있고 3개의 토픽(각각 1개, 2개, 3개 파티션 보유)이 있다고 가정해봅시다. 라운드 로빈 어사이너로 분배했을 경우에는 이전 예시와 같이 몰리는 현상이 발생할 수 있지만 스티키 파티셔너의 경우 다르게 동작하여 아래와 같이 매칭되는 것을 확인할 수 있습니다.
추가적으로 스티키 어사이너를 사용할 경우 이미할당된 파티션의 대부분이 남아있다는 특징을 리밸런스 리스너에서 활용할 수 있습니다. 관련 내용은 아래 링크에서 확인할 수 있습니다.
카프카(Kafka)에서는 다양한 언어로 데이터를 주고 받는 기능을 제공하는데 본 포스팅은 파이썬(Python)으로 구현하는 프로듀서(producer)/컨슈머(consumer) 즉 데이터를 보내고 받는 방법을 설명한다.
파이썬으로 카프카를 호출하는 방법이 대표적으로 2가지 방법이 존재하는 것 같다. 하나는 카프카를 만든 제이 크렙스(Jay Kreps)가 만든 회사인 confluent가 제공하는 라이브러리이고, 다른 하나는 kafka-python이라는 라이브러리를 사용하는 방법이다. 후자인 kafka-python을 범용적으로 많이 사용하는데 성능은 컨플루언트의 라이브러리가 더 좋다.
후자를 사용하는 이유는 단하나 confluent는 c로 만든 라이브러리를 호출하여 사용하는 방식이라 별도의 설치과정이 존재하기 때문이다. 즉 성능을 중시 여긴다면 c로 만든 confluent를 사용하면 될 것이고, 쉽게 사용하는 것을 목적으로 한다면 후자를 사용해도 상관이 없을 것이다.
kafka-pyhon 라이브러리 설치
pip install kafka-python
kafka-python 설치 과정
Producer 구현
라이브러리를 설치했으면 아래와 같은 코드를 작성하고 실행해본다.
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))
start = time.time()
for i in range(10000):
data = {'str' : 'result'+str(i)}
producer.send('test', value=data)
producer.flush()
print("elapsed :", time.time() - start)
위 코드는 json 형태로 data를 생성하는데 "result"라는 문자열에 + i값을 만번까지 증가시켜서 test라는 토픽으로 보내는 내용이다. 실행을 하게 되면 최종적으로는 만번 전달하는데 걸린 시간이 출력이 되는 것인데 필자는 다음과 같은 속도가 나왔다.
프로듀서 옵션
KafkaProducer를 선언할 때 acks가 0으로 확인 없이 전송하며, compression_type이 gzip으로 gzip형태로 압축하여 전송을 한다. 그리고 카프카 서버가 여러대일 경우 bootstrap_servers에 브로커 리스트를 배열 값으로 지정을 하면 되며 자세한 옵션 정보는 아래와 같다.
프로듀서 옵션 정보
Produce 결과
console로 확인하는 producer 실시간 전송내역들
C:\Users\user\anaconda3\python.exe D:/rainbow/application/producer.py
elapsed : 5.04697060585022
Process finished with exit code 0
만번 전송하는데 5.04초가 걸렸으니 초당 약 2천건 정도 보내졌다는 것을 알 수 있다.
Consumer 구현
이제 데이터를 전송했으면 받는 부분인 consumer를 구현해보도록 한다.
from kafka import KafkaConsumer
from json import loads
# topic, broker list
consumer = KafkaConsumer(
'test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')),
consumer_timeout_ms=1000
)
# consumer list를 가져온다
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
message.topic, message.partition, message.offset, message.key, message.value
))
print('[end] get consumer list')
Consumer를 선언할 때, test라는 topic을 기준으로 카프카 서버는 localhost를 지정하였다.
Consumer 옵션
옵션
설명
bootstrap_servers
카프카 클러스터들의 호스트와 포트 정보 리스트
auto_offset_reset
earliest : 가장 초기 오프셋값 latest : 가장 마지막 오프셋값 none : 이전 오프셋값을 찾지 못할 경우 에러
enable_auto_commit
주기적으로 offset을 auto commit
group_id
컨슈머 그룹을 식별하기 위한 용도
value_deserializer
producer에서 value를 serializer를 했기 때문에 사용
consumer_timeout_ms
이 설정을 넣지 않으면 데이터가 없어도 오랜기간 connection한 상태가 된다. 데이터가 없을 때 빠르게 종료시키려면 timeout 설정을 넣는다.
foreground로 실행되니 최종 "Creating new log file" 보이면 다음 단계를 진행합니다.
[2022-01-01 00:34:06,106] INFO ______ _ (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO |___ / | | (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO / / ___ ___ | | __ ___ ___ _ __ ___ _ __ (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO / / / _ \ / _ \ | |/ / / _ \ / _ \ | '_ \ / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO / /__ | (_) | | (_) | | < | __/ | __/ | |_) | | __/ | | (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO /_____| \___/ \___/ |_|\_\ \___| \___| | .__/ \___| |_| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO | | (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,110] INFO |_| (org.apache.zoo
..... .....
[2022-01-01 00:34:10,369] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
[2022-01-01 00:34:10,373] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
[2022-01-01 01:08:12,522] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2022-01-01 01:08:14,923] WARN fsync-ing the write ahead log in SyncThread:0 took 2389ms which will adversely effect operation latency.File size is 67108880 bytes. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog)
config/zookeeper.properties 파일을 보면 clientPort 속성이 2181로 설정된 것을 볼 수 있습니다. 이 속성은 Zookeeper 서버가 현재 수신 대기 중인 포트입니다.
# File: kafka_2.13-3.0.0/config/zookeeper.properties
# the port at which the clients will connect
clientPort=2181
Kafka Broker 실행하기
Kafka 브로커는 클러스터의 핵심이며 데이터가 저장되고 배포되는 파이프라인 역할을 합니다. Zookeeper를 시작한 방법과 유사하게,
브로커 서버를 시작하고(bin/kafka-server-start.sh) 구성(config/server.properties)하기 위한 두 개의 파일이 있습니다. kafka의 분산환경 구성을 해야하기 때문에 다이어그램과 같이 3개의 브로커를 시작하겠습니다. config/server.properties 파일을 열면 여러 구성 옵션이 표시됩니다(지금은 대부분 무시할 수 있음). 그러나 각 브로커 인스턴스에 대해 고유해야 하는 세 가지 속성이 있습니다.
File: kafka_2.13-3.0.0/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The address the socket server listens on. It will get the value returned from
listeners=PLAINTEXT://:9092
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
* 주키퍼 서버들 멀티로 구성한 경우에는 zookeeper.connect 에 나열해주면 됩니다.
주제의 파티션 및 복제본에 대한 세부 정보를 출력합니다. Partition, Leader/follower, Replicas, Isr(In Sync Replica) 정보를 보여줍니다. 여기서 ISR은 kafka 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 나타냅니다. 만일, 브로커 중 1대의 서버가 중지된 상태라면 Isr 은 2개 만 표시됩니다. 3번 브로커 서버가 중지되었다면 Leader는 1 또는 2가 되고 Isr 은 1,2 가 됩니다.
KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다. 브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=BROKER, name=''): Cannot update these configs dynamically: Set(delete.topic.enable)
2.
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
3.
Invalid config(s): delete.topic.enable
아직 동적 config 설정이 정상작동 하지 않는 것 같다. 패치버전이나 더 상위 버전이 나오는 다시 확인해 본다.
Zookeeper 쉘을 이용한 방법
* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.
zookeeper-shell.sh 를 이용하여서 znode를 삭제할 수 있습니다.
rmr명령은 더이상 사용되진 않는다. delete나 deleteall로 삭제합니다. delete 를 사용할 경우 message가 남아있는 경우 삭제가 되지 않습니다. 이럴 경우, deleteall로 삭제할 수 있습니다.
$ bin/zookeeper-shell.sh localhost:2181
ls /brokers/topics
## 실행결과
[__consumer_offsets, movie, music, my-kafka-topic]
rmr /brokers/topics/my-kafka-topic
## 실행결과. rmr은 사용할 수 없다.
Command not found: Command not found rmr
## The zkCli provides the rmr (deprecated) or deleteall command for this purpose
delete /brokers/topics/my-kafka-topic
## 실행결과
Node not empty: /brokers/topics/my-kafka-topic
deleteall /brokers/topics/my-kafka-topic
ls /brokers/topics
[__consumer_offsets, movie, music]
quit
## 삭제된 topic을 다시 생성해본다.
$ bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2
## 오류가 발생한다. topic을 kafka-topics.sh 로 조회하면 목록이 보인다.
## zookeeper와 kafka broker 간이 연동이 잘 안되는 것 같다.
Error while executing topic command : Topic 'my-kafka-topic' already exists
* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.
Producers: Topic에 메시지 보내기
"생산자(Producer)"는 데이터를 Kafka 클러스터에 넣는 프로세스입니다.
bin 디렉토리의 명령어는 콘솔에 텍스트를 입력할 때마다 클러스터에 데이터를 입력하는 콘솔 생성자(Producer)를 제공합니다.
이제 데이터를 입력하고 Enter 키를 치면, Kafka 클러스터에 텍스트를 입력할 수 있는 명령 프롬프트가 표시됩니다.
컨슈머가 데이터를 가져가도 Topic 데이터는 삭제되지 않습니다.
>first insert
>두번째 입력
프로듀서가 데이터를 보낼 때 '파티션키'를 지정해서 보낼 수 있습니다.
파티션키를 지정하지 않으면, 라운드로빈 방식으로 파티션에 저장하고,
파티션키를 지정하면, 파티셔너가 파티션키의 HASH 값을 구해서 특정 파티션에 할당합니다.
카프카에서 kafka-console-producer.sh로 Consumer에게 메세지를 보낼 때 기본적으로 key값이 null로 설정되어 있습니다. 이럴 때 설정에서 parse.key 및 key.separator 속성을 설정하면 key와 value가 포함된 메시지를 보낼 수 있습니다.
Kafka consumers 를 사용하여 클러스터에서 데이터를 읽을 수 있습니다. 이 경우 이전 섹션에서 생성한 데이터를 읽습니다. consumer를 시작하려면 다음 명령을 실행합니다. 아래와 같이 위에서 pruducer가 입력한 데이터가 출력됩니다.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning
first insert
두번째 입력
bootstrap-server는 클러스터의 브로커 중 하나일 수 있습니다.
Topic은 생산자가 클러스터에 데이터를 입력한 Topic(주제)과 동일해야 합니다.
from-beginning은 클러스터에 현재 가지고 있는 모든 메시지를 원한다고 클러스터에 알립니다.
컨슈머 그룹이 다른 새로운 컨슈머가 auto.offset.reset=earliest 설정으로 데이터를 0번부터 가져갈 수 있습니다. 설정하지 않으면 새롭게 토픽에 생성된 메세지만 읽어옵니다.
위의 명령을 실행하면 콘솔에 로그온한 생산자가 입력한 모든 메시지가 즉시 표시됩니다. 소비자가 실행되는 동안 생산자가 메시지를 입력하면 실시간으로 콘솔에 출력되는 것을 볼 수 있습니다.
이런 식으로 Kafka는 지속적인 메시지 대기열처럼 작동하여 소비자가 아직 읽지 않은 메시지를 저장하고 소비자가 실행되는 동안 새 메시지가 오는 대로 전달합니다.
카프카에서는 consumer 그룹이라는 개념이 있는데 --consumer-property group.id=group-01 형식으로 consumer 그룹을 지정할 수 있습니다. 카프카 브로커는 컨슈머 그룹 중 하나의 컨슈머에게만 이벤트를 전달합니다. 동일한 이벤트 처리를 하는 컨슈머를 clustering 한 경우에 컨슈머 그룹으로 지정하면 클러스터링된 컨슈머 중 하나의 서버가 데이터를 수신합니다.
key와 value를 콘솔창에 표시하기 위해서는 --property print.key=true --property key.separator=: 를 설정합니다.
기존의 Message Queue 솔루션에서는 컨슈머가 메시지를 가져가면, 해당 메세지를 큐에서 삭제된다. 즉, 하나의 큐에 대하여 여러 컨슈머가 붙어서 같은 메세지를 컨슈밍할 수 없다. 하지만 Kafka는, 컨슈머가 메세지를 가져가도 큐에서 즉시 삭제되지 않으며, 하나의 토픽에 여러 컨슈머 그룹이 붙어 메세지를 가져갈 수 있다.
또한 각 consumer group마다 해당 topic의 partition에 대한 별도의 offset을 관리하고, group에 컨슈머가 추가/제거 될 때마다 rebalancing을 하여 group 내의 consumer에 partition을 할당하게 된다. 이는 컨슈머의 확장/축소를 용이하게 하고, 하나의 MQ(Message Queue)를 컨슈머 별로 다른 용도로 사용할 수 있는 확장성을 제공한다.
reset-offsets의 옵션으로 --to-earlist , --to-datetime, --from-file, --to-current, --shift-by 등이 있다 - --shift-by -2 를 하면 오프셋이 2칸 전으로 이동한다
execute의 옵션으로 --all-topics 와 --topic [그룹 이름] 이 있다
복제 테스트 : 브로커가 Offline 되면 어떤 일이?
이제 시스템에 Kafka 클러스터가 설정되었으므로 데이터 복제를 테스트해 보겠습니다.
실행한 3개의 브로커 중 첫 번째 브로커를 Control + C 키로 종료합니다.
# 첫번째 브로커 중지
[2022-01-01 02:09:13,095] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-01-01 02:09:13,097] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
[2022-01-01 02:09:13,098] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 02:09:13,099] INFO [KafkaServer id=1] shut down completed (kafka.server.KafkaServer)
실행한 3개의 브로커 중 하나를 종료해도 클러스터가 클러스터가 제대로 실행되는지 아래 명령어를 실행합니다. 다른 터미널 창에서 다른 소비자를 시작해 보겠습니다.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic my-kafka-topic --from-beginning --group group2
real-time data
두번째 입력
awesome!!!
first insert
여기에 추가한 것은 이 소비자를 다른 소비자와 구별하는 그룹 옵션입니다. 명령을 실행하면 처음부터 입력된 모든 메시지가 표시됩니다. 동일한 소비자인 경우에는 이미 데이터를 읽었기 때문에 데이터가 보지이 않습니다.
중개인 중 하나가 중지되었지만 데이터는 손실되지 않았습니다. 이는 이전에 설정한 복제 계수(replication-factor)를 2로 설정하여 데이터 사본이 여러 브로커에 존재하도록 했기 때문입니다.
? 의문점 : 위에서 보는 것처럼 입력한 순서대로 출력되지 않았습니다. 한글때문인지 아니면 원래 순서가 없는 것인지 정확한 이유는 파악하지 못했습니다. 추후 확인이 필요한 사안입니다.
다른 브로커를 중단하면 어떻게 될까요?
본 문서에서는 두번 째 브로커도 중지하고 테스트 해보겠습니다.
이 경우에는 데이터 조회가 되지 않네요? 브로커를 3개로 구성한 경우 2개의 브로커가 장애가 발생한 경우 정상 작동하지 않습니다. 즉, 브로커의 과반수 이상에 장애가 발생하면 정상 작동하지 않습니다.
Kafka 생산자 및 소비자 구현
클러스터를 실행하고 나면 애플리케이션 코드에서 생산자와 소비자를 구현할 수 있습니다. Golang 또는 Node.js에서 Kafka 생산자 및 소비자를 구현하는 방법은 링크된 문서를 참고하시면 됩니다.
로컬 환경에 Java 8 이상이 설치되어 있어야 하고, zookeeper 가 설치 및 실행되고 있어야 합니다. zookeeper가 설치되어 있을 경우, zookeeper 를 시작합니다.
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Kafka는 여러 시스템에서 이벤트(문서에서 레코드 또는 메시지라고도 함)를 읽고, 쓰고, 저장하고, 처리할 수 있는 분산 이벤트 스트리밍 플랫폼입니다. 이벤트의 예로는 결제 거래, 휴대폰의 지리적 위치 업데이트, 배송 주문, IoT 장치 또는 의료 장비의 센서 측정 등이 있습니다. 이러한 이벤트는 주제별로 구성 및 저장됩니다. 매우 단순화된 주제는 파일 시스템의 폴더와 유사하고 이벤트는 해당 폴더의 파일로 생각하면 이해가 쉽습니다. 이벤트를 작성하기 전에 topic을 작성해야 합니다. 다른 터미널 세션을 열고 다음을 실행하여 topic을 생성합니다.
Kafka 클라이언트는 이벤트 쓰기(또는 읽기)를 위해 네트워크를 통해 Kafka 브로커와 통신합니다. 일단 수신되면 브로커는 필요한 기간 동안(또는영원히) 내구성 및 내결함성 방식으로 이벤트를 저장합니다. 콘솔 생산자 클라이언트를 실행하여 주제에 이벤트를 작성합니다. 입력하는 각 행은 topic에 이벤트로 작성됩니다.
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event
다른 터미널 세션을 열고 콘솔 소비자 클라이언트를 실행하여 방금 생성한 이벤트를 읽습니다. 최초 수행 시 "internal topic __consumer_offsets" 이 자동 생성됩니다.
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
Ctrl-C를 사용하여 언제든지 소비자 클라이언트를 중지할 수 있습니다. 이벤트는 Kafka에 영구적으로 저장되기 때문에 원하는 만큼 많은 소비자가 이벤트를 읽을 수 있습니다. 또 다른 터미널 세션을 열고 이전 명령을 다시 실행하여 이를 쉽게 확인할 수 있습니다.
Topics 삭제
이전버전 :
이전 버전에서는 kafka server.properties파일에 설정을 하고 kafka broker를 재기동해야 topic을 삭제할 수 있습니다. 단, 이벤트를 읽고 있는 consumber는 에러(Error while fetching metadata with correlation)가 발생합니다.
KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다. 브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)
관계형 데이터베이스 또는 기존 메시징 시스템과 같은 기존 시스템과 이미 이러한 시스템을 사용하는 많은 애플리케이션에 많은 데이터가 있을 수 있습니다. Kafka Connect를 사용하면 외부 시스템에서 Kafka로 또는 그 반대로 데이터를 지속적으로 수집할 수 있습니다. 이 프로세스를 더욱 쉽게 만들기 위해 수백 개의 커넥터를 사용할 수 있습니다. Kafka에서 데이터를 지속적으로 가져오거나 내보내는 방법에 대해 자세히 알아보려면 Kafka Connect section 섹션을 살펴보세요.
데이터가 Kafka에 이벤트로 저장되면 Java/Scala용 Kafka Streams 클라이언트 라이브러리를 사용하여 데이터를 처리할 수 있습니다. 이를 통해 입력 및/또는 출력 데이터가 Kafka 주제에 저장되는 실시간 애플리케이션 및 마이크로서비스를 구현할 수 있습니다. Kafka Streams는 클라이언트 측에서 표준 Java 및 Scala 애플리케이션을 작성하고 배포하는 단순성과 Kafka의 서버 측 클러스터 기술의 이점을 결합하여 이러한 애플리케이션을 확장성, 탄력성, 내결함성 및 분산성을 높입니다. 라이브러리는 정확히 한 번 처리, 상태 저장 작업 및 집계, 윈도우, 조인, 이벤트 시간 기반 처리 등을 지원합니다.
이벤트 스트리밍은 인체의 디지털 중추 신경계라고 할 수 있으며 '상시 가동' 세상을 위한 기반 기술입니다. 기술적으로 말하면 이벤트 스트리밍은 데이터베이스, 센서, 모바일 장치, 클라우드 서비스 및 소프트웨어 애플리케이션과 같은 이벤트 소스에서 이벤트 스트림의 형태로 실시간으로 데이터를 캡처하는 방식입니다.
나중에 검색할 수 있도록 이러한 이벤트 스트림을 영구적으로 저장합니다. 소급적으로나 실시간으로 이벤트 스트림을 조작, 처리 및 반응합니다. 필요에 따라 이벤트 스트림을 다른 목적지 기술로 라우팅하는 단계를 포함합니다. 이벤트 스트리밍은 올바른 정보가 적시에 적절한 위치에 있도록 데이터의 지속적인 흐름과 해석을 보장합니다.
이벤트 스트리밍은 무엇에 사용할 수 있습니까?
이벤트 스트리밍은 수많은 산업 및 조직의 다양한 사용 사례에 적용됩니다.
증권 거래소, 은행 및 보험과 같은 실시간으로 지불 및 금융 거래를 처리합니다.
물류 및 자동차 산업과 같이 자동차, 트럭, 차량 및 선적을 실시간으로 추적하고 모니터링합니다.
공장 및 풍력 발전 단지와 같은 IoT 장치 또는 기타 장비의 센서 데이터를 지속적으로 캡처하고 분석합니다.
소매, 호텔 및 여행 산업, 모바일 애플리케이션과 같은 고객 상호 작용 및 주문을 수집하고 즉시 대응합니다.
병원에서 치료 중인 환자를 모니터링하고 상태 변화를 예측하여 응급 상황에서 시기 적절한 치료를 보장합니다.
문서에서는 기록 또는 메시지라고도 합니다. Kafka에서 데이터를 읽거나 쓸 때 이벤트 형식으로 이 작업을 수행합니다.
개념적으로 이벤트에는 키, 값, 타임스탬프 및 선택적 메타데이터 헤더가 있습니다.
다음은 이벤트의 예입니다.
Event key: "Alice"
Event value: "Made a payment of $200 to Bob"
Event timestamp: "Jun. 25, 2020 at 2:06 p.m."
생산자(Producers)는
Kafka에 이벤트를 게시(쓰기)하는 클라이언트 응용 프로그램이고
소비자(consumers)는
이러한 이벤트를 구독(읽기 및 처리)하는 응용 프로그램입니다.
Kafka에서 생산자와 소비자는 완전히 분리되고 서로 알 수 없으며, 이는 Kafka가 높은 확장성을 달성하기 위한 핵심 설계 요소입니다. 예를 들어 생산자는 소비자를 기다릴 필요가 없습니다. Kafka는 이벤트를 정확히 한 번 처리하는 기능과 같은 다양한 보장성을 제공합니다.
이벤트는 주제(topics)로 구성되고 영구적으로 저장됩니다. 간단하게 주제는 파일 시스템의 폴더와 유사하고 이벤트는 해당 폴더의 파일입니다. 예로써 주제 이름은 "지불"일 수 있습니다.
Kafka의 토픽은 항상 다중 생성자 및 다중 구독자입니다. 주제에는 이벤트를 작성하는 0개, 1개 또는 많은 생성자와 이러한 이벤트를 구독하는 0개, 1개 또는 많은 소비자가 있을 수 있습니다. 주제의 이벤트는 필요한 만큼 자주 읽을 수 있습니다. 기존 메시징 시스템과 달리 이벤트는 소비 후 삭제되지 않습니다. 대신 주제별 구성 설정을 통해 Kafka가 이벤트를 유지해야 하는 기간을 정의할 수 있고, 이보다 오래된 이벤트가 삭제됩니다.
Kafka의 성능은 데이터 크기와 관련하여 사실상 일정하므로 데이터를 장기간 저장하는 것이 좋습니다.
주제는 분할(partitioned)되어 있으며, 이는 다른 Kafka 브로커에 있는 여러 "버킷(buckets)"에 분산되어 있음을 의미합니다.
데이터의 분산 배치는 클라이언트 애플리케이션이 동시에 많은 브로커에서 데이터를 읽고 쓸 수 있도록 하기 때문에 확장성에 매우 중요합니다. 새 이벤트가 주제에 게시되면 실제로 해당 주제의 파티션 중 하나에 추가됩니다.
동일한 이벤트 키(예: 고객 또는 차량 ID)가 있는 이벤트는 동일한 파티션에 기록되고 Kafka는 지정된 파티션의 모든 소비자가 항상 기록된 것과 정확히 동일한 순서로 해당 파티션의 이벤트를 읽도록 보장합니다.
Figure:
이 예제 주제에는 4개의 파티션 P1~P4가 있습니다. 두 개의 서로 다른 생산자가 네트워크를 통해 서로 독립적으로 새 이벤트를 주제에 게시하고 있습니다. 동일한 키를 사용하는 이벤트(그림에서 색상으로 표시)는 동일한 파티션에 기록됩니다. 두 생산자 모두 동일한 파티션에 쓸 수도 있습니다.
데이터 내결함성 및 고가용성을 만들기 위해 모든 주제를 여러 지리적 지역이나 데이터 센터에 걸쳐 복제할 수 있습니다. 일이 잘못될 경우에 대비하여 데이터 복사본이 있는 여러 브로커가 있도록 합니다. 일반적인 프로덕션 설정은 복제 3팩터입니다. 즉, 항상 3개의 데이터 복사본이 있습니다. 이 복제는 주제 파티션 수준에서 수행됩니다.
We build for multiple versions of Scala
: 여러 버전의 Scala로 빌드되었다. 즉 Kafka는 Scala로 만들었다.
What is Scala?
스칼라는 함수형 객체지향 프로그래밍 언어 입니다. 스칼라는 자바의 복잡한 단점을 해결하기 위해 만들어 졌습니다. 스칼라는 자바 바이트 코드를 사용하기 때문에 JVM위에서 실행 시킬 수 있습니다. 또한 자바의 클래스들을 바로 사용할 수도 있고, 자바에서도 스칼라 코드들을 호출할 수 있습니다.