Kafka Connect
Kafka Connect는 Apache Kafka®와 다른 데이터 시스템 간에 데이터를 스트리밍하기 위한 도구입니다. Kafka Connect는 데이터베이스 데이터를 수집하거나 모든 애플리케이션 서버에서 Kafka topic 데이터를 수집하여 스트림 처리할 수 있도록 합니다. 내보내기 커넥터는 Kafka topic의 데이터를 Elasticsearch나 Hadoop시스템 등으로 전달할 수 있습니다.
kafka connect는 connector와 task로 구성되어 있습니다.
Kafka Connect 작동 방식
Kafka Connect에는 두 가지 유형의 커넥터가 포함되어 있습니다.
- 소스 커넥터 – 데이터베이스를 수집하고 Kafka topic에 대한 테이블 업데이트를 스트리밍합니다. 또한 소스 커넥터는 모든 애플리케이션 서버에서 메트릭을 수집하고 이를 Kafka topic에 저장할 수 있습니다.
- 싱크 커넥터 – Kafka topic의 데이터를 Elasticsearch나 Hadoop과 같은 시스템으로 전달합니다.
사용자 환경에 Kafka Connect를 배포하려면 Kafka Connect 사용 방법 - 시작하기 를 참조하십시오 .
독립 실행형 모드와 분산형 모드
커넥터는 프로세스로 실행됩니다. 이를 worker라고 합니다. worker를 실행하는 데는 독립형모드와 분산형모드가 있습니다.
Kafka Connect worker는 공유 시스템에서 실행할 수 있는 JVM 프로세스입니다.
독립 실행형 모드 는 로컬 시스템에서 Kafka Connect를 개발하고 테스트하는 데 유용합니다. 일반적으로 단일 에이전트를 사용하는 환경(예: 웹 서버 로그를 Kafka로 전송)에도 사용할 수 있습니다.
독립 실행형 worker 상태는 로컬 파일 시스템에 저장됩니다.
분산형 모드 는 여러 시스템(노드)에서 Connect worker를 실행합니다. Connect 클러스터를 구성합니다. Kafka Connect는 클러스터 전체에 커넥터를 배포합니다. 필요에 따라 노드를 추가하거나 제거할 수 있습니다.
노드가 예기치 않게 클러스터에서 제거되는 경우 Kafka Connect는 해당 노드의 작업을 클러스터의 다른 노드에 자동으로 배포합니다. Kafka Connect는 복제되는 Kafka 클러스터 내부에 커넥터 구성, 상태 및 오프셋 정보를 저장하기 때문에 Connect worker를 실행하는 노드가 손실되더라도 데이터가 손실되지 않습니다.
분산형 worker는 모든 상태를 Kafka에 저장합니다.
Kafka와 mariaDB 연결하기
1. database 및 user 생성
mariadb 는 설치가 되었다는 가정에서 설명합니다. mariDB 설치는 설치문서를 참고하세요.
kafka connector와 연동할 database, user 그리고 table을 생성합니다.
$ mysql -u root -p
-- database 생성
MariaDB [mysql]> create database kafkadb;
-- user 생성
MariaDB [mysql]> CREATE USER 'kafka'@'%' IDENTIFIED BY 'kafka123';
-- 생성된 계정에 데이터베이스 ALL 권한 부여
MariaDB [mysql]> GRANT ALL PRIVILEGES ON kafkadb.* TO 'kafka'@'%' IDENTIFIED BY 'kafka123';
-- 권한 적용
MariaDB [mysql]> FLUSH PRIVILEGES;
kafka connector에서 사용할 table을 생성합니다.
$ mysql -u kafka -p
-- For timestamp+incrementing TEST
MariaDB [mysql]> CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY,
created DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ;
-- For timestamp TEST
MariaDB [mysql]> CREATE TABLE products (
seq INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
created DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ;
-- For incrementing TEST
MariaDB [mysql]> CREATE TABLE orders (
seq INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
product VARCHAR(255) NOT NULL
) ;
-- For bulk TEST
MariaDB [mysql]> create table bulk_tab(
name VARCHAR(255) ,
log VARCHAR(255)
) ;
2. connector 생성 및 실행
2.1 독립 실행형 worker 실행
다음은 독립 실행형 모드에서 worker를 실행하는 구문입니다.
bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties connector3.properties ...]
두 번째 매개변수 connector1.properties 에 mariaDB에 대한 접속 정보를 기술합니다. 여러 개의 connector를 생성할 수 있습니다. 여러 개의 connector을 생성하려면 아래와 같이 connector.properties 파일을 나열하면 됩니다.
bin/connect-standalone.sh config/connect-standalone.properties connector1.properties connector2.properties connector3.properties ...
1) mysql DB connector1.properties 예제
imestamp+incrementing mode의 connector를 생성하는 properties 파일 내용입니다.
name=mysql-time-incre-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://localhost:3306/kafkadb?user=kafka&password=kafka123
table.whitelist=customers
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
topic.prefix=mysql-
파라미터 설명
- name=커넥터의 고유한 이름입니다.
- connector.class : 커넥터의 자바 클래스명(mysql은 io.debezium.connector.mysql.MySqlConnector)
- connection.url : jdbc:mysql://IP_Address:port/dbname?user=username&password=user_password
- table.whitelist : connector에서 사용할 테이블을 기술
- mode : timestamp와 incrementing 모드. 즉, 컬럼의 변경 기준을 timestamp로 할지 incrementing을 할지를 설정
- timestamp.column.name : timestamp나 datetime 의 컬럼 기술
- incrementing.column.name : auto_increment로 지정된 컬럼 기술. auto_increment 로 지정되지 않아도 증가하는 컬럼을 기술할 수 있음
- topic_prefix : kafka 에 생성할 topic의 접두어
1.1 worker 실행
timestamp+incrementing mode의 worker를 실행합니다.
bin/connect-standalone.sh config/connect-standalone.properties connector1.properties
* timestamp+incrementing mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.
여기에서 물음표(?) 는 kafka 내부에서 관리하는 offset입니다.
첫번째 물음표는 서버의 현재시간을 나타내고 두번째 물음표는 offset을 나타냅니다.
incrementing 데이터를 입력할 경우 modified 날짜가 현재시간보다는 작고 offset 날짜와는 같아야 합니다.
조건이 까다로우므로 timestamp+incrementing modes는 조심해서 적용해야 합니다. timestamp와 incrementing 은 각각 적용하는 것이 관리가 용이합니다.
Begin using SQL query: SELECT * FROM `kafkadb`.`customers`
WHERE `kafkadb`.`customers`.`modified` < ?
AND ((`kafkadb`.`customers`.`modified` = ? AND `kafkadb`.`customers`.`id` > ?) OR `kafkadb`.`customers`.`modified` > ?)
ORDER BY `kafkadb`.`customers`.`modified`,`kafkadb`.`customers`.`id` ASC
1.2 consumer 실행
위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.
위의 예제에서 topic.prefix=mysql- 이고 테이블은 customers 이므로 mysql-customers 라는 이름으로 topic이 생성됩니다.
아래는 mysql-customers topic를 조회하는 예제입니다. mariaDB에 새로운 데이터를 입력하거나 modified 컬럼이 수정되면 consumer에 메시지를 보냅니다.
bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic mysql-customers
## Result
{
... ...
"payload": {
"id": 1,
"name": "jyp",
"email": "ypjeong@kafka.com",
"created": 1642725853000,
"modified": 1642725853000
}
}
1.3 데이터 입력
mariaDB에 접속하여 아래와 같이 테이블에 데이터를 입력합니다. 데이터 입력 시 consumer 창에 데이터가 출력되는 것을 확인할 수 있습니다.
INSERT INTO customers(name, email) values('jyp','ypjeong@kafka.com');
INSERT INTO customers(name, email) values('jyh','yhjeon@kafka.com');
INSERT INTO customers(name, email) values('kky','kykim@kafka.com');
2) mysql DB connector2.properties 예제
imestamp mode의 connector를 생성하는 properties 파일 내용입니다.
name=mysql-time-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://localhost:3306/kafkadb?user=kafka&password=kafka123
table.whitelist=products
mode=timestamp
timestamp.column.name=modified
topic.prefix=mysql-
2.1 worker 실행
timestamp mode의 worker를 실행합니다.
bin/connect-standalone.sh config/connect-standalone.properties connector2.properties
* timestamp mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.
여기에서 물음표(?) 는 kafka 내부에서 관리하는 offset입니다.
Found offset {{table=products}=null, {protocol=1, table=kafkadb.products}={timestamp_nanos=0, timestamp=1642904785000}} for partition {protocol=1, table=kafkadb.products}
첫번째 물음표는 offset을 나타내고 두번째 물음표는 서버의 현재시간을 나타냅니다.
만일, timestamp mode가 적용된 컬럼에 현재시간보다 큰 날짜값이 들어온다면 그 시간 차이만큼 데이터는 전송되지 않습니다. 물론, offset 날짜보다 작은 값으로 udpate하면 데이터를 가져올 수 없습니다.
SELECT * FROM `kafkadb`.`products`
WHERE `kafkadb`.`products`.`modified` > ? -- offset 날짜
AND `kafkadb`.`products`.`modified` < ? -- kafka connector 현재 날짜
ORDER BY `kafkadb`.`products`.`modified` ASC
2.2 consumer 실행
위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.
위의 예제에서 topic.prefix=mysql- 이고 테이블은 products 이므로 mysql-porducts 라는 이름으로 topic이 생성됩니다.
아래는 mysql-porducts topic을 조회하는 예제입니다. mariaDB products 테이블의 modified 컬럼이 수정되면 consumer에 메시지를 보냅니다.
bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic mysql-products
2.3 데이터 입력
mariaDB에 접속하여 아래와 같이 테이블에 데이터를 입력합니다. 데이터 입력 시 consumer 창에 데이터가 출력되는 것을 확인할 수 있습니다.
INSERT INTO products(name) values('computer');
INSERT INTO products(name) values('mouse');
3) mysql DB connector3.properties 예제
incrementing mode의 connector를 생성하는 properties 파일 내용입니다.
name=mysql-incre-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://localhost:3306/kafkadb?user=kafka&password=kafka123
table.whitelist=orders
mode=incrementing
incrementing.column.name=seq
topic.prefix=mysql-
3.1 worker 실행
incrementing mode의 worker를 실행합니다.
bin/connect-standalone.sh config/connect-standalone.properties connector3.properties
여기에서 물음표(?) 는 kafka 내부에서 관리하는 offset입니다.
SELECT * FROM `kafkadb`.`orders`
WHERE `kafkadb`.`orders`.`seq` > ?
ORDER BY `kafkadb`.`orders`.`seq` ASC
3.2 consumer 실행
위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.
위의 예제에서 topic.prefix=mysql- 이고 테이블은 orders 이므로 mysql-orders 라는 이름으로 topic이 생성됩니다.
아래는 mysql-orders topic을 조회하는 예제입니다. mariaDB orders 테이블에 새로운 데이터가 입력되면 consumer에 메시지를 보냅니다.
bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic mysql-orders
3.3 데이터 입력
mariaDB에 접속하여 아래와 같이 테이블에 데이터를 입력합니다. 데이터 입력 시 consumer 창에 데이터가 출력되는 것을 확인할 수 있습니다.
INSERT INTO orders(name, product) values('jyp', 'computer');
INSERT INTO orders(name, product) values('jyp', 'mouse');
INSERT INTO orders(name, product) values('jyp', 'keyboard');
4) bulk mode
bulk mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.
bulk 모드에서 커넥터는 테이블 전체 데이터를 주기적으로 쿼리합니다. 따라서 원본 테이블에 100,000행인 경우 커넥터는 데이터베이스의 새 행 또는 오래된 행 수에 관계없이 모든 데이터를 폴링하고 Apache Kafka topic에 100,000 개의 새 메시지를 삽입합니다.
예제는 생략합니다.
SELECT * FROM `kafkadb`.`bulk_tab`
5) multi task
multi task 를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.
multi task는 테이블에 동일한 컬럼명일 경우에 적용하면 multi-task로 작동합니다.예제에서는 2개의 다른 테이블이 동일한 컬럼명(id)을 가지고 있고 id 컬럼에 incrementing mode를 적용하여 multi-task로 수행하는 예제입니다. 컬럼명이 동일하고 데이터 추출 mode가 동일하면 multi-task로 수행할 수 있습니다.예제는생략합니다.[2022-01-23 20:06:54,884] INFO [mysql-multi-incre-source|task-0]
Begin using SQL query:
SELECT *
FROM `kafkadb`.`multitab1`
WHERE `kafkadb`.`multitab1`.`id` > ?
ORDER BY `kafkadb`.`multitab1`.`id` ASC
[2022-01-23 20:06:54,884] INFO [mysql-multi-incre-source|task-1]
Begin using SQL query:
SELECT *
FROM `kafkadb`.`multitab2`
WHERE `kafkadb`.`multitab2`.`id` > ?
ORDER BY `kafkadb`.`multitab2`.`id` ASC
아래와 같이 2개의 task가 수행되는 것을 확인할 수 있습니다.
curl -X GET "http://localhost:8083/connectors/mysql-multi-incre-source/status" | jq
## Result
{
"name": "mysql-multi-incre-source",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
}
],
"type": "source"
}
* offset 을 reset하기 위해서
1) connect.offsets 파일을 삭제합니다.
standalone mode에서 offset은 local 서버에서 관리됩니다. connector를 실행하기 위한 파라미터 파일인
connect-standalone.properties 파일에 offset.storage.file.filename=/tmp/connect.offsets 가 정의되어 있습니다.
distribute mode에서는 kafka 서버 또는 zookeeper에서 관리합니다.
offset.storage.file.filename=/tmp/connect.offsets
2) kafkacat 으로 reset 합니다. 본 문서에서는 설명하지 않습니다.
3) source connector의 이름을 변경합니다. 예제에서 mysql-id-timestamp-source 이름을 mysql-id-timestamp-source-new 등으로 변경하여 새로운 connector를 생성합니다.
2.2 REST 예제
kafka connector 및 task 에 대한 정보 조회 및 관리를 위한 REST API 예제입니다.
worker 클러스터 ID, 버전 및 git 소스 코드 커밋 ID를 가져옵니다.
$ curl localhost:8083 | jq
## Result
{
"version": "3.0.0",
"commit": "8cb0a5e9d3441962",
"kafka_cluster_id": "_zhnTzR-Qui2WAYlyvvSSA"
}
작업자에서 사용할 수 있는 커넥터 플러그인을 나열합니다.
$ curl localhost:8083/connector-plugins | jq
## Resutl
[
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.3.x"
},
... ...
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
활성화된 커넥터를 나열합니다.
$ curl localhost:8083/connectors
["mysql-timestamp-source"]
connector의 상태를 확인합니다.
$ curl -s localhost:8083/connectors/mysql-timestamp-source/status | jq
{"name":"mysql-timestamp-
## Result
{
"name": "mysql-timestamp-source",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
}
],
"type": "source"
}
아래 명령어로 worker를 재시작합니다. 단, 커넥터를 다시 시작해도 task는 다시 시작되지 않습니다.
curl -X POST localhost:8083/connectors/mysql-timestamp-source/restart
커넥터의 task를 조회합니다.
$ curl localhost:8083/connectors/mysql-timestamp-source/tasks | jq
## Result
[
{
"id": {
"connector": "mysql-timestamp-source",
"task": 0
},
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp+incrementing",
"timestamp.column.name": "modified",
"incrementing.column.name": "id",
"topic.prefix": "mysql-",
"tables": "`kafkadb`.`customers`",
"task.class": "io.confluent.connect.jdbc.source.JdbcSourceTask",
"tasks.max": "10",
"name": "mysql-timestamp-source",
"connection.url": "jdbc:mysql://localhost:3306/kafkadb?user=kafka&password=kafka123",
"table.whitelist": "customers"
}
}
]
task를 다시 시작합니다(명령이 성공하면 출력이 없음).
curl -X POST localhost:8083/connectors/mysql-timestamp-source/tasks/0/restart
커넥터를 일시중지합니다.(명령이 성공하면 출력이 없음).
curl -X PUT localhost:8083/connectors/mysql-timestamp-source/pause
일시중지된 커넥터를 다시 시작합니다. (명령이 성공하면 출력이 없음).
curl -X PUT localhost:8083/connectors/mysql-timestamp-source/resume
커넥터의 config 가져오려면 다음 명령을 사용합니다.
curl localhost:8083/connectors/mysql-timestamp-source/config | jq
커넥터를 삭제합니다.(명령이 성공하면 출력이 없음).
curl -X DELETE localhost:8083/connectors/mysql-timestamp-source
Worker를 중지시키는 명령어는 따로 없으므로, 프로세스를 찾아 kill 시킵니다.
- worker process의 PID를 찾습니다.
ps auwx | grep ConnectStandalone | grep -v grep | awk '{print$2}' ## Result 9843
Or
jcmd | grep ConnectStandalone ## Result 9843 org.apache.kafka.connect.cli.ConnectStandalone config/connect-standalone.properties mysql2.properties
- Kill 명령어로 프로세스를 제거합니다.
kill -9 9843
consumer 연결
위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.
위의 예제에서 topic.prefix=mysql- 이고 테이블은 customers 이므로 mysql-customers 라는 이름으로 topic이 생성됩니다.
아래는 mysql-customers topic를 조회하는 예제입니다. mariaDB에 새로운 데이터를 입력하거나 modified 컬럼이 수정되면 consumer에 메시지를 보냅니다.
bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9093 \
--topic mysql-customers --from-beginning
## Result
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "string",
"optional": false,
"field": "email"
},
{
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created"
},
{
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "modified"
}
],
"optional": false,
"name": "customers"
},
"payload": {
"id": 1,
"name": "jyp",
"email": "ypjeong@kafka.com",
"created": 1642725853000,
"modified": 1642725853000
}
}
2.3 POST 요청 또는 JSON 파일 사용 예제
POST 요청 예제
아래와 같이 Source Connector에 대한 설정을 마치고 POST 요청을 통해 Connector를 생성합니다.
JSON 파일 예제
Command Line에 Source Connector 생성정보를 모두 작성하는건 쉽지 않으므로 json파일로 작성하는 것이 편리합니다.
2.4 분산형 모드
The following shows an example command that launches a worker in distributed mode:
bin/connect-distributed worker.properties
For an example distributed mode configurat
- connection.hostname=MySQL 서버의 IP address 또는 hostname
- connection.port=MySQL 서버 port
- connection.user=데이터베이스 계정
- connection.password=데이터베이스 계정의 패스워드
- database.include.list : 모니터링할 데이터베이스 이름(쉼표로 추가) 화이트리스트에 포함되지 않은 데이터베이스 이름은 모니터링에서 제외됩니다. 기본적으로 모든 데이터베이스가 모니터링됩니다.
- database.histroy.kafka.bootstrap.servers: Kafka 클러스터
- database.history.kafka.topic=커넥터가 데이터베이스 스키마 기록을 저장할 Kafka topic 이름입니다.
- include.schema.changes:커넥터가 데이터베이스 서버 ID와 이름이 같은 Kafka topic에 데이터베이스 스키마의 변경 사항을 게시해야 하는지의 여부를 지정하는 참/거짓 값입니다. 각 스키마 변경은 데이터베이스 이름을 포함하고 값에 DDL 문을 포함하는 키를 사용하여 기록됩니다. .
3. distributed mode
이 문서에서는 다음 주제를 다룹니다.
- 배포 고려 사항
- Connect 플러그인 설치
- 작업자 구성 및 실행
- 키 및 값 변환기 구성
- 생산자와 소비자를 연결
- 소스 커넥터 자동 토픽 생성
- 커넥트 리포터
- ConfigProvider 인터페이스
- 다음 단계(추가 참조 및 데모 링크)
배포 고려 사항
Kafka Connect는 Kafka 브로커 cluster가 필수 전제 조건입니다.
컨플루언트 스키마 레지스트리
스키마 레지스트리 카프카 connect의 필수 서비스가 아니지만, 카프카 데이터 형식으로 avoro, Protobuf 및 JSON 스키마를 사용할 수 있습니다.
독립 실행형 모드와 분산형 모드
커넥터는 프로세스로 실행됩니다. 이를 worker라고 합니다. worker를 실행하는 데는 독립형모드와 분산형모드가 있습니다.
독립 실행형 모드 는 로컬 시스템에서 Kafka Connect를 개발하고 테스트하는 데 유용합니다. 일반적으로 단일 에이전트를 사용하는 환경(예: 웹 서버 로그를 Kafka로 전송)에도 사용할 수 있습니다.
분산 모드 는 여러 시스템(노드)에서 Connect worker를 실행합니다. Connect 클러스터를 구성합니다. Kafka Connect는 클러스터 전체에 커넥터를 배포합니다. 필요에 따라 노드를 추가하거나 제거할 수 있습니다.
노드가 예기치 않게 클러스터에서 제거되는 경우 Kafka Connect는 해당 노드의 작업을 클러스터의 다른 노드에 자동으로 배포합니다. 또한 Kafka Connect는 안전하게 복제되는 Kafka 클러스터 내부에 커넥터 구성, 상태 및 오프셋 정보를 저장하기 때문에 Connect worker를 실행하는 노드가 손실되더라도 데이터가 손실되지 않습니다.
Connect 플러그인 설치
Kafka Connect는 확장 가능하도록 설계되어 개발자가 사용자 지정 커넥터, 변환 또는 변환기를 만들고 사용자가 이를 설치하고 실행할 수 있습니다. Kafka Connect 플러그인은 하나 JAR 파일입니다.
Kafka Connect는 plugin.path에 정의된 플러그인 경로를 사용하여 플러그인을 찾습니다.
카프카 홈디렉토리 밑에 conf 디렉토리의 connect-standalone.properties 파일에 plugin.path를 설정합니다.
콤마(,)로 분리하여 여러 경로를 설정할 수 있습니다.
아래는 worker 환경설정의 예입니다.
plugin.path=/mnt/d/share/kafka/plugins
플러그인을 설치하려면 플러그인 경로에 JAR를 배치하거나, JAR파일이 포함된 디렉토리의 절대 경로를 plugin.path에 추가합니다.
위의 예제에서는 Connect를 실행하는 시스템에 /mnt/d/share/kafka/plugins 디렉토리를 만들고 JAR를 배치합니다.
Connect worker를 시작하면 각 worker는 플러그인 경로에 있는 모든 커넥터, 변환 및 변환기 플러그인을 검색합니다. 커넥터, 변환 또는 변환기를 사용할 때 Connect worker는 먼저 해당 플러그인에서 클래스를 로드한 다음 Kafka Connect 런타임 및 Java 라이브러리를 로드합니다.
Kafka Connector를 사용한 설정
수동으로 커넥터 설치
커넥터를 수동으로 설치하려면:
- Confluent Hub 에서 커넥터를 찾고 커넥터 ZIP 파일을 다운로드합니다.
- ZIP 파일 내용을 추출하고 내용을 원하는 위치에 복사합니다. 예를 들어 라는 이름의 디렉토리 plugin.path를 만든 다음 커넥터 플러그인 내용을 복사합니다.
- Connect 속성 파일의 플러그인 경로에 추가합니다. 예를 들어, plugin.path=/usr/local/share/kafka/plugins. Kafka Connect는 플러그인 경로를 사용하여 플러그인을 찾습니다. 플러그인 경로는 Kafka Connect의 작업자 구성 에 정의된 쉼표로 구분된 디렉토리 목록입니다 .
- 해당 구성으로 Connect 작업자를 시작하십시오. Connect는 해당 플러그인 내에 정의된 모든 커넥터를 검색합니다.
- Connect가 실행 중인 각 시스템에 대해 이 단계를 반복합니다. 각 커넥터는 각 작업자에서 사용할 수 있어야 합니다.
Worker 구성 및 실행
독립 실행형 모드 또는 분산 모드에서 worker를 실행하는 방법에 대해 알아봅니다.
독립 실행형 모드
독립 실행형 모드는 일반적으로 개발 및 테스트 또는 경량의 단일 에이전트 환경(예: 웹 서버 로그를 Kafka로 전송)에 사용됩니다. 다음은 독립 실행형 모드에서 worker를 시작하는 예제 입니다. kafka bin 디렉토리에 connect-standalone.sh 파일이 존재합니다.
bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties connector3.properties ...]
첫 번째 매개변수( connect-standalone.properties)는 worker 구성 속성 파일 입니다. Kafka 클러스터 및 직렬화 형식과 같은 설정을 제어할 수 있습니다.
두 번째 매개변수( connector1.properties)는 커넥터 구성 속성 파일입니다. 모든 커넥터에는 worker와 함께 로드되는 구성 속성이 있습니다. 예제와 같이 이 명령어로 여러 커넥터를 시작할 수 있습니다.
동일한 호스트 시스템에서 여러 독립 실행형 작업자를 실행하는 경우 다음 두 구성 속성은 각 worker에 대해 고유해야 합니다.
- offset.storage.file.filename: 커넥터 오프셋의 저장 파일 이름입니다. 이 파일은 독립 실행형 모드에서 로컬 파일 시스템에 저장됩니다. 두 작업자에 대해 동일한 파일 이름을 사용하면 오프셋 데이터가 삭제되거나 다른 값으로 덮어쓰여집니다.
- listeners: REST API가 수신하는 URI 목록 형식 protocol://host:port,protocol2://host2:port- 프로토콜은 HTTP 또는 HTTPS입니다. 0.0.0.0모든 인터페이스에 바인딩하려면 호스트 이름을 지정하거나 기본 인터페이스에 바인딩하려면 호스트 이름을 비워 둘 수 있습니다.
카프카 커넥트 실행
Kafka Connect는 독립 실행형(단일 프로세스) 및 분산실행형의 두 가지 실행 모드를 지원합니다.
독립 실행형 모드에서는 모든 작업이 단일 프로세스에서 수행됩니다.
다음 명령을 사용하여 독립 실행형 프로세스를 시작할 수 있습니다.
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
첫 번째 매개변수는 작업자에 대한 구성입니다. 여기에는 Kafka 연결 매개변수, 직렬화 형식 및 오프셋 커밋 빈도와 같은 설정이 포함됩니다. 모든 worker(독립 실행형 및 분산형 모두)에는 몇 가지 구성이 필요합니다.
- bootstrap.servers - Kafka에 대한 연결을 부트스트랩하는 데 사용되는 Kafka 서버 목록
- key.converter- Kafka Connect 형식과 Kafka에 작성된 직렬화된 형식 간에 변환하는 데 사용되는 변환기 클래스입니다. 이것은 Kafka에서 읽거나 쓰는 메시지의 키 형식을 제어하며 이는 커넥터와 독립적이므로 모든 커넥터가 모든 직렬화 형식과 함께 작동할 수 있습니다. 일반적인 형식의 예로는 JSON 및 Avro가 있습니다.
- value.converter- Kafka Connect 형식과 Kafka에 작성된 직렬화된 형식 간에 변환하는 데 사용되는 변환기 클래스입니다. 이것은 Kafka에서 읽거나 쓰는 메시지의 값 형식을 제어하며 이것은 커넥터와 독립적이므로 모든 커넥터가 모든 직렬화 형식과 함께 작동할 수 있습니다. 일반적인 형식의 예로는 JSON 및 Avro가 있습니다.
독립 실행형 모드와 관련된 중요한 구성 옵션은 다음과 같습니다.
- offset.storage.file.filename - 오프셋 데이터를 저장할 파일
여기에 구성된 매개변수는 구성, 오프셋 및 topic에 액세스하기 위해 Kafka Connect에서 사용하는 생산자 및 소비자를 위한 것입니다. Kafka 소스 작업에서 사용하는 생산자와 Kafka 싱크 작업에서 사용하는 소비자 구성의 경우 동일한 매개변수를 사용할 수 있지만 접두사 producer.및 consumer. 각각 구성하는 것이 좋습니다.
worker 구성에서 접두사 없이 상속되는 경우는 Kafka 클라이언트 매개변수는 bootstrap.servers 입니다. 예외는 연결을 허용하기 위해 추가 매개변수가 필요한 보안 클러스터입니다. 이러한 매개변수는 작업자 구성에서 관리 액세스에 대해 한 번, Kafka 소스에 대해 한 번, Kafka 싱크에 대해 한 번 등 최대 세 번 설정해야 합니다.
kafka connector에는 connector1.properties sample 파일이 없습니다. 아래 파일을 참고하여 접속하고자 하는 환경을 설정해줘야 합니다.
mysql DB connector.properties 예제
name=mysql-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=localhost
database.port=3306
database.user=root
database.password=****
database.server.name=appdb
database.histroy.kafka.bootstrap.servers=172.24.118.82:9092
database.history.kafka.topic=mysql_topic
include.schema.changes=true
의미 설명
- name=커넥터의 고유한 이름입니다.
- connector.class=커넥터의 자바 클래스명(mysql은 io.debezium.connector.mysql.MySqlConnector)
- database.hostname=MySQL 서버의 IP address 또는 hostname
- database.port=MySQL 서버 port
- database.user=데이터베이스 계정
- database.password=데이터베이스 계정의 패스워드
- database.server.name=모니터링되는 특정 MySQL 데이터베이스 서버/클러스터에 대한 네임스페이스를 식별하는 논리적 이름입니다. 논리적 이름은 이 커넥터에서 나오는 모든 Kafka topic 이름에 대한 접두사로 사용되므로 고유해야 합니다. 기본값은 host:port입니다. 여기서 host는 database.hostname 속성 값이고 port는 database.port 속성 값입니다. Confluent는 기본값을 의미 있는 이름으로 변경할 것을 권장합니다.
- database.include.list : 모니터링할 데이터베이스 이름(쉼표로 추가) 화이트리스트에 포함되지 않은 데이터베이스 이름은 모니터링에서 제외됩니다. 기본적으로 모든 데이터베이스가 모니터링됩니다.
- database.histroy.kafka.bootstrap.servers: Kafka 클러스터
- database.history.kafka.topic=커넥터가 데이터베이스 스키마 기록을 저장할 Kafka topic 이름입니다.
- include.schema.changes:커넥터가 데이터베이스 서버 ID와 이름이 같은 Kafka topic에 데이터베이스 스키마의 변경 사항을 게시해야 하는지의 여부를 지정하는 참/거짓 값입니다. 각 스키마 변경은 데이터베이스 이름을 포함하고 값에 DDL 문을 포함하는 키를 사용하여 기록됩니다. .
MySQL connector 의 상세 정보 참조 :
2.3.0부터 클라이언트 구성 재정의는 접두사를 사용하여 커넥터별로 개별적으로 구성할 수 있으며 producer.override.Kafka consumer.override.소스 또는 Kafka 싱크에 대해 각각 구성할 수 있습니다. 이러한 재정의는 나머지 커넥터 구성 속성에 포함됩니다.
나머지 매개변수는 커넥터 구성 파일입니다.
분산 모드는 작업의 균형을 조정하고 동적으로 확장(또는 축소)할 수 있으며 활성 작업과 구성 및 오프셋 커밋 데이터에 내결함성을 제공합니다. 실행은 독립 실행형 모드와 매우 유사합니다.
> bin/connect-distributed.sh config/connect-distributed.properties
차이점은 시작되는 클래스와 Kafka Connect 프로세스가 구성을 저장할 위치, 작업을 할당하는 방법, 오프셋 및 작업 상태를 저장할 위치를 결정하는 방법을 변경하는 구성 매개변수에 있습니다. 분산 모드에서 Kafka Connect는 오프셋, 구성 및 작업 상태를 Kafka topic에 저장합니다. 원하는 수의 파티션 및 복제 요소를 달성하기 위해 오프셋, 구성 및 상태에 대한 주제를 수동으로 생성하는 것이 좋습니다. Kafka Connect를 시작할 때 토픽이 아직 생성되지 않은 경우 기본 파티션 수와 복제 팩터를 사용하여 토픽이 자동 생성되며 이는 용도에 가장 적합하지 않을 수 있습니다.
특히 위에서 언급한 공통 설정 외에 다음 구성 매개변수는 클러스터를 시작하기 전에 설정하는 것이 중요합니다.
- group.id(기본값 connect-cluster) - Connect 클러스터 그룹을 형성하는 데 사용되는 클러스터의 고유 이름. 소비자 그룹 ID와 충돌해서는 안 됩니다.
- config.storage.topic(기본값 connect-configs) - 커넥터 및 작업 구성을 저장하는 데 사용할 주제입니다. 이것은 단일 파티션, 고도로 복제되고 압축된 주제여야 합니다. 자동 생성된 주제가 여러 파티션을 가질 수 있거나 압축이 아닌 삭제를 위해 자동으로 구성될 수 있으므로 올바른 구성을 보장하기 위해 주제를 수동으로 생성해야 할 수도 있습니다.
- offset.storage.topic(기본값 connect-offsets) - 오프셋을 저장하는 데 사용할 주제입니다. 이 항목에는 많은 파티션이 있어야 하고 복제되어야 하며 압축을 위해 구성되어야 합니다.
- status.storage.topic(기본값 connect-status) - 상태를 저장하는 데 사용할 주제입니다. 이 항목에는 여러 파티션이 있을 수 있으며 압축을 위해 복제 및 구성해야 합니다.
분산 모드에서 커넥터 구성은 명령줄에서 전달되지 않습니다. 대신 아래에 설명된 REST API를 사용하여 커넥터를 생성, 수정 및 파괴하십시오.
'Apache Kafka' 카테고리의 다른 글
카프카 ISR(In-Sync-Replicas) (0) | 2022.01.14 |
---|---|
ksql - 아파치 카프카 용어 - 수정 (0) | 2022.01.13 |
AWS MSK (0) | 2022.01.11 |
주키퍼 CLI (0) | 2022.01.09 |
아파치 주키퍼 설치 및 실행 (0) | 2022.01.09 |