requirement
- zookeeper
- apache kafka
- apache connect
standalone mode
파일 소스 커넥트
FileSource 커넥터는 파일에서 데이터를 읽고 이를 Apache Kafka®로 보냅니다. 모든 커넥터에 공통적인 구성 외에 file 과 topic 을 속성으로 사용합니다. 다음은 구성의 예입니다. 파일이름을 local-file-soruce.properties 로 저장합니다.
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test
source file connector를 생성합니다.
$ bin/connect-standalone.sh config/connect-standalone.properties local-file-source.properties
이 커넥터는 하나의 파일만 읽고 해당 파일 내의 데이터를 Kafka broker로 보냅니다. 그런 다음 추가된 데이터에 대해서만 파일을 감시합니다. 이미 Kafka로 전송된 파일 라인의 수정 사항은 처리되지 않습니다.
test.txt 파일에 임의의 데이터를 저장합니다. 아래는text.txt 파일 내용 예시입니다.
first message
seconde line
세번째 메시지
파일에 있는 데이터를 조회하기 위해 /kafka-console-consumer.sh 를 실행합니다.
bin/kafka-console-consumer.sh --bootstrap-server 172.24.118.82:9092 --topic connect-test --from-beginning
## Result
{"schema":{"type":"string","optional":false},"payload":"first message"}
{"schema":{"type":"string","optional":false},"payload":"seconde line"}
{"schema":{"type":"string","optional":false},"payload":"세번째 메시지"}
test.txt 파일에 데이터를 추가해 봅니다. consumer 화면에 메시지가 출력되는 것을 확인할 수 있습니다.
파일싱크 커넥터
FileSink 커넥터는 Kafka에서 데이터를 읽고 로컬 파일로 출력합니다. 다른 싱크 커넥터와 마찬가지로 여러 항목을 지정할 수 있습니다. FileSink 커넥터는 모든 커넥터에 공통적인 구성 외에 속성 file만 사용합니다 . 다음은 구성의 예입니다.
파일이름을 local-file-sink.properties 로 저장합니다.
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=connect-test
파일에 데이터를 저장하기 위해 sink connector를 실행한 후 test.sink.txt 파일을 열어보면 test.txt 파일의 내용이 저장된 것을 확인할 수 있습니다.
$ bin/connect-standalone.sh config/connect-standalone.properties local-file-sink.properties
## Result
$ cat /temp/test.sink.txt
first message
seconde line
세번째 메시지
standalone mode에서 리스너 port는 default가 8083입니다. file-source와 file-sink connector를 동시에 실행하면 port 충돌이 발생합니다. connect-standalone.properties 파일을 복사하여 새로운 onnect-standalone_new.properties 파일을 생성한 후 listeners=HTTP://:18083 처럼 다른 port를 사용하도록 설정합니다.
분산 모드
분산 모드에서 Kafka 커넥터는 각 이벤트 스레드가 처리하는 작업 수의 균형을 자동으로 조정합니다. 사용자는 작업을 동적으로 늘리거나 줄일 수 있으며 작업 실행, 구성 수정 및 오프셋 제출 시 내결함성이 보장될 수 있습니다.
분산 모드에서 Kafka 커넥터는 오프셋, 구성 및 작업 상태를 Kafka Topic에 저장합니다(독립 실행형 모드에서는 로컬 파일에 보관됨). 필요에 따라 테마의 파티션 및 복사본 수를 설정할 수 있도록 오프셋을 저장할 테마를 수동으로 생성하는 것이 좋습니다.
분산 모드에서 Kafka 커넥터의 구성 파일은 명령줄을 사용할 수 없습니다.
REST API는 Kafka broker를 생성, 수정 및 제거하는 데 사용합니다.
분산 connect 생성
$ bin/connect-distributed.sh config/connect-distributed.properties
...........
[2022-02-04 01:08:12,549] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:303)
[2022-02-04 01:08:12,549] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2022-02-04 01:08:12,789] INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1716)
'Apache Kafka' 카테고리의 다른 글
kafka connector - nginx access log 연결 (0) | 2022.01.23 |
---|---|
kafkacat (0) | 2022.01.23 |
[Kafka Connect] Connector Rest API (0) | 2022.01.23 |
카프카 미러메이커2 (0) | 2022.01.14 |
카프카 ISR(In-Sync-Replicas) (0) | 2022.01.14 |