requirement

- zookeeper

- apache kafka

- apache connect

 

standalone mode

파일 소스 커넥트

FileSource 커넥터는 파일에서 데이터를 읽고 이를 Apache Kafka®로 보냅니다. 모든 커넥터에 공통적인 구성 외에 file 과 topic 을 속성으로 사용합니다. 다음은 구성의 예입니다. 파일이름을 local-file-soruce.properties 로 저장합니다.

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test
 

source file connector를 생성합니다.

$ bin/connect-standalone.sh config/connect-standalone.properties local-file-source.properties

 

이 커넥터는 하나의 파일만 읽고 해당 파일 내의 데이터를 Kafka broker로 보냅니다. 그런 다음 추가된 데이터에 대해서만 파일을 감시합니다. 이미 Kafka로 전송된 파일 라인의 수정 사항은 처리되지 않습니다.

test.txt 파일에 임의의 데이터를 저장합니다. 아래는text.txt 파일 내용 예시입니다.

first message
seconde line
세번째 메시지

 

파일에 있는 데이터를 조회하기 위해 /kafka-console-consumer.sh 를 실행합니다.

bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic connect-test --from-beginning

## Result
{"schema":{"type":"string","optional":false},"payload":"first message"}
{"schema":{"type":"string","optional":false},"payload":"seconde line"}
{"schema":{"type":"string","optional":false},"payload":"세번째 메시지"}

test.txt 파일에 데이터를 추가해 봅니다. consumer 화면에 메시지가 출력되는 것을 확인할 수 있습니다.

 

파일싱크 커넥터

FileSink 커넥터는 Kafka에서 데이터를 읽고 로컬 파일로 출력합니다. 다른 싱크 커넥터와 마찬가지로 여러 항목을 지정할 수 있습니다. FileSink 커넥터는 모든 커넥터에 공통적인 구성 외에 속성 file만 사용합니다 . 다음은 구성의 예입니다.

파일이름을 local-file-sink.properties 로 저장합니다.

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=connect-test

 

파일에 데이터를 저장하기 위해 sink connector를 실행한 후 test.sink.txt 파일을 열어보면 test.txt 파일의 내용이 저장된 것을 확인할 수 있습니다.

$ bin/connect-standalone.sh config/connect-standalone.properties local-file-sink.properties

## Result
$ cat /temp/test.sink.txt
first message
seconde line
세번째 메시지

 

standalone mode에서 리스너 port는 default가 8083입니다. file-source와 file-sink connector를 동시에 실행하면 port 충돌이 발생합니다. connect-standalone.properties 파일을 복사하여 새로운 onnect-standalone_new.properties 파일을 생성한 후 listeners=HTTP://:18083 처럼 다른 port를 사용하도록 설정합니다.

 

 

 

분산 모드

분산 모드에서 Kafka 커넥터는 각 이벤트 스레드가 처리하는 작업 수의 균형을 자동으로 조정합니다. 사용자는 작업을 동적으로 늘리거나 줄일 수 있으며 작업 실행, 구성 수정 및 오프셋 제출 시 내결함성이 보장될 수 있습니다.

분산 모드에서 Kafka 커넥터는 오프셋, 구성 및 작업 상태를 Kafka Topic에 저장합니다(독립 실행형 모드에서는 로컬 파일에 보관됨). 필요에 따라 테마의 파티션 및 복사본 수를 설정할 수 있도록 오프셋을 저장할 테마를 수동으로 생성하는 것이 좋습니다.

분산 모드에서 Kafka 커넥터의 구성 파일은 명령줄을 사용할 수 없습니다.

REST API는 Kafka broker를 생성, 수정 및 제거하는 데 사용합니다.

 

분산 connect 생성

$ bin/connect-distributed.sh config/connect-distributed.properties

...........
[2022-02-04 01:08:12,549] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:303)
[2022-02-04 01:08:12,549] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2022-02-04 01:08:12,789] INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1716)

 

'Apache Kafka' 카테고리의 다른 글

kafka connector - nginx access log 연결  (0) 2022.01.23
kafkacat  (0) 2022.01.23
[Kafka Connect] Connector Rest API  (0) 2022.01.23
카프카 미러메이커2  (0) 2022.01.14
카프카 ISR(In-Sync-Replicas)  (0) 2022.01.14

구성환경

  • WSL2(Ubuntu 20.04.3 LTS)
  • zookeeper
  • kafka broker
  • kafka connect

 

nginx 설치

## install nginx
sudo atp intall nginx

## start nginx
sudo service nginx start

config 파일 : /etc/nginx/nginx.conf

access log 파일 : /var/log/nginx/access.log

 

source properties 설정

nginx-accesslog.properties 파일 내용

name=nginx-accesslog-source
connector.class=FileStreamSource
tasks.max=1
file=/var/log/nginx/access.log
topic=nginx-accesslog

connector 생성 및 worker 기동

timestamp+incrementing mode의 worker를 실행합니다.

bin/connect-standalone.sh config/connect-standalone.properties nginx-accesslog.properties

consumer 메시지 수신 확인

bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic nginx-accesslog

access log 데이터 추가

브라우저로 http://localhost 에 접속합니다.

consumer 추가 메시지 확인

consumer 창에서 accesslog 메시지가 출력되는지 확인합니다.

sink porperties 설정

name=nginx-accesslog-sink
connector.class=FileStreamSink
tasks.max=1
file=/var/log/nginx/access.log.sink
topics=nginx-accesslog

'Apache Kafka' 카테고리의 다른 글

Kafka Connect FileStream Connectors  (0) 2022.01.27
kafkacat  (0) 2022.01.23
[Kafka Connect] Connector Rest API  (0) 2022.01.23
카프카 미러메이커2  (0) 2022.01.14
카프카 ISR(In-Sync-Replicas)  (0) 2022.01.14

kcat(이전의 kafkacat)은 Apache Kafka® 배포를 테스트 및 디버그하는 데 사용할 수 있는 명령줄 유틸리티입니다.

kcat을 사용하여 Kafka topic에 대해 produce, consume 및 list 할 수 있습니다. "netcat for Kafka"로 설명되는 이 도구는 Kafka에서 데이터를 검사하고 생성하기 위한 도구입니다.

 

kafkacat 설치

 sudo apt install kafkacat

 

Consumer Mode

Consumer 모드에서 kcat은 주제 및 파티션에서 메시지를 읽고 출력합니다.

Kafka 브로커( -b)와 topic( -t)를 지정해야 합니다. 

kcat에 브로커( -b) 및 topic( -t)에 대한 내용을 볼 수 있습니다.

 kafkacat -b localhost:9092 -t mysql_users
% Auto-selecting Consumer mode (use -P or -C to override)
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
[...]
 
 

Producer Mode

In producer mode, kcat reads messages from standard input (stdin). You must specify a Kafka broker (-b) and topic (-t). You can optionally specify a delimiter (-D). The default delimiter is newline.

You can easily send data to a topic using kcat. Run it with the -P command and enter the data you want, and then press Ctrl-D to finish:

Producer 모드에서 kcat은 표준 입력(stdin)에서 메시지를 읽습니다. Kafka 브로커( -b)와 주제( -t)를 지정해야 합니다.

kcat을 사용하여 주제에 데이터를 보낼 수 있습니다. -P 명령 으로 실행하고원하는 데이터를 입력한 다음 Ctrl-D 를 눌러 완료합니다.

kafkacat -b localhost:9092 -t new_topic -P

test
 

 

Metadata Listing Mode

메타데이터 목록 모드( -L)에서 kcat은 Kafka 클러스터와 주제, 파티션, 복제본 및 동기화 복제본(ISR)의 현재 상태를 표시합니다.

kafkacat -b localhost:9092 -L
 

JSON( -J) 옵션을 추가하여 출력을 JSON으로 내보냅니다.

kafkacat -b mybroker -L -J
 

kcat example code

kcat의 Hello World 예제는 kcat: Apache Kafka®  예제를 참조하세요. 모든 예에는 온프레미스 또는 Confluent Cloud에서 실행되는 모든 Kafka 클러스터에 연결할 수 있는 생산자 및 소비자가 포함됩니다.

 

 

'Apache Kafka' 카테고리의 다른 글

Kafka Connect FileStream Connectors  (0) 2022.01.27
kafka connector - nginx access log 연결  (0) 2022.01.23
[Kafka Connect] Connector Rest API  (0) 2022.01.23
카프카 미러메이커2  (0) 2022.01.14
카프카 ISR(In-Sync-Replicas)  (0) 2022.01.14

Connector

connector 목록 조회

curl -X GET "http://localhost:8083/connectors/"

connector 상세 정보 조회

curl -X GET "http://localhost:8083/connectors?expand=status&expand=info"

connector config 조회

  • GET 으로도 동일하게 동작함
curl -X PUT "http://localhost:8083/connectors/{connector_name}/config

특정 connector 상태 조회

curl -X GET "http://localhost:8083/connectors/{connector_name}/status"

connector 재시작

  • ※ task는 재시작되지 않음
curl -X POST "http://localhost:8083/connectors/{connector_name}/restart"

connector 일시중지 (pause)

  • 비동기 방식이므로 상태 조회시 바로 PAUSE 를 리턴하지 않을 수 있음
curl -X PUT "http://localhost:8083/connectors/{connector_name}/pause"

connector resume

  • pause 상태인 connector 를 resume시킨다.
  • 비동기 방식이므로 상태 조회시 바로 RUNNING을 리턴하지 않을 수 있음
curl -X PUT "http://localhost:8083/connectors/{connector_name}/resume"

 

connector 삭제

curl -X DELETE "http://localhost:8083/connectors/{connector_name}

Task

connector의 task 목록 조회

curl -X GET "http://localhost:8083/connectors/{connector_name}/tasks"

connector 의 task 상태 조회

curl -X GET "http://localhost:8083/connectors/{connector_name}/tasks/{task_id}/status"

connector 의 task 재시작

  • ※ connector 가 RUNNING, task 가 FAIL 일 경우 사용
curl -X POST "http://localhost:8083/connectors/{connector_name}/tasks/{task_id}/restart"

Topic

connector topic 조회

curl -X GET "http://localhost:8083/connectors/{connector_name}/topics"

connector topic reset

curl -X PUT "http://localhost:8083/connectors/{connector_name}/topics/reset"

Connector Plugin

Kafka Connector Cluster 에 설치된 모든 plugin 목록 조회

curl -X GET "http://localhost:8083/connector-plugins"

Kafka Connector plgin validate

  • ※ plugin 종류에 따라 필수 field가 다를 수 있음
  • ex) FileStreamSinkConnector 로 test-topic 에 대해 validate 할 경우
echo '{"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "test-topic"}' | curl -X PUT -d -@ "http://localhost:8083/connector-plugins/FileStreamSinkConnector/config/validate

 

 

 

reference : 공식 Document

 

원문 : https://velog.io/@anjinwoong/Kafka-Connect-Kafka-Connect-자주-사용하는-명령어API-정리

'Apache Kafka' 카테고리의 다른 글

kafka connector - nginx access log 연결  (0) 2022.01.23
kafkacat  (0) 2022.01.23
카프카 미러메이커2  (0) 2022.01.14
카프카 ISR(In-Sync-Replicas)  (0) 2022.01.14
ksql - 아파치 카프카 용어 - 수정  (0) 2022.01.13

개요

MirrorMaker 2(MM2)는 Apache Kafka 2.4.0 이상에 포함된 도구로, 여러 Apache Kafka 클러스터를 동기화 상태로 유지하고 클러스터 간의 팬인 및 팬아웃 메시지에 사용할 수 있습니다. 다음은 MM2의 고급 기능 중 일부입니다.

  • Kafka Connect 프레임워크 및 에코시스템을 활용합니다.
  • 소스 및 싱크 커넥터를 모두 포함합니다.
  • 전용 클러스터에서 커넥터를 관리하는 고급 드라이버를 포함합니다.
  • 새로운 주제, 파티션을 감지합니다.
  • 클러스터 간에 주제 구성을 자동으로 동기화합니다.
  • 다운스트림 주제 ACL을 관리합니다.
  • "active/active" 클러스터 구성과 active 클러스터를 원하는 수만큼 지원합니다.
  • 데이터 센터 간 복제, 집계 및 기타 복잡한 토폴로지를 지원합니다.
  • 여러 데이터 센터/클러스터에 대한 종단 간 복제 지연 시간을 포함한 새로운 메트릭을 제공합니다.
  • 클러스터 간에 소비자를 마이그레이션하는 데 필요한 오프셋을 내보냅니다.
  • 오프셋 번역을 위한 도구.
  • MirrorMaker 호환 레거시 모드.

'Apache Kafka' 카테고리의 다른 글

kafkacat  (0) 2022.01.23
[Kafka Connect] Connector Rest API  (0) 2022.01.23
카프카 ISR(In-Sync-Replicas)  (0) 2022.01.14
ksql - 아파치 카프카 용어 - 수정  (0) 2022.01.13
카프카 Connect  (0) 2022.01.12

ISR(In-Sync-Replicas)이란?

ISR은 kafka 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 말하는데, 리더 파티션과 팔로워 파티션에 대해 간단하게 알아보도록 하자.

 

리더 파티션과 팔로워 파티션

Kafka에서는 복제 계수라는 것이 존재하는데, 이는 Hadoop에서 사용하는 복제 계수와 비슷한 개념으로 데이터의 유실을 방지 하기 위해 사용되는 개념이다.

복제 계수가 2인 kafka topic은 리더 파티션 1개와 팔로워 파티션이 1개 생성이 되며, 데이터를 전달받았을 때 각각 아래의 역할을 수행한다.

리더 파티션

리더 파티션은 producer로부터 데이터를 전달받아 저장하는 작업을 진행

팔로워 파티션

복제 계수를 맞추기 위해 리더 파티션으로부터 데이터를 전달받아 동기화

 

그림을 통해 리더파티션, 팔로워 파티션, ISR 상태에 대해서 알아보자.

먼저 아래의 그림은 producer에서 kafka로 데이터를 보내고 리더 파티션이 데이터를 팔로워 파티션으로 복제하는 그림을 나타낸 것이다. 리더 파티션에는 4번째 데이터까지 존재하고, 팔로워 파티션에는 3번까지의 데이터가 존재하는 그림으로 복제 하는 과정에서 딜레이가 발생할 수 있으며, 항상 같은 데이터를 가지고 있는 것이 아니다.

다음으로 ISR 상태에 대해서 알아보자

위의 그림과 다르게 리더 파티션의 데이터와 팔로워 파티션에 있는 데이터가 모두 동일한 것을 알 수 있는데 이를 ISR 상태라고 한다.

 

ISR 관련 설정

그렇다면 ISR을 이용해 뭘 할 수 있을까?

리더 파티션에 문제가 생기는 경우 새로운 리더를 선출해야하는데 ISR상태의 파티션이 리더가 될 것인지, 아닌 파티션도 리더 파티션이 될 수 있는지에 대한 옵션을 설정할 수 있다. kafka document의 내용을 살펴보자.

unclean.leader.election.enable 옵션을 false로 설정하는 경우 ISR이 아닌 팔로워 파티션은 리더가 되지 않으며, ISR이 전혀 이루어지지 않은 경우에는 리더 파티션이 포함된 브로커가 다시 정상 동작을 할 때까지 기다리게 되며, 서비스에는 영향이 생길 수 있으나, 데이터의 유실은 발생하지 않을 것이다.

 

반대로 true로 설정하는 경우 ISR이 아닌 팔로워 파티션도 리더가 될 수 있는데, 이 경우 데이터는 일부 유실될 수 있지만 서비스에는 영향을 미치지 않게 된다.

데이터 유실과 서비스 영향도를 잘 고려하여 해당 옵션을 설정하고 사용해야 할 것이다.

 

원문 : https://dydwnsekd.tistory.com/67

'Apache Kafka' 카테고리의 다른 글

[Kafka Connect] Connector Rest API  (0) 2022.01.23
카프카 미러메이커2  (0) 2022.01.14
ksql - 아파치 카프카 용어 - 수정  (0) 2022.01.13
카프카 Connect  (0) 2022.01.12
AWS MSK  (0) 2022.01.11

아파치 카프카 입문서

ksqlDB는 Apache Kafka®에서 스트림 처리를 위해 특별히 구축된 데이터베이스입니다. 이 섹션에서는 ksqlDB를 효과적으로 사용하기 위해 필요한 최소한의 Kafka 개념에 대해 설명합니다. 자세한 내용은 공식 Apache Kafka 설명서 를 참조하십시오 .

Record

Kafka에서 데이터의 기본 단위는 이벤트입니다. 이벤트는 특정 시점에 세계에서 일어난 일을 모델링합니다. Kafka에서는 레코드라고 하는 데이터 구성을 사용하여 각 이벤트를 나타냅니다. 레코드에는 키, 값, 타임스탬프, 주제, 파티션, 오프셋 및 헤더와 같은 몇 가지 다른 종류의 데이터가 들어 있습니다.

레코드의 키는 이벤트의 ID를 나타내는 임의의 데이터입니다. 이벤트가 웹 페이지의 클릭인 경우 키는 클릭을 수행한 사용자의 ID일 수 있습니다.

값은 기본 데이터를 나타내는 데이터의 부분입니다. 클릭 이벤트의 값에는 이벤트가 발생한 페이지, 클릭된 DOM 요소 및 기타 정보가 포함될 수 있습니다.

타임 스탬프는 이벤트가 일어 났을 때입니다. 추적할 수 있는 몇 가지 다른 "종류" 시간이 있습니다. 

주제 와 파티션이 특정 이벤트에 속하고 큰 수집 및 이벤트의 하위 집합을 설명 오프셋 (아래 그에 더) 그 큰 컬렉션을 내 정확한 위치를 설명합니다.

마지막으로 헤더는 레코드에 대한 임의의 사용자 제공 메타데이터를 전달합니다.

ksqlDB는 이러한 정보 중 일부를 추상화하므로 이에 대해 생각할 필요가 없습니다. 다른 것들은 직접 노출되며 프로그래밍 모델의 필수적인 부분입니다. 예를 들어 ksqlDB에서 데이터의 기본 단위는 행 입니다. 행은 Kafka 레코드에 대한 유용한 추상화입니다. 행에는 키 열과 값 열의 두 가지 열이 있습니다. 그들은 또한 timestamp.

일반적으로 ksqlDB는 상위 수준 프로그래밍 모델에 기여하지 않는 Kafka 수준 구현 세부 정보를 올리는 것을 피합니다.

주제

주제는 명명된 레코드 모음입니다. 그들의 목적은 상호 관심사의 이벤트를 함께 개최할 수 있도록 하는 것입니다. 일련의 클릭 기록은 "클릭" 주제에 저장되어 한 곳에서 모두 액세스할 수 있습니다. 주제는 추가 전용입니다. 주제에 레코드를 추가하면 개별적으로 변경하거나 삭제할 수 없습니다.

어떤 종류의 레코드를 주제에 넣을 수 있는지에 대한 규칙은 없습니다. 그들은 같은 구조를 따를 필요가 없고, 같은 상황과 관련되거나 이와 유사한 것이 필요하지 않습니다. 주제에 대한 게시를 관리하는 방법은 전적으로 사용자 규칙 및 시행의 문제입니다.

ksqlDB는 스트림 과 테이블을 통해 주제에 대해 더 높은 수준의 추상화를 제공합니다 . 스트림 또는 테이블은 스키마를 Kafka 주제와 연결합니다. 스키마는 주제에 저장할 수 있는 레코드의 모양을 제어합니다. 이러한 종류의 정적 입력은 주제에 어떤 종류의 행이 있는지 이해하기 쉽게 하고 일반적으로 이를 처리하는 프로그램에서 실수를 줄이는 데 도움이 됩니다.

파티션

레코드가 주제에 배치되면 특정 파티션에 배치됩니다. 파티션은 오프셋별로 완전히 정렬된 레코드 시퀀스입니다. 주제에는 스토리지 및 처리의 확장성을 높이기 위해 여러 파티션이 있을 수 있습니다. 주제를 생성할 때 얼마나 많은 파티션이 있는지 선택합니다.

주제에 레코드를 추가할 때 파티션 전략은 레코드가 저장될 파티션을 선택합니다. 많은 파티션 전략이 있습니다. 가장 일반적인 방법은 전체 파티션 수에 대해 레코드 키의 내용을 해시하는 것입니다. 이는 동일한 ID를 가진 모든 레코드를 동일한 파티션에 배치하는 효과가 있으며, 이는 강력한 순서 보장 때문에 유용합니다.

레코드의 순서는 레코드가 추가될 때 설정되는 오프셋이라는 데이터에 의해 추적됩니다. 오프셋이 10인 레코드는 오프셋이 20인 동일한 파티션의 레코드보다 먼저 발생했습니다 .

여기에서 대부분의 역학은 사용자를 대신하여 ksqlDB에서 자동으로 처리됩니다. 스트림 또는 테이블을 생성할 때 확장성을 제어할 수 있도록 기본 주제에 대한 파티션 수를 선택합니다. 스키마를 선언할 때 키의 일부인 열과 값의 일부인 열을 선택합니다. 이 외에도 개별 파티션이나 오프셋에 대해 생각할 필요가 없습니다. 여기에 몇 가지 예가 있습니다.

레코드가 처리되면 키 내용이 해시되어 새 다운스트림 파티션이 동일한 키를 가진 다른 모든 레코드와 일치하게 됩니다. 레코드가 추가되면 오류나 오류가 있는 경우에도 올바른 오프셋 순서를 따릅니다. 쿼리가 행을 처리하려는 방식으로 인해 스트림의 키 콘텐츠가 변경되면( GROUP BY또는 를 통해 PARTITION BY) 기본 레코드 키가 다시 계산되고 레코드가 계산을 수행하기 위해 설정된 새 주제의 새 파티션으로 전송됩니다.

생산자와 소비자

생산자와 소비자는 주제로 또는 주제에서 레코드의 이동을 용이하게 합니다. 애플리케이션이 레코드를 게시하거나 구독하려고 할 때 API(일반적으로 클라이언트 라고 함 )를 호출하여 그렇게 합니다. 클라이언트는 구조화된 네트워크 프로토콜을 통해 브로커(아래 참조)와 통신합니다.

소비자는 주제에서 레코드를 읽을 때 절대 삭제하거나 변경하지 않습니다. 동일한 정보를 반복적으로 읽을 수 있는 이 패턴은 충돌하지 않는 방식으로 동일한 데이터 세트에 대해 여러 애플리케이션을 구축하는 데 도움이 됩니다. 또한 애플리케이션이 이벤트 스트림을 되감고 이전 정보를 다시 읽을 수 있는 "재생"을 지원하기 위한 기본 빌딩 블록입니다.

생산자와 소비자는 상당히 낮은 수준의 API를 노출합니다. 고유한 레코드를 구성하고, 스키마를 관리하고, 직렬화를 구성하고, 어디로 보내는지 처리해야 합니다.

ksqlDB는 상위 수준의 지속적인 생산자 및 소비자 역할을 합니다. 레코드의 모양을 선언한 다음 데이터를 채우고, 변경하고, 쿼리하는 방법을 설명하는 고급 SQL 명령을 실행하기만 하면 됩니다. 이러한 SQL 프로그램은 세부사항을 처리하는 저수준 클라이언트 API 호출로 변환됩니다.

브로커

브로커는 주제에 대한 액세스를 저장하고 관리하는 서버입니다. 여러 브로커가 함께 클러스터링되어 고가용성 내결함성 방식으로 주제를 복제할 수 있습니다. 클라이언트는 브로커와 통신하여 레코드를 읽고 씁니다.

ksqlDB 서버 또는 클러스터를 실행할 때 각 노드는 Kafka 브로커와 통신하여 처리를 수행합니다. Kafka 브로커의 관점에서 각 ksqlDB 서버는 클라이언트와 같습니다. 브로커에서 처리가 수행되지 않습니다. ksqlDB의 서버는 자체 노드에서 모든 계산을 수행합니다.

직렬 변환기

모든 문제에 완벽하게 맞는 데이터 형식은 없기 때문에 Kafka는 레코드의 키 및 값 부분에 있는 데이터 내용에 대해 불가지론적으로 설계되었습니다. 레코드가 클라이언트에서 브로커로 이동할 때 사용자 페이로드(키 및 값)는 바이트 배열로 변환되어야 합니다. 이를 통해 Kafka는 불투명한 일련의 바이트가 무엇인지 알 필요 없이 작업할 수 있습니다. 레코드가 소비자에게 전달될 때 해당 바이트 배열은 응용 프로그램에 의미 있는 원래 주제로 다시 변환되어야 합니다. 바이트 표현으로 변환하거나 변환하는 프로세스를 각각 직렬화 및 역직렬화 라고 합니다.

생산자가 주제에 레코드를 보낼 때 키와 값을 바이트 배열로 변환하는 데 사용할 직렬 변환기를 결정해야 합니다. 키 및 값 직렬 변환기는 독립적으로 선택됩니다. 소비자가 레코드를 받으면 바이트 배열을 원래 값으로 다시 변환하는 데 사용할 역직렬 변환기를 결정해야 합니다. 직렬 변환기와 역직렬 변환기는 쌍으로 제공됩니다. 다른 디시리얼라이저를 사용하면 바이트 내용을 이해할 수 없습니다.

ksqlDB는 직렬화의 추상화를 상당히 높입니다. 직렬 변환기를 수동으로 구성하는 대신 스트림/테이블 생성 시 구성 옵션을 사용하여 형식을 선언합니다. 어떤 주제가 어떤 방식으로 직렬화되었는지 추적하는 대신 ksqlDB는 각 스트림 및 테이블의 바이트 표현에 대한 메타데이터를 유지 관리합니다. 소비자는 올바른 deserializer를 사용하도록 자동으로 구성됩니다.

스키마

Kafka로 직렬화된 레코드는 불투명 바이트이지만 레코드를 처리할 수 있도록 하려면 구조에 대한 몇 가지 규칙이 있어야 합니다. 이 구조의 한 측면은 데이터의 모양과 필드를 정의하는 데이터 스키마입니다. 정수인가요? foo, bar, 및 키가 있는 맵 baz입니까? 다른 것?

시행 메커니즘이 없으면 스키마가 암시적입니다. 소비자는 어떻게든 생산된 데이터의 형태를 알아야 합니다. 종종 이것은 한 그룹의 사람들이 스키마에 구두로 동의하도록 함으로써 발생합니다. 그러나 이 접근 방식은 오류가 발생하기 쉽습니다. 스키마를 중앙에서 관리하고 감사하며 프로그래밍 방식으로 시행할 수 있는 경우가 더 좋습니다.

Kafka 외부 프로젝트인 Confluent Schema Registry 는 스키마 관리를 돕습니다. 스키마 레지스트리를 사용하면 생산자가 주제를 스키마에 등록할 수 있으므로 추가 데이터가 생성될 때 스키마를 준수하지 않으면 거부됩니다. 소비자는 스키마 레지스트리를 참조하여 모르는 주제에 대한 스키마를 찾을 수 있습니다.

생산자, 소비자 및 스키마 구성을 하나로 묶는 대신 ksqlDB는 스키마 레지스트리와 투명하게 통합됩니다. 두 시스템이 서로 통신할 수 있도록 구성 옵션을 활성화하여 ksqlDB는 모든 스트림 및 테이블 스키마를 스키마 레지스트리에 저장합니다. 그런 다음 이러한 스키마를 다운로드하여 ksqlDB 데이터로 작업하는 모든 애플리케이션에서 사용할 수 있습니다. 또한 ksqlDB는 기존 주제의 스키마를 자동으로 유추할 수 있으므로 스트림이나 테이블을 정의할 때 구조를 선언할 필요가 없습니다.

소비자 그룹

소비자 프로그램이 부팅되면 여러 소비자가 들어갈 수 있는 소비자 그룹에 등록됩니다 . 레코드를 사용할 수 있을 때마다 그룹의 정확히 한 소비자가 레코드를 읽습니다. 이는 일련의 프로세스가 레코드 소비를 조정하고 로드 밸런싱할 수 있는 방법을 효과적으로 제공합니다.

단일 주제의 레코드는 그룹의 한 프로세스에서 사용하도록 되어 있으므로 구독의 각 파티션은 한 번에 한 소비자만 읽습니다. 각 소비자가 담당하는 파티션 수는 총 소스 파티션 수를 소비자 수로 나눈 값으로 정의됩니다. 소비자가 그룹에 동적으로 가입하면 소유권이 다시 계산되고 파티션이 다시 할당됩니다. 소비자가 그룹을 떠나면 동일한 계산이 수행됩니다.

ksqlDB는 이 강력한 로드 밸런싱 프리미티브를 기반으로 합니다. ksqlDB 서버 클러스터에 퍼시스턴트 쿼리를 배포하면 소스 파티션 수에 따라 클러스터 전체에 워크로드가 분산된다. 이 모든 것이 자동으로 수행되기 때문에 그룹 구성원 자격을 명시적으로 관리할 필요가 없습니다.

예를 들어 소스 파티션이 10개인 영구 쿼리를 노드가 2개인 ksqlDB 클러스터에 배포하는 경우 각 노드는 5개의 파티션을 처리합니다. 서버를 잃어버리면 남은 유일한 서버가 자동으로 재조정되어 10개를 모두 처리합니다. 이제 4개의 서버를 더 추가하면 각각이 2개의 파티션을 처리하도록 재조정됩니다.

보유 및 압축

일정 기간 후에 오래된 레코드를 정리하는 것이 종종 바람직합니다. 보존과 압축은 이를 위한 두 가지 다른 옵션입니다. 둘 다 선택 사항이며 함께 사용할 수 있습니다.

보존은 레코드가 삭제되기 전에 저장되는 기간을 정의합니다. 보존은 주제의 레코드를 삭제하는 유일한 방법 중 하나입니다. 이 매개변수는 이벤트 스트림을 재생할 수 있는 시간 범위를 정의하기 때문에 스트림 처리에서 특히 중요합니다. 재생은 버그를 수정하거나, 새 애플리케이션을 빌드하거나, 일부 기존 논리를 백테스트하는 경우에 유용합니다.

ksqlDB를 사용하면 기본 스트림 및 테이블의 기본 주제 보존을 직접 제어할 수 있으므로 개념을 이해하는 것이 중요합니다. 자세한 내용 은 Kafka 문서의 주제 및 로그를 참조하십시오 .

대조적으로 압축은 키당 최신 레코드를 제외한 모든 레코드를 주기적으로 삭제하는 각 Kafka 브로커의 백그라운드에서 실행되는 프로세스입니다. 이는 선택적인 옵트인 프로세스입니다. 압축은 레코드가 상태 부분에 대한 일종의 업데이트를 나타내고 최신 업데이트가 결국 중요한 유일한 업데이트일 때 특히 유용합니다.

ksqlDB는 구체화된 테이블을 지원하는 기본 변경 로그를 지원하기 위해 압축을 직접 활용합니다. 이를 통해 ksqlDB는 장애 조치 시 테이블을 재구축하는 데 필요한 최소한의 정보를 저장할 수 있습니다. 자세한 내용 은 Kafka 문서의 로그 압축을 참조하세요 .

'Apache Kafka' 카테고리의 다른 글

카프카 미러메이커2  (0) 2022.01.14
카프카 ISR(In-Sync-Replicas)  (0) 2022.01.14
카프카 Connect  (0) 2022.01.12
AWS MSK  (0) 2022.01.11
주키퍼 CLI  (0) 2022.01.09

Kafka Connect

Kafka Connect는 Apache Kafka®와 다른 데이터 시스템 간에 데이터를 스트리밍하기 위한 도구입니다. Kafka Connect는 데이터베이스 데이터를 수집하거나 모든 애플리케이션 서버에서 Kafka topic 데이터를 수집하여 스트림 처리할 수 있도록 합니다. 내보내기 커넥터는 Kafka topic의 데이터를 Elasticsearch나 Hadoop시스템 등으로 전달할 수 있습니다.

kafka connect는 connector와 task로 구성되어 있습니다.

Kafka Connect 작동 방식

Kafka Connect에는 두 가지 유형의 커넥터가 포함되어 있습니다.

  • 소스 커넥터 – 데이터베이스를 수집하고 Kafka topic에 대한 테이블 업데이트를 스트리밍합니다. 또한 소스 커넥터는 모든 애플리케이션 서버에서 메트릭을 수집하고 이를 Kafka topic에 저장할 수 있습니다.
  • 싱크 커넥터 – Kafka topic의 데이터를 Elasticsearch나 Hadoop과 같은 시스템으로 전달합니다.

사용자 환경에 Kafka Connect를 배포하려면 Kafka Connect 사용 방법 - 시작하기 를 참조하십시오 .

 

독립 실행형 모드와 분산형 모드

커넥터는 프로세스로 실행됩니다. 이를 worker라고 합니다. worker를 실행하는 데는 독립형모드와 분산형모드가 있습니다.

Kafka Connect worker는 공유 시스템에서 실행할 수 있는 JVM 프로세스입니다. 

 

독립 실행형 모드 는 로컬 시스템에서 Kafka Connect를 개발하고 테스트하는 데 유용합니다. 일반적으로 단일 에이전트를 사용하는 환경(예: 웹 서버 로그를 Kafka로 전송)에도 사용할 수 있습니다.

독립 실행형 worker 상태는 로컬 파일 시스템에 저장됩니다.

 

분산형 모드 는 여러 시스템(노드)에서 Connect worker를 실행합니다. Connect 클러스터를 구성합니다. Kafka Connect는 클러스터 전체에 커넥터를 배포합니다. 필요에 따라 노드를 추가하거나 제거할 수 있습니다.

노드가 예기치 않게 클러스터에서 제거되는 경우 Kafka Connect는 해당 노드의 작업을 클러스터의 다른 노드에 자동으로 배포합니다. Kafka Connect는 복제되는 Kafka 클러스터 내부에 커넥터 구성, 상태 및 오프셋 정보를 저장하기 때문에 Connect worker를 실행하는 노드가 손실되더라도 데이터가 손실되지 않습니다.

분산형 worker는 모든 상태를 Kafka에 저장합니다.

 

 

Kafka와 mariaDB 연결하기

1. database 및 user 생성

mariadb 는 설치가 되었다는 가정에서 설명합니다. mariDB 설치는 설치문서를 참고하세요.

kafka connector와 연동할 database, user 그리고 table을 생성합니다.

$ mysql -u root -p

-- database 생성
MariaDB [mysql]> create database kafkadb;

-- user 생성
MariaDB [mysql]> CREATE USER 'kafka'@'%' IDENTIFIED BY 'kafka123';

-- 생성된 계정에 데이터베이스 ALL 권한 부여
MariaDB [mysql]> GRANT ALL PRIVILEGES ON kafkadb.* TO 'kafka'@'%' IDENTIFIED BY 'kafka123';

-- 권한 적용
MariaDB [mysql]> FLUSH PRIVILEGES;

 

kafka connector에서 사용할 table을 생성합니다.

$ mysql -u kafka -p
-- For timestamp+incrementing TEST
MariaDB [mysql]> CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY,
  created DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ;

-- For timestamp TEST
MariaDB [mysql]> CREATE TABLE products (
  seq INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  created DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ;

-- For incrementing TEST
MariaDB [mysql]> CREATE TABLE orders (
  seq INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  product VARCHAR(255) NOT NULL	
) ;

-- For bulk TEST
MariaDB [mysql]> create table bulk_tab(
  name VARCHAR(255) ,
  log VARCHAR(255)	
) ;

 

 

2. connector 생성 및 실행

2.1 독립 실행형 worker 실행

다음은 독립 실행형 모드에서 worker를 실행하는 구문입니다.

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties connector3.properties ...]
 

두 번째 매개변수 connector1.properties 에 mariaDB에 대한 접속 정보를 기술합니다. 여러 개의 connector를 생성할 수 있습니다. 여러 개의 connector을 생성하려면 아래와 같이 connector.properties 파일을 나열하면 됩니다.

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties connector2.properties connector3.properties ...

 

 

1) mysql DB connector1.properties 예제

imestamp+incrementing mode의 connector를 생성하는 properties 파일 내용입니다.

name=mysql-time-incre-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://localhost:3306/kafkadb?user=kafka&password=kafka123
table.whitelist=customers
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
topic.prefix=mysql-

 

파라미터 설명

  • name=커넥터의 고유한 이름입니다.
  • connector.class : 커넥터의 자바 클래스명(mysql은 io.debezium.connector.mysql.MySqlConnector)
  • connection.url : jdbc:mysql://IP_Address:port/dbname?user=username&password=user_password
  • table.whitelist : connector에서 사용할 테이블을 기술
  • mode : timestamp와 incrementing 모드. 즉, 컬럼의 변경 기준을 timestamp로 할지 incrementing을 할지를 설정
  • timestamp.column.name : timestamp나 datetime 의 컬럼 기술
  • incrementing.column.name : auto_increment로 지정된 컬럼 기술. auto_increment 로 지정되지 않아도 증가하는 컬럼을 기술할 수 있음
  • topic_prefix : kafka 에 생성할 topic의 접두어 

 

1.1 worker 실행

timestamp+incrementing mode의 worker를 실행합니다.

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties
 

* timestamp+incrementing  mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.

여기에서 물음표(?) 는 kafka 내부에서 관리하는 offset입니다.

첫번째 물음표는 서버의 현재시간을 나타내고 두번째 물음표는 offset을 나타냅니다.

incrementing 데이터를 입력할 경우 modified 날짜가 현재시간보다는 작고 offset 날짜와는 같아야 합니다.

조건이 까다로우므로 timestamp+incrementing  modes는 조심해서 적용해야 합니다. timestamp와 incrementing 은 각각 적용하는 것이 관리가 용이합니다.

 Begin using SQL query: SELECT * FROM `kafkadb`.`customers` 
 WHERE `kafkadb`.`customers`.`modified` < ? 
 AND ((`kafkadb`.`customers`.`modified` = ? AND `kafkadb`.`customers`.`id` > ?) OR `kafkadb`.`customers`.`modified` > ?) 
 ORDER BY `kafkadb`.`customers`.`modified`,`kafkadb`.`customers`.`id` ASC

 

1.2 consumer 실행

위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.

위의 예제에서 topic.prefix=mysql- 이고 테이블은 customers 이므로  mysql-customers 라는 이름으로 topic이 생성됩니다. 

아래는 mysql-customers topic를 조회하는 예제입니다. mariaDB에 새로운 데이터를 입력하거나 modified 컬럼이 수정되면 consumer에 메시지를 보냅니다.

bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic mysql-customers

## Result
{
... ...
  "payload": {
    "id": 1,
    "name": "jyp",
    "email": "ypjeong@kafka.com",
    "created": 1642725853000,
    "modified": 1642725853000
  }
}

 

1.3 데이터 입력

mariaDB에 접속하여 아래와 같이 테이블에 데이터를 입력합니다. 데이터 입력 시 consumer 창에 데이터가 출력되는 것을 확인할 수 있습니다.

INSERT INTO customers(name, email) values('jyp','ypjeong@kafka.com');
INSERT INTO customers(name, email) values('jyh','yhjeon@kafka.com');
INSERT INTO customers(name, email) values('kky','kykim@kafka.com');

 

 

2) mysql DB connector2.properties 예제

imestamp mode의 connector를 생성하는 properties 파일 내용입니다.

name=mysql-time-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://localhost:3306/kafkadb?user=kafka&password=kafka123
table.whitelist=products
mode=timestamp
timestamp.column.name=modified
topic.prefix=mysql-

 

2.1 worker 실행

timestamp mode의 worker를 실행합니다.

bin/connect-standalone.sh config/connect-standalone.properties connector2.properties

* timestamp  mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.

여기에서 물음표(?) 는 kafka 내부에서 관리하는 offset입니다.

Found offset {{table=products}=null, {protocol=1, table=kafkadb.products}={timestamp_nanos=0, timestamp=1642904785000}} for partition {protocol=1, table=kafkadb.products}

첫번째 물음표는 offset을 나타내고 두번째 물음표는 서버의 현재시간을 나타냅니다.

만일, timestamp mode가 적용된 컬럼에 현재시간보다 큰 날짜값이 들어온다면 그 시간 차이만큼 데이터는 전송되지 않습니다. 물론, offset 날짜보다 작은 값으로 udpate하면 데이터를 가져올 수 없습니다. 

SELECT * FROM `kafkadb`.`products` 
WHERE `kafkadb`.`products`.`modified` > ?  -- offset 날짜
AND   `kafkadb`.`products`.`modified` < ?  -- kafka connector 현재 날짜
ORDER BY `kafkadb`.`products`.`modified` ASC
 

2.2 consumer 실행

위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.

위의 예제에서 topic.prefix=mysql- 이고 테이블은 products 이므로  mysql-porducts 라는 이름으로 topic이 생성됩니다. 

아래는 mysql-porducts topic을 조회하는 예제입니다. mariaDB products 테이블의 modified 컬럼이 수정되면 consumer에 메시지를 보냅니다.

bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic mysql-products

 

2.3 데이터 입력

mariaDB에 접속하여 아래와 같이 테이블에 데이터를 입력합니다. 데이터 입력 시 consumer 창에 데이터가 출력되는 것을 확인할 수 있습니다.

INSERT INTO products(name) values('computer');
INSERT INTO products(name) values('mouse');

 

3) mysql DB connector3.properties 예제

incrementing mode의 connector를 생성하는 properties 파일 내용입니다.

name=mysql-incre-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://localhost:3306/kafkadb?user=kafka&password=kafka123
table.whitelist=orders
mode=incrementing
incrementing.column.name=seq
topic.prefix=mysql-

 

3.1 worker 실행

incrementing mode의 worker를 실행합니다.

bin/connect-standalone.sh config/connect-standalone.properties connector3.properties
* incrementing  mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.

여기에서 물음표(?) 는 kafka 내부에서 관리하는 offset입니다. 

SELECT * FROM `kafkadb`.`orders` 
WHERE `kafkadb`.`orders`.`seq` > ? 
ORDER BY `kafkadb`.`orders`.`seq` ASC

 

3.2 consumer 실행

위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.

위의 예제에서 topic.prefix=mysql- 이고 테이블은 orders 이므로  mysql-orders 라는 이름으로 topic이 생성됩니다. 

아래는 mysql-orders topic을 조회하는 예제입니다. mariaDB orders 테이블에 새로운 데이터가 입력되면 consumer에 메시지를 보냅니다.

bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic mysql-orders

 

3.3 데이터 입력

mariaDB에 접속하여 아래와 같이 테이블에 데이터를 입력합니다. 데이터 입력 시 consumer 창에 데이터가 출력되는 것을 확인할 수 있습니다.

INSERT INTO orders(name, product) values('jyp', 'computer');
INSERT INTO orders(name, product) values('jyp', 'mouse');
INSERT INTO orders(name, product) values('jyp', 'keyboard');

 

4) bulk mode 

bulk mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.

bulk 모드에서 커넥터는 테이블 전체 데이터를 주기적으로 쿼리합니다. 따라서 원본 테이블에 100,000행인 경우 커넥터는 데이터베이스의 새 행 또는 오래된 행 수에 관계없이 모든 데이터를 폴링하고 Apache Kafka topic에 100,000 개의 새 메시지를 삽입합니다.

예제는 생략합니다.

 SELECT * FROM `kafkadb`.`bulk_tab`

 

5) multi task

multi task 를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.

multi task는 테이블에 동일한 컬럼명일 경우에 적용하면 multi-task로 작동합니다.예제에서는 2개의 다른 테이블이 동일한 컬럼명(id)을 가지고 있고 id 컬럼에 incrementing mode를 적용하여 multi-task로 수행하는 예제입니다. 컬럼명이 동일하고 데이터 추출 mode가 동일하면 multi-task로 수행할 수 있습니다.예제는생략합니다.
[2022-01-23 20:06:54,884] INFO [mysql-multi-incre-source|task-0] 
Begin using SQL query: 
SELECT * 
FROM `kafkadb`.`multitab1` 
WHERE `kafkadb`.`multitab1`.`id` > ? 
ORDER BY `kafkadb`.`multitab1`.`id` ASC

[2022-01-23 20:06:54,884] INFO [mysql-multi-incre-source|task-1] 
Begin using SQL query: 
SELECT * 
FROM `kafkadb`.`multitab2` 
WHERE `kafkadb`.`multitab2`.`id` > ? 
ORDER BY `kafkadb`.`multitab2`.`id` ASC

 

아래와 같이 2개의 task가 수행되는 것을 확인할 수 있습니다.

 curl -X GET "http://localhost:8083/connectors/mysql-multi-incre-source/status" | jq

## Result
{
  "name": "mysql-multi-incre-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "source"
}

 

 * offset 을 reset하기 위해서

1) connect.offsets 파일을 삭제합니다.

standalone mode에서 offset은 local 서버에서 관리됩니다. connector를 실행하기 위한 파라미터 파일인 

connect-standalone.properties 파일에 offset.storage.file.filename=/tmp/connect.offsets 가 정의되어 있습니다.

distribute mode에서는 kafka 서버 또는 zookeeper에서 관리합니다.

offset.storage.file.filename=/tmp/connect.offsets

 2) kafkacat 으로 reset 합니다. 본 문서에서는 설명하지 않습니다.

3) source connector의 이름을 변경합니다. 예제에서 mysql-id-timestamp-source 이름을 mysql-id-timestamp-source-new 등으로 변경하여 새로운 connector를 생성합니다.

 

 

2.2 REST 예제

kafka connector 및 task 에 대한 정보 조회 및 관리를 위한 REST API 예제입니다.

worker 클러스터 ID, 버전 및 git 소스 코드 커밋 ID를 가져옵니다.

$ curl localhost:8083 | jq

## Result
{
  "version": "3.0.0",
  "commit": "8cb0a5e9d3441962",
  "kafka_cluster_id": "_zhnTzR-Qui2WAYlyvvSSA"
}

 

작업자에서 사용할 수 있는 커넥터 플러그인을 나열합니다.

$ curl localhost:8083/connector-plugins | jq

## Resutl
[
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "10.3.x"
  },
  ... ...
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

 

활성화된 커넥터를 나열합니다.

$ curl localhost:8083/connectors
["mysql-timestamp-source"]

 

connector의 상태를 확인합니다. 

$ curl -s localhost:8083/connectors/mysql-timestamp-source/status | jq
{"name":"mysql-timestamp-
## Result
{
  "name": "mysql-timestamp-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "source"
}

 

아래 명령어로 worker를 재시작합니다. 단, 커넥터를 다시 시작해도 task는 다시 시작되지 않습니다.

curl -X POST localhost:8083/connectors/mysql-timestamp-source/restart

 

커넥터의 task를 조회합니다.

$ curl localhost:8083/connectors/mysql-timestamp-source/tasks | jq

## Result
[
  {
    "id": {
      "connector": "mysql-timestamp-source",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "mode": "timestamp+incrementing",
      "timestamp.column.name": "modified",
      "incrementing.column.name": "id",
      "topic.prefix": "mysql-",
      "tables": "`kafkadb`.`customers`",
      "task.class": "io.confluent.connect.jdbc.source.JdbcSourceTask",
      "tasks.max": "10",
      "name": "mysql-timestamp-source",
      "connection.url": "jdbc:mysql://localhost:3306/kafkadb?user=kafka&password=kafka123",
      "table.whitelist": "customers"
    }
  }
]

 

task를 다시 시작합니다(명령이 성공하면 출력이 없음).

curl -X POST localhost:8083/connectors/mysql-timestamp-source/tasks/0/restart
 

커넥터를 일시중지합니다.(명령이 성공하면 출력이 없음).

curl -X PUT localhost:8083/connectors/mysql-timestamp-source/pause

 

일시중지된 커넥터를 다시 시작합니다. (명령이 성공하면 출력이 없음).

curl -X PUT localhost:8083/connectors/mysql-timestamp-source/resume

 

커넥터의 config 가져오려면 다음 명령을 사용합니다.

curl localhost:8083/connectors/mysql-timestamp-source/config | jq

 

커넥터를 삭제합니다.(명령이 성공하면 출력이 없음).

curl -X DELETE localhost:8083/connectors/mysql-timestamp-source

 

Worker를 중지시키는 명령어는 따로 없으므로, 프로세스를 찾아 kill 시킵니다.

  1. worker process의 PID를 찾습니다.
     ps auwx | grep ConnectStandalone | grep -v grep | awk '{print$2}'
     
     ## Result
     9843

     Or

     jcmd | grep ConnectStandalone
     
     ## Result
     9843 org.apache.kafka.connect.cli.ConnectStandalone config/connect-standalone.properties mysql2.properties

     

  2. Kill 명령어로 프로세스를 제거합니다.
    kill -9 9843

 

consumer 연결

위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.

위의 예제에서 topic.prefix=mysql- 이고 테이블은 customers 이므로  mysql-customers 라는 이름으로 topic이 생성됩니다. 

아래는 mysql-customers topic를 조회하는 예제입니다. mariaDB에 새로운 데이터를 입력하거나 modified 컬럼이 수정되면 consumer에 메시지를 보냅니다.

bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9093 \
--topic mysql-customers  --from-beginning

## Result
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": false,
        "field": "email"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "modified"
      }
    ],
    "optional": false,
    "name": "customers"
  },
  "payload": {
    "id": 1,
    "name": "jyp",
    "email": "ypjeong@kafka.com",
    "created": 1642725853000,
    "modified": 1642725853000
  }
}

 

 

2.3 POST 요청 또는 JSON 파일 사용 예제

POST 요청 예제

아래와 같이 Source Connector에 대한 설정을 마치고 POST 요청을 통해 Connector를 생성합니다. 


JSON 파일 예제

Command Line에 Source Connector 생성정보를 모두 작성하는건 쉽지 않으므로 json파일로 작성하는 것이 편리합니다.


 

 

2.4 분산형 모드

The following shows an example command that launches a worker in distributed mode:

bin/connect-distributed worker.properties
 

For an example distributed mode configurat

 

 

 

 

 

 

 

  • connection.hostname=MySQL 서버의 IP address 또는 hostname
  • connection.port=MySQL 서버 port
  • connection.user=데이터베이스 계정
  • connection.password=데이터베이스 계정의 패스워드

 

  • database.include.list : 모니터링할 데이터베이스 이름(쉼표로 추가) 화이트리스트에 포함되지 않은 데이터베이스 이름은 모니터링에서 제외됩니다. 기본적으로 모든 데이터베이스가 모니터링됩니다. 
  • database.histroy.kafka.bootstrap.servers: Kafka 클러스터
  • database.history.kafka.topic=커넥터가 데이터베이스 스키마 기록을 저장할 Kafka topic 이름입니다.
  • include.schema.changes:커넥터가 데이터베이스 서버 ID와 이름이 같은 Kafka topic에 데이터베이스 스키마의 변경 사항을 게시해야 하는지의 여부를 지정하는 참/거짓 값입니다. 각 스키마 변경은 데이터베이스 이름을 포함하고 값에 DDL 문을 포함하는 키를 사용하여 기록됩니다. .

 

 

 

3. distributed mode

 

 

 

 

 

 

 

 

 

 

 

 

 

 

이 문서에서는 다음 주제를 다룹니다.

배포 고려 사항

Kafka Connect는 Kafka 브로커 cluster가 필수 전제 조건입니다.

 

컨플루언트 스키마 레지스트리

스키마 레지스트리 카프카 connect의 필수 서비스가 아니지만, 카프카 데이터 형식으로 avoro, Protobuf 및 JSON 스키마를 사용할 수 있습니다. 

 

독립 실행형 모드와 분산형 모드

커넥터는 프로세스로 실행됩니다. 이를 worker라고 합니다. worker를 실행하는 데는 독립형모드와 분산형모드가 있습니다.

 

 

독립 실행형 모드 는 로컬 시스템에서 Kafka Connect를 개발하고 테스트하는 데 유용합니다. 일반적으로 단일 에이전트를 사용하는 환경(예: 웹 서버 로그를 Kafka로 전송)에도 사용할 수 있습니다.

 

 

분산 모드 는 여러 시스템(노드)에서 Connect worker를 실행합니다. Connect 클러스터를 구성합니다. Kafka Connect는 클러스터 전체에 커넥터를 배포합니다. 필요에 따라 노드를 추가하거나 제거할 수 있습니다.

노드가 예기치 않게 클러스터에서 제거되는 경우 Kafka Connect는 해당 노드의 작업을 클러스터의 다른 노드에 자동으로 배포합니다. 또한 Kafka Connect는 안전하게 복제되는 Kafka 클러스터 내부에 커넥터 구성, 상태 및 오프셋 정보를 저장하기 때문에 Connect worker를 실행하는 노드가 손실되더라도 데이터가 손실되지 않습니다.

 

 

 

 

 

 

 

Connect 플러그인 설치

Kafka Connect는 확장 가능하도록 설계되어 개발자가 사용자 지정 커넥터, 변환 또는 변환기를 만들고 사용자가 이를 설치하고 실행할 수 있습니다. Kafka Connect 플러그인은 하나 JAR 파일입니다. 

 

Kafka Connect는 plugin.path에 정의된 플러그인 경로를 사용하여 플러그인을 찾습니다.

카프카 홈디렉토리 밑에 conf 디렉토리의 connect-standalone.properties 파일에 plugin.path를 설정합니다.

콤마(,)로 분리하여 여러 경로를 설정할 수 있습니다.

아래는 worker 환경설정의 예입니다. 

plugin.path=/mnt/d/share/kafka/plugins
 

플러그인을 설치하려면 플러그인 경로에 JAR를 배치하거나, JAR파일이 포함된 디렉토리의 절대 경로를 plugin.path에 추가합니다.

위의 예제에서는 Connect를 실행하는 시스템에 /mnt/d/share/kafka/plugins 디렉토리 만들고 JAR를 배치합니다.

Connect worker를 시작하면 각 worker는 플러그인 경로에 있는 모든 커넥터, 변환 및 변환기 플러그인을 검색합니다. 커넥터, 변환 또는 변환기를 사용할 때 Connect worker는 먼저 해당 플러그인에서 클래스를 로드한 다음 Kafka Connect 런타임 및 Java 라이브러리를 로드합니다.

 

 

Kafka Connector를 사용한 설정

수동으로 커넥터 설치

커넥터를 수동으로 설치하려면:

  1. Confluent Hub 에서 커넥터를 찾고 커넥터 ZIP 파일을 다운로드합니다.
  2. ZIP 파일 내용을 추출하고 내용을 원하는 위치에 복사합니다. 예를 들어 라는 이름의 디렉토리 plugin.path를 만든 다음 커넥터 플러그인 내용을 복사합니다.
  3. Connect 속성 파일의 플러그인 경로에 추가합니다. 예를 들어, plugin.path=/usr/local/share/kafka/plugins. Kafka Connect는 플러그인 경로를 사용하여 플러그인을 찾습니다. 플러그인 경로는 Kafka Connect의 작업자 구성 에 정의된 쉼표로 구분된 디렉토리 목록입니다 .
  4. 해당 구성으로 Connect 작업자를 시작하십시오. Connect는 해당 플러그인 내에 정의된 모든 커넥터를 검색합니다.
  5. Connect가 실행 중인 각 시스템에 대해 이 단계를 반복합니다. 각 커넥터는 각 작업자에서 사용할 수 있어야 합니다.

 

 

 

 

 

Worker 구성 및 실행

독립 실행형 모드 또는 분산 모드에서 worker를 실행하는 방법에 대해 알아봅니다.

독립 실행형 모드

독립 실행형 모드는 일반적으로 개발 및 테스트 또는 경량의 단일 에이전트 환경(예: 웹 서버 로그를 Kafka로 전송)에 사용됩니다. 다음은 독립 실행형 모드에서 worker를 시작하는 예제 입니다. kafka bin 디렉토리에 connect-standalone.sh 파일이 존재합니다.

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties connector3.properties ...]
 

첫 번째 매개변수( connect-standalone.properties)는 worker 구성 속성 파일 입니다. Kafka 클러스터 및 직렬화 형식과 같은 설정을 제어할 수 있습니다. 

두 번째 매개변수( connector1.properties)는 커넥터 구성 속성 파일입니다. 모든 커넥터에는 worker와 함께 로드되는 구성 속성이 있습니다. 예제와 같이 이 명령어로 여러 커넥터를 시작할 수 있습니다.

동일한 호스트 시스템에서 여러 독립 실행형 작업자를 실행하는 경우 다음 두 구성 속성은 각 worker에 대해 고유해야 합니다.

  • offset.storage.file.filename: 커넥터 오프셋의 저장 파일 이름입니다. 이 파일은 독립 실행형 모드에서 로컬 파일 시스템에 저장됩니다. 두 작업자에 대해 동일한 파일 이름을 사용하면 오프셋 데이터가 삭제되거나 다른 값으로 덮어쓰여집니다.
  • listeners: REST API가 수신하는 URI 목록 형식 protocol://host:port,protocol2://host2:port- 프로토콜은 HTTP 또는 HTTPS입니다. 0.0.0.0모든 인터페이스에 바인딩하려면 호스트 이름을 지정하거나 기본 인터페이스에 바인딩하려면 호스트 이름을 비워 둘 수 있습니다.

 

 

카프카 커넥트 실행

Kafka Connect는 독립 실행형(단일 프로세스) 및 분산실행형의 두 가지 실행 모드를 지원합니다.

독립 실행형 모드에서는 모든 작업이 단일 프로세스에서 수행됩니다.

다음 명령을 사용하여 독립 실행형 프로세스를 시작할 수 있습니다.

    > bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

첫 번째 매개변수는 작업자에 대한 구성입니다. 여기에는 Kafka 연결 매개변수, 직렬화 형식 및 오프셋 커밋 빈도와 같은 설정이 포함됩니다. 모든 worker(독립 실행형 및 분산형 모두)에는 몇 가지 구성이 필요합니다.

  • bootstrap.servers - Kafka에 대한 연결을 부트스트랩하는 데 사용되는 Kafka 서버 목록
  • key.converter- Kafka Connect 형식과 Kafka에 작성된 직렬화된 형식 간에 변환하는 데 사용되는 변환기 클래스입니다. 이것은 Kafka에서 읽거나 쓰는 메시지의 키 형식을 제어하며 이는 커넥터와 독립적이므로 모든 커넥터가 모든 직렬화 형식과 함께 작동할 수 있습니다. 일반적인 형식의 예로는 JSON 및 Avro가 있습니다.
  • value.converter- Kafka Connect 형식과 Kafka에 작성된 직렬화된 형식 간에 변환하는 데 사용되는 변환기 클래스입니다. 이것은 Kafka에서 읽거나 쓰는 메시지의 값 형식을 제어하며 이것은 커넥터와 독립적이므로 모든 커넥터가 모든 직렬화 형식과 함께 작동할 수 있습니다. 일반적인 형식의 예로는 JSON 및 Avro가 있습니다.

독립 실행형 모드와 관련된 중요한 구성 옵션은 다음과 같습니다.

  • offset.storage.file.filename - 오프셋 데이터를 저장할 파일

여기에 구성된 매개변수는 구성, 오프셋 및 topic에 액세스하기 위해 Kafka Connect에서 사용하는 생산자 및 소비자를 위한 것입니다. Kafka 소스 작업에서 사용하는 생산자와 Kafka 싱크 작업에서 사용하는 소비자 구성의 경우 동일한 매개변수를 사용할 수 있지만 접두사 producer.및 consumer. 각각 구성하는 것이 좋습니다. 

worker 구성에서 접두사 없이 상속되는 경우는 Kafka 클라이언트 매개변수는 bootstrap.servers 입니다. 예외는 연결을 허용하기 위해 추가 매개변수가 필요한 보안 클러스터입니다. 이러한 매개변수는 작업자 구성에서 관리 액세스에 대해 한 번, Kafka 소스에 대해 한 번, Kafka 싱크에 대해 한 번 등 최대 세 번 설정해야 합니다.

 

kafka connector에는 connector1.properties sample 파일이 없습니다. 아래 파일을 참고하여 접속하고자 하는 환경을 설정해줘야 합니다.

 

 

mysql DB connector.properties 예제

name=mysql-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=localhost
database.port=3306
database.user=root
database.password=****
database.server.name=appdb
database.histroy.kafka.bootstrap.servers=172.24.118.82:9092
database.history.kafka.topic=mysql_topic
include.schema.changes=true

의미 설명

  • name=커넥터의 고유한 이름입니다.
  • connector.class=커넥터의 자바 클래스명(mysql은 io.debezium.connector.mysql.MySqlConnector)
  • database.hostname=MySQL 서버의 IP address 또는 hostname
  • database.port=MySQL 서버 port
  • database.user=데이터베이스 계정
  • database.password=데이터베이스 계정의 패스워드
  • database.server.name=모니터링되는 특정 MySQL 데이터베이스 서버/클러스터에 대한 네임스페이스를 식별하는 논리적 이름입니다. 논리적 이름은 이 커넥터에서 나오는 모든 Kafka topic 이름에 대한 접두사로 사용되므로 고유해야 합니다. 기본값은 host:port입니다. 여기서 host는 database.hostname 속성 값이고 port는 database.port 속성 값입니다. Confluent는 기본값을 의미 있는 이름으로 변경할 것을 권장합니다.
  • database.include.list : 모니터링할 데이터베이스 이름(쉼표로 추가) 화이트리스트에 포함되지 않은 데이터베이스 이름은 모니터링에서 제외됩니다. 기본적으로 모든 데이터베이스가 모니터링됩니다. 
  • database.histroy.kafka.bootstrap.servers: Kafka 클러스터
  • database.history.kafka.topic=커넥터가 데이터베이스 스키마 기록을 저장할 Kafka topic 이름입니다.
  • include.schema.changes:커넥터가 데이터베이스 서버 ID와 이름이 같은 Kafka topic에 데이터베이스 스키마의 변경 사항을 게시해야 하는지의 여부를 지정하는 참/거짓 값입니다. 각 스키마 변경은 데이터베이스 이름을 포함하고 값에 DDL 문을 포함하는 키를 사용하여 기록됩니다. .

 

MySQL connector 의 상세 정보 참조 :

https://docs.confluent.io/debezium-connect-mysql-source/current/mysql_source_connector_config.html#mysql-source-connector-config

 

 

 

 

 

 

 

2.3.0부터 클라이언트 구성 재정의는 접두사를 사용하여 커넥터별로 개별적으로 구성할 수 있으며 producer.override.Kafka consumer.override.소스 또는 Kafka 싱크에 대해 각각 구성할 수 있습니다. 이러한 재정의는 나머지 커넥터 구성 속성에 포함됩니다.

나머지 매개변수는 커넥터 구성 파일입니다. 

분산 모드는 작업의 균형을 조정하고 동적으로 확장(또는 축소)할 수 있으며 활성 작업과 구성 및 오프셋 커밋 데이터에 내결함성을 제공합니다. 실행은 독립 실행형 모드와 매우 유사합니다.

    > bin/connect-distributed.sh config/connect-distributed.properties

차이점은 시작되는 클래스와 Kafka Connect 프로세스가 구성을 저장할 위치, 작업을 할당하는 방법, 오프셋 및 작업 상태를 저장할 위치를 결정하는 방법을 변경하는 구성 매개변수에 있습니다. 분산 모드에서 Kafka Connect는 오프셋, 구성 및 작업 상태를 Kafka topic에 저장합니다. 원하는 수의 파티션 및 복제 요소를 달성하기 위해 오프셋, 구성 및 상태에 대한 주제를 수동으로 생성하는 것이 좋습니다. Kafka Connect를 시작할 때 토픽이 아직 생성되지 않은 경우 기본 파티션 수와 복제 팩터를 사용하여 토픽이 자동 생성되며 이는 용도에 가장 적합하지 않을 수 있습니다.

특히 위에서 언급한 공통 설정 외에 다음 구성 매개변수는 클러스터를 시작하기 전에 설정하는 것이 중요합니다.

  • group.id(기본값 connect-cluster) - Connect 클러스터 그룹을 형성하는 데 사용되는 클러스터의 고유 이름. 소비자 그룹 ID와 충돌해서는 안 됩니다.
  • config.storage.topic(기본값 connect-configs) - 커넥터 및 작업 구성을 저장하는 데 사용할 주제입니다. 이것은 단일 파티션, 고도로 복제되고 압축된 주제여야 합니다. 자동 생성된 주제가 여러 파티션을 가질 수 있거나 압축이 아닌 삭제를 위해 자동으로 구성될 수 있으므로 올바른 구성을 보장하기 위해 주제를 수동으로 생성해야 할 수도 있습니다.
  • offset.storage.topic(기본값 connect-offsets) - 오프셋을 저장하는 데 사용할 주제입니다. 이 항목에는 많은 파티션이 있어야 하고 복제되어야 하며 압축을 위해 구성되어야 합니다.
  • status.storage.topic(기본값 connect-status) - 상태를 저장하는 데 사용할 주제입니다. 이 항목에는 여러 파티션이 있을 수 있으며 압축을 위해 복제 및 구성해야 합니다.

분산 모드에서 커넥터 구성은 명령줄에서 전달되지 않습니다. 대신 아래에 설명된 REST API를 사용하여 커넥터를 생성, 수정 및 파괴하십시오.

 

 

 

'Apache Kafka' 카테고리의 다른 글

카프카 ISR(In-Sync-Replicas)  (0) 2022.01.14
ksql - 아파치 카프카 용어 - 수정  (0) 2022.01.13
AWS MSK  (0) 2022.01.11
주키퍼 CLI  (0) 2022.01.09
아파치 주키퍼 설치 및 실행  (0) 2022.01.09

+ Recent posts