1.3 Quick Start

STEP 1: GET KAFKA

최신 Kafka 릴리스를 다운로드하고 압축을 풉니다.

$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0

STEP 2: START THE KAFKA ENVIRONMENT

로컬 환경에 Java 8 이상이 설치되어 있어야 하고, zookeeper 가 설치 및 실행되고 있어야 합니다.
zookeeper가 설치되어 있을 경우, zookeeper 를 시작합니다.

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

다른 터미널 세션을 열고 kafka 서버를 실행합니다.

$ bin/kafka-server-start.sh config/server.properties

모든 서비스가 성공적으로 시작되면 기본 Kafka 환경이 실행되고 사용할 준비가 된 것입니다.

STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS

Kafka는 여러 시스템에서 이벤트(문서에서 레코드 또는 메시지라고도 함)를 읽고, 쓰고, 저장하고, 처리할 수 있는 분산 이벤트 스트리밍 플랫폼입니다.
이벤트의 예로는 결제 거래, 휴대폰의 지리적 위치 업데이트, 배송 주문, IoT 장치 또는 의료 장비의 센서 측정 등이 있습니다. 이러한 이벤트는 주제별로 구성 및 저장됩니다. 매우 단순화된 주제는 파일 시스템의 폴더와 유사하고 이벤트는 해당 폴더의 파일로 생각하면 이해가 쉽습니다.
이벤트를 작성하기 전에 topic을 작성해야 합니다. 다른 터미널 세션을 열고 다음을 실행하여 topic을 생성합니다.

$ bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092

Kafka의 모든 명령줄 도구에는 추가 옵션이 있습니다. 인수 없이 kafka-topics.sh 명령을 실행하여 사용 정보를 표시합니다. 예를 들어 topic의 파티션 수와 같은 세부 정보를 표시합니다.

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: 1ejyYwvJQF65aAk_5N-lgg PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: quickstart-events        Partition: 0    Leader: 3       Replicas: 3     Isr: 3

STEP 4: WRITE SOME EVENTS INTO THE TOPIC

Kafka 클라이언트는 이벤트 쓰기(또는 읽기)를 위해 네트워크를 통해 Kafka 브로커와 통신합니다. 일단 수신되면 브로커는 필요한 기간 동안(또는영원히) 내구성 및 내결함성 방식으로 이벤트를 저장합니다.
콘솔 생산자 클라이언트를 실행하여 주제에 이벤트를 작성합니다. 입력하는 각 행은 topic에 이벤트로 작성됩니다.

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

Ctrl-C를 사용하여 생산자 클라이언트를 중지할 수 있습니다.

STEP 5: READ THE EVENTS

다른 터미널 세션을 열고 콘솔 소비자 클라이언트를 실행하여 방금 생성한 이벤트를 읽습니다. 최초 수행 시 "internal topic __consumer_offsets" 이 자동 생성됩니다.

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl-C를 사용하여 언제든지 소비자 클라이언트를 중지할 수 있습니다.
이벤트는 Kafka에 영구적으로 저장되기 때문에 원하는 만큼 많은 소비자가 이벤트를 읽을 수 있습니다. 또 다른 터미널 세션을 열고 이전 명령을 다시 실행하여 이를 쉽게 확인할 수 있습니다.

 

Topics 삭제

이전버전 : 

이전 버전에서는 kafka server.properties파일에 설정을 하고 kafka broker를 재기동해야 topic을 삭제할 수 있습니다. 단, 이벤트를 읽고 있는 consumber는 에러(Error while fetching metadata with correlation)가 발생합니다.  

# config/server.properties파일
delete.topic.enable = true

server.properties에 delete.topic.enable=true 설정을 하고 kafka borker를 재기동 한 다음, kafka-topics.sh로 삭제를 해주면 됩니다. 

$  bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

최신버전 : (3.0 version)

KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다. 
브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)

elete.topic.enable 기본 설정은 true입니다.

$  bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

 

동적 파라미터 설정

STEP 6: IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT

관계형 데이터베이스 또는 기존 메시징 시스템과 같은 기존 시스템과 이미 이러한 시스템을 사용하는 많은 애플리케이션에 많은 데이터가 있을 수 있습니다. Kafka Connect를 사용하면 외부 시스템에서 Kafka로 또는 그 반대로 데이터를 지속적으로 수집할 수 있습니다. 이 프로세스를 더욱 쉽게 만들기 위해 수백 개의 커넥터를 사용할 수 있습니다.
Kafka에서 데이터를 지속적으로 가져오거나 내보내는 방법에 대해 자세히 알아보려면 Kafka Connect section 섹션을 살펴보세요.

STEP 7: PROCESS YOUR EVENTS WITH KAFKA STREAMS

데이터가 Kafka에 이벤트로 저장되면 Java/Scala용 Kafka Streams 클라이언트 라이브러리를 사용하여 데이터를 처리할 수 있습니다. 이를 통해 입력 및/또는 출력 데이터가 Kafka 주제에 저장되는 실시간 애플리케이션 및 마이크로서비스를 구현할 수 있습니다. Kafka Streams는 클라이언트 측에서 표준 Java 및 Scala 애플리케이션을 작성하고 배포하는 단순성과 Kafka의 서버 측 클러스터 기술의 이점을 결합하여 이러한 애플리케이션을 확장성, 탄력성, 내결함성 및 분산성을 높입니다. 라이브러리는 정확히 한 번 처리, 상태 저장 작업 및 집계, 윈도우, 조인, 이벤트 시간 기반 처리 등을 지원합니다.

Kafka Streams WordCount 알고리즘을 구현하는 방법은 다음과 같습니다.

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

상세 내용은 Kafka Streams demoapp development tutorial 을 참고합니다.

STEP 8: TERMINATE THE KAFKA ENVIRONMENT

  1. Ctrl-C를 사용하여 생산자 및 소비자 클라이언트 중지
  2. Ctrl-C를 사용하여 Kafka 브로커 중지.
  3. Ctrl-C를 사용하여 Kafka 브로커 중지

생성한 이벤트를 포함하여 로컬 Kafka 환경의 데이터도 삭제하려면 다음 명령을 실행합니다.

$ rm -rf /tmp/kafka-logs /tmp/zookeeper

 

+ Recent posts