FileSource 커넥터는 파일에서 데이터를 읽고 이를 Apache Kafka®로 보냅니다. 모든 커넥터에 공통적인 구성 외에 file 과 topic 을 속성으로 사용합니다. 다음은 구성의 예입니다. 파일이름을 local-file-soruce.properties 로 저장합니다.
파일에 데이터를 저장하기 위해 sink connector를 실행한 후 test.sink.txt 파일을 열어보면 test.txt 파일의 내용이 저장된 것을 확인할 수 있습니다.
$ bin/connect-standalone.sh config/connect-standalone.properties local-file-sink.properties
## Result
$ cat /temp/test.sink.txt
first message
seconde line
세번째 메시지
standalone mode에서 리스너 port는 default가 8083입니다. file-source와 file-sink connector를 동시에 실행하면 port 충돌이 발생합니다. connect-standalone.properties 파일을 복사하여 새로운 onnect-standalone_new.properties 파일을 생성한 후 listeners=HTTP://:18083 처럼 다른 port를 사용하도록 설정합니다.
분산 모드
분산 모드에서 Kafka 커넥터는 각 이벤트 스레드가 처리하는 작업 수의 균형을 자동으로 조정합니다. 사용자는 작업을 동적으로 늘리거나 줄일 수 있으며 작업 실행, 구성 수정 및 오프셋 제출 시 내결함성이 보장될 수 있습니다.
분산 모드에서 Kafka 커넥터는 오프셋, 구성 및 작업 상태를 Kafka Topic에 저장합니다(독립 실행형 모드에서는 로컬 파일에 보관됨). 필요에 따라 테마의 파티션 및 복사본 수를 설정할 수 있도록 오프셋을 저장할 테마를 수동으로 생성하는 것이 좋습니다.
분산 모드에서 Kafka 커넥터의 구성 파일은 명령줄을 사용할 수 없습니다.
REST API는 Kafka broker를 생성, 수정 및 제거하는 데 사용합니다.
분산 connect 생성
$ bin/connect-distributed.sh config/connect-distributed.properties
...........
[2022-02-04 01:08:12,549] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:303)
[2022-02-04 01:08:12,549] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
[2022-02-04 01:08:12,789] INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1716)
kcat(이전의 kafkacat)은 Apache Kafka® 배포를 테스트 및 디버그하는 데 사용할 수 있는 명령줄 유틸리티입니다.
kcat을 사용하여 Kafka topic에 대해 produce, consume 및 list 할 수 있습니다. "netcat for Kafka"로 설명되는 이 도구는 Kafka에서 데이터를 검사하고 생성하기 위한 도구입니다.
kafkacat 설치
sudo apt install kafkacat
Consumer Mode
Consumer 모드에서 kcat은 주제 및 파티션에서 메시지를 읽고 출력합니다.
Kafka 브로커( -b)와 topic( -t)를 지정해야 합니다.
kcat에 브로커( -b) 및 topic( -t)에 대한 내용을 볼 수 있습니다.
kafkacat -b localhost:9092 -t mysql_users
% Auto-selecting Consumer mode (use -P or -C to override)
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
[...]
Producer Mode
In producer mode, kcat reads messages from standard input (stdin). You must specify a Kafka broker (-b) and topic (-t). You can optionally specify a delimiter (-D). The default delimiter is newline.
You can easily send data to a topic using kcat. Run it with the -P command and enter the data you want, and then press Ctrl-D to finish:
Producer 모드에서 kcat은 표준 입력(stdin)에서 메시지를 읽습니다. Kafka 브로커( -b)와 주제( -t)를 지정해야 합니다.
kcat을 사용하여 주제에 데이터를 보낼 수 있습니다. -P 명령 으로 실행하고원하는 데이터를 입력한 다음 Ctrl-D 를 눌러 완료합니다.
kafkacat -b localhost:9092 -t new_topic -P
test
Metadata Listing Mode
메타데이터 목록 모드( -L)에서 kcat은 Kafka 클러스터와 주제, 파티션, 복제본 및 동기화 복제본(ISR)의 현재 상태를 표시합니다.
kafkacat -b localhost:9092 -L
JSON( -J) 옵션을 추가하여 출력을 JSON으로 내보냅니다.
kafkacat -b mybroker -L -J
kcat example code
kcat의 Hello World 예제는 kcat: Apache Kafka® 예제를 참조하세요. 모든 예에는 온프레미스 또는 Confluent Cloud에서 실행되는 모든 Kafka 클러스터에 연결할 수 있는 생산자 및 소비자가 포함됩니다.
ISR은 kafka 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 말하는데, 리더 파티션과 팔로워 파티션에 대해 간단하게 알아보도록 하자.
리더 파티션과 팔로워 파티션
Kafka에서는 복제 계수라는 것이 존재하는데, 이는 Hadoop에서 사용하는 복제 계수와 비슷한 개념으로 데이터의 유실을 방지 하기 위해 사용되는 개념이다.
복제 계수가 2인 kafka topic은 리더 파티션 1개와 팔로워 파티션이 1개 생성이 되며, 데이터를 전달받았을 때 각각 아래의 역할을 수행한다.
리더 파티션
리더 파티션은 producer로부터 데이터를 전달받아 저장하는 작업을 진행
팔로워 파티션
복제 계수를 맞추기 위해 리더 파티션으로부터 데이터를 전달받아 동기화
그림을 통해 리더파티션, 팔로워 파티션, ISR 상태에 대해서 알아보자.
먼저 아래의 그림은 producer에서 kafka로 데이터를 보내고 리더 파티션이 데이터를 팔로워 파티션으로 복제하는 그림을 나타낸 것이다. 리더 파티션에는 4번째 데이터까지 존재하고, 팔로워 파티션에는 3번까지의 데이터가 존재하는 그림으로 복제 하는 과정에서 딜레이가 발생할 수 있으며, 항상 같은 데이터를 가지고 있는 것이 아니다.
다음으로 ISR 상태에 대해서 알아보자
위의 그림과 다르게 리더 파티션의 데이터와 팔로워 파티션에 있는 데이터가 모두 동일한 것을 알 수 있는데 이를 ISR 상태라고 한다.
ISR 관련 설정
그렇다면 ISR을 이용해 뭘 할 수 있을까?
리더 파티션에 문제가 생기는 경우 새로운 리더를 선출해야하는데 ISR상태의 파티션이 리더가 될 것인지, 아닌 파티션도 리더 파티션이 될 수 있는지에 대한 옵션을 설정할 수 있다. kafka document의 내용을 살펴보자.
unclean.leader.election.enable 옵션을 false로 설정하는 경우 ISR이 아닌 팔로워 파티션은 리더가 되지 않으며, ISR이 전혀 이루어지지 않은 경우에는 리더 파티션이 포함된 브로커가 다시 정상 동작을 할 때까지 기다리게 되며, 서비스에는 영향이 생길 수 있으나, 데이터의 유실은 발생하지 않을 것이다.
반대로 true로 설정하는 경우 ISR이 아닌 팔로워 파티션도 리더가 될 수 있는데, 이 경우 데이터는 일부 유실될 수 있지만 서비스에는 영향을 미치지 않게 된다.
ksqlDB는 Apache Kafka®에서 스트림 처리를 위해 특별히 구축된 데이터베이스입니다. 이 섹션에서는 ksqlDB를 효과적으로 사용하기 위해 필요한 최소한의 Kafka 개념에 대해 설명합니다. 자세한 내용은 공식 Apache Kafka 설명서 를 참조하십시오 .
Record
Kafka에서 데이터의 기본 단위는 이벤트입니다. 이벤트는 특정 시점에 세계에서 일어난 일을 모델링합니다. Kafka에서는 레코드라고 하는 데이터 구성을 사용하여 각 이벤트를 나타냅니다. 레코드에는 키, 값, 타임스탬프, 주제, 파티션, 오프셋 및 헤더와 같은 몇 가지 다른 종류의 데이터가 들어 있습니다.
레코드의 키는 이벤트의 ID를 나타내는 임의의 데이터입니다. 이벤트가 웹 페이지의 클릭인 경우 키는 클릭을 수행한 사용자의 ID일 수 있습니다.
값은 기본 데이터를 나타내는 데이터의 부분입니다. 클릭 이벤트의 값에는 이벤트가 발생한 페이지, 클릭된 DOM 요소 및 기타 정보가 포함될 수 있습니다.
타임 스탬프는 이벤트가 일어 났을 때입니다. 추적할 수 있는 몇 가지 다른 "종류" 시간이 있습니다.
주제 와 파티션이 특정 이벤트에 속하고 큰 수집 및 이벤트의 하위 집합을 설명 오프셋 (아래 그에 더) 그 큰 컬렉션을 내 정확한 위치를 설명합니다.
마지막으로 헤더는 레코드에 대한 임의의 사용자 제공 메타데이터를 전달합니다.
ksqlDB는 이러한 정보 중 일부를 추상화하므로 이에 대해 생각할 필요가 없습니다. 다른 것들은 직접 노출되며 프로그래밍 모델의 필수적인 부분입니다. 예를 들어 ksqlDB에서 데이터의 기본 단위는 행 입니다. 행은 Kafka 레코드에 대한 유용한 추상화입니다. 행에는 키 열과 값 열의 두 가지 열이 있습니다. 그들은 또한 timestamp.
일반적으로 ksqlDB는 상위 수준 프로그래밍 모델에 기여하지 않는 Kafka 수준 구현 세부 정보를 올리는 것을 피합니다.
주제는 명명된 레코드 모음입니다. 그들의 목적은 상호 관심사의 이벤트를 함께 개최할 수 있도록 하는 것입니다. 일련의 클릭 기록은 "클릭" 주제에 저장되어 한 곳에서 모두 액세스할 수 있습니다. 주제는 추가 전용입니다. 주제에 레코드를 추가하면 개별적으로 변경하거나 삭제할 수 없습니다.
어떤 종류의 레코드를 주제에 넣을 수 있는지에 대한 규칙은 없습니다. 그들은 같은 구조를 따를 필요가 없고, 같은 상황과 관련되거나 이와 유사한 것이 필요하지 않습니다. 주제에 대한 게시를 관리하는 방법은 전적으로 사용자 규칙 및 시행의 문제입니다.
ksqlDB는 스트림 과 테이블을 통해 주제에 대해 더 높은 수준의 추상화를 제공합니다 . 스트림 또는 테이블은 스키마를 Kafka 주제와 연결합니다. 스키마는 주제에 저장할 수 있는 레코드의 모양을 제어합니다. 이러한 종류의 정적 입력은 주제에 어떤 종류의 행이 있는지 이해하기 쉽게 하고 일반적으로 이를 처리하는 프로그램에서 실수를 줄이는 데 도움이 됩니다.
레코드가 주제에 배치되면 특정 파티션에 배치됩니다. 파티션은 오프셋별로 완전히 정렬된 레코드 시퀀스입니다. 주제에는 스토리지 및 처리의 확장성을 높이기 위해 여러 파티션이 있을 수 있습니다. 주제를 생성할 때 얼마나 많은 파티션이 있는지 선택합니다.
주제에 레코드를 추가할 때 파티션 전략은 레코드가 저장될 파티션을 선택합니다. 많은 파티션 전략이 있습니다. 가장 일반적인 방법은 전체 파티션 수에 대해 레코드 키의 내용을 해시하는 것입니다. 이는 동일한 ID를 가진 모든 레코드를 동일한 파티션에 배치하는 효과가 있으며, 이는 강력한 순서 보장 때문에 유용합니다.
레코드의 순서는 레코드가 추가될 때 설정되는 오프셋이라는 데이터에 의해 추적됩니다. 오프셋이 10인 레코드는 오프셋이 20인 동일한 파티션의 레코드보다 먼저 발생했습니다 .
여기에서 대부분의 역학은 사용자를 대신하여 ksqlDB에서 자동으로 처리됩니다. 스트림 또는 테이블을 생성할 때 확장성을 제어할 수 있도록 기본 주제에 대한 파티션 수를 선택합니다. 스키마를 선언할 때 키의 일부인 열과 값의 일부인 열을 선택합니다. 이 외에도 개별 파티션이나 오프셋에 대해 생각할 필요가 없습니다. 여기에 몇 가지 예가 있습니다.
레코드가 처리되면 키 내용이 해시되어 새 다운스트림 파티션이 동일한 키를 가진 다른 모든 레코드와 일치하게 됩니다. 레코드가 추가되면 오류나 오류가 있는 경우에도 올바른 오프셋 순서를 따릅니다. 쿼리가 행을 처리하려는 방식으로 인해 스트림의 키 콘텐츠가 변경되면( GROUP BY또는 를 통해 PARTITION BY) 기본 레코드 키가 다시 계산되고 레코드가 계산을 수행하기 위해 설정된 새 주제의 새 파티션으로 전송됩니다.
생산자와 소비자는 주제로 또는 주제에서 레코드의 이동을 용이하게 합니다. 애플리케이션이 레코드를 게시하거나 구독하려고 할 때 API(일반적으로 클라이언트 라고 함 )를 호출하여 그렇게 합니다. 클라이언트는 구조화된 네트워크 프로토콜을 통해 브로커(아래 참조)와 통신합니다.
소비자는 주제에서 레코드를 읽을 때 절대 삭제하거나 변경하지 않습니다. 동일한 정보를 반복적으로 읽을 수 있는 이 패턴은 충돌하지 않는 방식으로 동일한 데이터 세트에 대해 여러 애플리케이션을 구축하는 데 도움이 됩니다. 또한 애플리케이션이 이벤트 스트림을 되감고 이전 정보를 다시 읽을 수 있는 "재생"을 지원하기 위한 기본 빌딩 블록입니다.
생산자와 소비자는 상당히 낮은 수준의 API를 노출합니다. 고유한 레코드를 구성하고, 스키마를 관리하고, 직렬화를 구성하고, 어디로 보내는지 처리해야 합니다.
ksqlDB는 상위 수준의 지속적인 생산자 및 소비자 역할을 합니다. 레코드의 모양을 선언한 다음 데이터를 채우고, 변경하고, 쿼리하는 방법을 설명하는 고급 SQL 명령을 실행하기만 하면 됩니다. 이러한 SQL 프로그램은 세부사항을 처리하는 저수준 클라이언트 API 호출로 변환됩니다.
브로커는 주제에 대한 액세스를 저장하고 관리하는 서버입니다. 여러 브로커가 함께 클러스터링되어 고가용성 내결함성 방식으로 주제를 복제할 수 있습니다. 클라이언트는 브로커와 통신하여 레코드를 읽고 씁니다.
ksqlDB 서버 또는 클러스터를 실행할 때 각 노드는 Kafka 브로커와 통신하여 처리를 수행합니다. Kafka 브로커의 관점에서 각 ksqlDB 서버는 클라이언트와 같습니다. 브로커에서 처리가 수행되지 않습니다. ksqlDB의 서버는 자체 노드에서 모든 계산을 수행합니다.
모든 문제에 완벽하게 맞는 데이터 형식은 없기 때문에 Kafka는 레코드의 키 및 값 부분에 있는 데이터 내용에 대해 불가지론적으로 설계되었습니다. 레코드가 클라이언트에서 브로커로 이동할 때 사용자 페이로드(키 및 값)는 바이트 배열로 변환되어야 합니다. 이를 통해 Kafka는 불투명한 일련의 바이트가 무엇인지 알 필요 없이 작업할 수 있습니다. 레코드가 소비자에게 전달될 때 해당 바이트 배열은 응용 프로그램에 의미 있는 원래 주제로 다시 변환되어야 합니다. 바이트 표현으로 변환하거나 변환하는 프로세스를 각각 직렬화 및 역직렬화 라고 합니다.
생산자가 주제에 레코드를 보낼 때 키와 값을 바이트 배열로 변환하는 데 사용할 직렬 변환기를 결정해야 합니다. 키 및 값 직렬 변환기는 독립적으로 선택됩니다. 소비자가 레코드를 받으면 바이트 배열을 원래 값으로 다시 변환하는 데 사용할 역직렬 변환기를 결정해야 합니다. 직렬 변환기와 역직렬 변환기는 쌍으로 제공됩니다. 다른 디시리얼라이저를 사용하면 바이트 내용을 이해할 수 없습니다.
ksqlDB는 직렬화의 추상화를 상당히 높입니다. 직렬 변환기를 수동으로 구성하는 대신 스트림/테이블 생성 시 구성 옵션을 사용하여 형식을 선언합니다. 어떤 주제가 어떤 방식으로 직렬화되었는지 추적하는 대신 ksqlDB는 각 스트림 및 테이블의 바이트 표현에 대한 메타데이터를 유지 관리합니다. 소비자는 올바른 deserializer를 사용하도록 자동으로 구성됩니다.
Kafka로 직렬화된 레코드는 불투명 바이트이지만 레코드를 처리할 수 있도록 하려면 구조에 대한 몇 가지 규칙이 있어야 합니다. 이 구조의 한 측면은 데이터의 모양과 필드를 정의하는 데이터 스키마입니다. 정수인가요? foo, bar, 및 키가 있는 맵 baz입니까? 다른 것?
시행 메커니즘이 없으면 스키마가 암시적입니다. 소비자는 어떻게든 생산된 데이터의 형태를 알아야 합니다. 종종 이것은 한 그룹의 사람들이 스키마에 구두로 동의하도록 함으로써 발생합니다. 그러나 이 접근 방식은 오류가 발생하기 쉽습니다. 스키마를 중앙에서 관리하고 감사하며 프로그래밍 방식으로 시행할 수 있는 경우가 더 좋습니다.
Kafka 외부 프로젝트인 Confluent Schema Registry 는 스키마 관리를 돕습니다. 스키마 레지스트리를 사용하면 생산자가 주제를 스키마에 등록할 수 있으므로 추가 데이터가 생성될 때 스키마를 준수하지 않으면 거부됩니다. 소비자는 스키마 레지스트리를 참조하여 모르는 주제에 대한 스키마를 찾을 수 있습니다.
생산자, 소비자 및 스키마 구성을 하나로 묶는 대신 ksqlDB는 스키마 레지스트리와 투명하게 통합됩니다. 두 시스템이 서로 통신할 수 있도록 구성 옵션을 활성화하여 ksqlDB는 모든 스트림 및 테이블 스키마를 스키마 레지스트리에 저장합니다. 그런 다음 이러한 스키마를 다운로드하여 ksqlDB 데이터로 작업하는 모든 애플리케이션에서 사용할 수 있습니다. 또한 ksqlDB는 기존 주제의 스키마를 자동으로 유추할 수 있으므로 스트림이나 테이블을 정의할 때 구조를 선언할 필요가 없습니다.
소비자 프로그램이 부팅되면 여러 소비자가 들어갈 수 있는 소비자 그룹에 등록됩니다 . 레코드를 사용할 수 있을 때마다 그룹의 정확히 한 소비자가 레코드를 읽습니다. 이는 일련의 프로세스가 레코드 소비를 조정하고 로드 밸런싱할 수 있는 방법을 효과적으로 제공합니다.
단일 주제의 레코드는 그룹의 한 프로세스에서 사용하도록 되어 있으므로 구독의 각 파티션은 한 번에 한 소비자만 읽습니다. 각 소비자가 담당하는 파티션 수는 총 소스 파티션 수를 소비자 수로 나눈 값으로 정의됩니다. 소비자가 그룹에 동적으로 가입하면 소유권이 다시 계산되고 파티션이 다시 할당됩니다. 소비자가 그룹을 떠나면 동일한 계산이 수행됩니다.
ksqlDB는 이 강력한 로드 밸런싱 프리미티브를 기반으로 합니다. ksqlDB 서버 클러스터에 퍼시스턴트 쿼리를 배포하면 소스 파티션 수에 따라 클러스터 전체에 워크로드가 분산된다. 이 모든 것이 자동으로 수행되기 때문에 그룹 구성원 자격을 명시적으로 관리할 필요가 없습니다.
예를 들어 소스 파티션이 10개인 영구 쿼리를 노드가 2개인 ksqlDB 클러스터에 배포하는 경우 각 노드는 5개의 파티션을 처리합니다. 서버를 잃어버리면 남은 유일한 서버가 자동으로 재조정되어 10개를 모두 처리합니다. 이제 4개의 서버를 더 추가하면 각각이 2개의 파티션을 처리하도록 재조정됩니다.
일정 기간 후에 오래된 레코드를 정리하는 것이 종종 바람직합니다. 보존과 압축은 이를 위한 두 가지 다른 옵션입니다. 둘 다 선택 사항이며 함께 사용할 수 있습니다.
보존은 레코드가 삭제되기 전에 저장되는 기간을 정의합니다. 보존은 주제의 레코드를 삭제하는 유일한 방법 중 하나입니다. 이 매개변수는 이벤트 스트림을 재생할 수 있는 시간 범위를 정의하기 때문에 스트림 처리에서 특히 중요합니다. 재생은 버그를 수정하거나, 새 애플리케이션을 빌드하거나, 일부 기존 논리를 백테스트하는 경우에 유용합니다.
ksqlDB를 사용하면 기본 스트림 및 테이블의 기본 주제 보존을 직접 제어할 수 있으므로 개념을 이해하는 것이 중요합니다. 자세한 내용 은 Kafka 문서의 주제 및 로그를 참조하십시오 .
대조적으로 압축은 키당 최신 레코드를 제외한 모든 레코드를 주기적으로 삭제하는 각 Kafka 브로커의 백그라운드에서 실행되는 프로세스입니다. 이는 선택적인 옵트인 프로세스입니다. 압축은 레코드가 상태 부분에 대한 일종의 업데이트를 나타내고 최신 업데이트가 결국 중요한 유일한 업데이트일 때 특히 유용합니다.
ksqlDB는 구체화된 테이블을 지원하는 기본 변경 로그를 지원하기 위해 압축을 직접 활용합니다. 이를 통해 ksqlDB는 장애 조치 시 테이블을 재구축하는 데 필요한 최소한의 정보를 저장할 수 있습니다. 자세한 내용 은 Kafka 문서의 로그 압축을 참조하세요 .
Kafka Connect는 Apache Kafka®와 다른 데이터 시스템 간에 데이터를 스트리밍하기 위한 도구입니다. Kafka Connect는 데이터베이스 데이터를 수집하거나 모든 애플리케이션 서버에서 Kafka topic 데이터를 수집하여 스트림 처리할 수 있도록 합니다. 내보내기 커넥터는 Kafka topic의 데이터를 Elasticsearch나 Hadoop시스템 등으로 전달할 수 있습니다.
kafka connect는 connector와 task로 구성되어 있습니다.
Kafka Connect 작동 방식
Kafka Connect에는 두 가지 유형의 커넥터가 포함되어 있습니다.
소스 커넥터 – 데이터베이스를 수집하고 Kafka topic에 대한 테이블 업데이트를 스트리밍합니다. 또한 소스 커넥터는 모든 애플리케이션 서버에서 메트릭을 수집하고 이를 Kafka topic에 저장할 수 있습니다.
싱크 커넥터 – Kafka topic의 데이터를 Elasticsearch나 Hadoop과 같은 시스템으로 전달합니다.
사용자 환경에 Kafka Connect를 배포하려면 Kafka Connect 사용 방법 - 시작하기 를 참조하십시오 .
독립 실행형 모드와 분산형 모드
커넥터는 프로세스로 실행됩니다. 이를 worker라고 합니다. worker를 실행하는 데는 독립형모드와 분산형모드가 있습니다.
Kafka Connect worker는 공유 시스템에서 실행할 수 있는 JVM 프로세스입니다.
독립 실행형 모드 는 로컬 시스템에서 Kafka Connect를 개발하고 테스트하는 데 유용합니다. 일반적으로 단일 에이전트를 사용하는 환경(예: 웹 서버 로그를 Kafka로 전송)에도 사용할 수 있습니다.
독립 실행형 worker 상태는 로컬 파일 시스템에 저장됩니다.
분산형 모드 는 여러 시스템(노드)에서 Connect worker를 실행합니다. Connect 클러스터를 구성합니다. Kafka Connect는 클러스터 전체에 커넥터를 배포합니다. 필요에 따라 노드를 추가하거나 제거할 수 있습니다.
노드가 예기치 않게 클러스터에서 제거되는 경우 Kafka Connect는 해당 노드의 작업을 클러스터의 다른 노드에 자동으로 배포합니다. Kafka Connect는 복제되는 Kafka 클러스터 내부에 커넥터 구성, 상태 및 오프셋 정보를 저장하기 때문에 Connect worker를 실행하는 노드가 손실되더라도 데이터가 손실되지 않습니다.
분산형 worker는 모든 상태를 Kafka에 저장합니다.
Kafka와 mariaDB 연결하기
1. database 및 user 생성
mariadb 는 설치가 되었다는 가정에서 설명합니다. mariDB 설치는 설치문서를 참고하세요.
kafka connector와 연동할 database, user 그리고 table을 생성합니다.
$ mysql -u root -p
-- database 생성
MariaDB [mysql]> create database kafkadb;
-- user 생성
MariaDB [mysql]> CREATE USER 'kafka'@'%' IDENTIFIED BY 'kafka123';
-- 생성된 계정에 데이터베이스 ALL 권한 부여
MariaDB [mysql]> GRANT ALL PRIVILEGES ON kafkadb.* TO 'kafka'@'%' IDENTIFIED BY 'kafka123';
-- 권한 적용
MariaDB [mysql]> FLUSH PRIVILEGES;
kafka connector에서 사용할 table을 생성합니다.
$ mysql -u kafka -p
-- For timestamp+incrementing TEST
MariaDB [mysql]> CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY,
created DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ;
-- For timestamp TEST
MariaDB [mysql]> CREATE TABLE products (
seq INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
created DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ;
-- For incrementing TEST
MariaDB [mysql]> CREATE TABLE orders (
seq INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
product VARCHAR(255) NOT NULL
) ;
-- For bulk TEST
MariaDB [mysql]> create table bulk_tab(
name VARCHAR(255) ,
log VARCHAR(255)
) ;
두 번째 매개변수 connector1.properties 에 mariaDB에 대한 접속 정보를 기술합니다. 여러 개의 connector를 생성할 수 있습니다. 여러 개의 connector을 생성하려면 아래와 같이 connector.properties 파일을 나열하면 됩니다.
* timestamp+incrementing mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.
여기에서 물음표(?) 는 kafka 내부에서 관리하는 offset입니다.
첫번째 물음표는 서버의 현재시간을 나타내고 두번째 물음표는 offset을 나타냅니다.
incrementing 데이터를 입력할 경우 modified 날짜가 현재시간보다는 작고 offset 날짜와는 같아야 합니다.
조건이 까다로우므로 timestamp+incrementing modes는 조심해서 적용해야 합니다. timestamp와 incrementing 은 각각 적용하는 것이 관리가 용이합니다.
Begin using SQL query: SELECT * FROM `kafkadb`.`customers`
WHERE `kafkadb`.`customers`.`modified` < ?
AND ((`kafkadb`.`customers`.`modified` = ? AND `kafkadb`.`customers`.`id` > ?) OR `kafkadb`.`customers`.`modified` > ?)
ORDER BY `kafkadb`.`customers`.`modified`,`kafkadb`.`customers`.`id` ASC
1.2 consumer 실행
위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.
위의 예제에서 topic.prefix=mysql- 이고 테이블은 customers 이므로 mysql-customers 라는 이름으로 topic이 생성됩니다.
아래는 mysql-customers topic를 조회하는 예제입니다. mariaDB에 새로운 데이터를 입력하거나 modified 컬럼이 수정되면 consumer에 메시지를 보냅니다.
mariaDB에 접속하여 아래와 같이 테이블에 데이터를 입력합니다. 데이터 입력 시 consumer 창에 데이터가 출력되는 것을 확인할 수 있습니다.
INSERT INTO customers(name, email) values('jyp','ypjeong@kafka.com');
INSERT INTO customers(name, email) values('jyh','yhjeon@kafka.com');
INSERT INTO customers(name, email) values('kky','kykim@kafka.com');
2) mysql DB connector2.properties 예제
imestamp mode의 connector를 생성하는 properties 파일 내용입니다.
Found offset {{table=products}=null, {protocol=1, table=kafkadb.products}={timestamp_nanos=0, timestamp=1642904785000}} for partition {protocol=1, table=kafkadb.products}
첫번째 물음표는 offset을 나타내고 두번째 물음표는 서버의 현재시간을 나타냅니다.
만일, timestamp mode가 적용된 컬럼에 현재시간보다 큰 날짜값이 들어온다면 그 시간 차이만큼 데이터는 전송되지 않습니다. 물론, offset 날짜보다 작은 값으로 udpate하면 데이터를 가져올 수 없습니다.
SELECT * FROM `kafkadb`.`products`
WHERE `kafkadb`.`products`.`modified` > ? -- offset 날짜
AND `kafkadb`.`products`.`modified` < ? -- kafka connector 현재 날짜
ORDER BY `kafkadb`.`products`.`modified` ASC
2.2 consumer 실행
위의 connector에서 생성한 topic의 메시지들을 streaming으로 받아옵니다. connector 생성 시 topic은 topic.prefix 에서 정의한 prefix명과 테이블명으로 구성됩니다.
위의 예제에서 topic.prefix=mysql- 이고 테이블은 products 이므로 mysql-porducts 라는 이름으로 topic이 생성됩니다.
아래는 mysql-porducts topic을 조회하는 예제입니다. mariaDB products 테이블의 modified 컬럼이 수정되면 consumer에 메시지를 보냅니다.
mariaDB에 접속하여 아래와 같이 테이블에 데이터를 입력합니다. 데이터 입력 시 consumer 창에 데이터가 출력되는 것을 확인할 수 있습니다.
INSERT INTO orders(name, product) values('jyp', 'computer');
INSERT INTO orders(name, product) values('jyp', 'mouse');
INSERT INTO orders(name, product) values('jyp', 'keyboard');
4) bulk mode
bulk mode를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.
bulk 모드에서 커넥터는 테이블 전체 데이터를 주기적으로 쿼리합니다. 따라서 원본 테이블에 100,000행인 경우 커넥터는 데이터베이스의 새 행 또는 오래된 행 수에 관계없이 모든 데이터를 폴링하고 Apache Kafka topic에 100,000 개의 새 메시지를 삽입합니다.
예제는 생략합니다.
SELECT * FROM `kafkadb`.`bulk_tab`
5) multi task
multi task 를 적용할 경우 kafka 내부에서 동작하는 SQL문입니다.
multi task는 테이블에 동일한 컬럼명일 경우에 적용하면 multi-task로 작동합니다.예제에서는 2개의 다른 테이블이 동일한 컬럼명(id)을 가지고 있고 id 컬럼에 incrementing mode를 적용하여 multi-task로 수행하는 예제입니다. 컬럼명이 동일하고 데이터 추출 mode가 동일하면 multi-task로 수행할 수 있습니다.예제는생략합니다.
[2022-01-23 20:06:54,884] INFO [mysql-multi-incre-source|task-0]
Begin using SQL query:
SELECT *
FROM `kafkadb`.`multitab1`
WHERE `kafkadb`.`multitab1`.`id` > ?
ORDER BY `kafkadb`.`multitab1`.`id` ASC
[2022-01-23 20:06:54,884] INFO [mysql-multi-incre-source|task-1]
Begin using SQL query:
SELECT *
FROM `kafkadb`.`multitab2`
WHERE `kafkadb`.`multitab2`.`id` > ?
ORDER BY `kafkadb`.`multitab2`.`id` ASC
database.history.kafka.topic=커넥터가 데이터베이스 스키마 기록을 저장할 Kafka topic 이름입니다.
include.schema.changes:커넥터가 데이터베이스 서버 ID와 이름이 같은 Kafka topic에 데이터베이스 스키마의 변경 사항을 게시해야 하는지의 여부를 지정하는 참/거짓 값입니다. 각 스키마 변경은 데이터베이스 이름을 포함하고 값에 DDL 문을 포함하는 키를 사용하여 기록됩니다. .
스키마 레지스트리 카프카 connect의 필수 서비스가 아니지만, 카프카 데이터 형식으로 avoro, Protobuf 및 JSON 스키마를 사용할 수 있습니다.
독립 실행형 모드와 분산형 모드
커넥터는 프로세스로 실행됩니다. 이를 worker라고 합니다. worker를 실행하는 데는 독립형모드와 분산형모드가 있습니다.
독립 실행형 모드 는 로컬 시스템에서 Kafka Connect를 개발하고 테스트하는 데 유용합니다. 일반적으로 단일 에이전트를 사용하는 환경(예: 웹 서버 로그를 Kafka로 전송)에도 사용할 수 있습니다.
분산 모드 는 여러 시스템(노드)에서 Connect worker를 실행합니다. Connect 클러스터를 구성합니다. Kafka Connect는 클러스터 전체에 커넥터를 배포합니다. 필요에 따라 노드를 추가하거나 제거할 수 있습니다.
노드가 예기치 않게 클러스터에서 제거되는 경우 Kafka Connect는 해당 노드의 작업을 클러스터의 다른 노드에 자동으로 배포합니다. 또한 Kafka Connect는 안전하게 복제되는 Kafka 클러스터 내부에 커넥터 구성, 상태 및 오프셋 정보를 저장하기 때문에 Connect worker를 실행하는 노드가 손실되더라도 데이터가 손실되지 않습니다.
Connect 플러그인 설치
Kafka Connect는 확장 가능하도록 설계되어 개발자가 사용자 지정 커넥터, 변환 또는 변환기를 만들고 사용자가 이를 설치하고 실행할 수 있습니다. Kafka Connect 플러그인은 하나 JAR 파일입니다.
Kafka Connect는 plugin.path에 정의된 플러그인 경로를 사용하여 플러그인을 찾습니다.
카프카 홈디렉토리 밑에 conf 디렉토리의 connect-standalone.properties 파일에 plugin.path를 설정합니다.
콤마(,)로 분리하여 여러 경로를 설정할 수 있습니다.
아래는 worker 환경설정의 예입니다.
plugin.path=/mnt/d/share/kafka/plugins
플러그인을 설치하려면 플러그인 경로에 JAR를 배치하거나, JAR파일이 포함된 디렉토리의 절대 경로를 plugin.path에 추가합니다.
위의 예제에서는 Connect를 실행하는 시스템에 /mnt/d/share/kafka/plugins 디렉토리를 만들고 JAR를 배치합니다.
Connect worker를 시작하면 각 worker는 플러그인 경로에 있는 모든 커넥터, 변환 및 변환기 플러그인을 검색합니다. 커넥터, 변환 또는 변환기를 사용할 때 Connect worker는 먼저 해당 플러그인에서 클래스를 로드한 다음 Kafka Connect 런타임 및 Java 라이브러리를 로드합니다.
ZIP 파일 내용을 추출하고 내용을 원하는 위치에 복사합니다. 예를 들어 라는 이름의 디렉토리 plugin.path를 만든 다음 커넥터 플러그인 내용을 복사합니다.
Connect 속성 파일의 플러그인 경로에 추가합니다. 예를 들어, plugin.path=/usr/local/share/kafka/plugins. Kafka Connect는 플러그인 경로를 사용하여 플러그인을 찾습니다. 플러그인 경로는 Kafka Connect의 작업자 구성 에 정의된 쉼표로 구분된 디렉토리 목록입니다 .
해당 구성으로 Connect 작업자를 시작하십시오. Connect는 해당 플러그인 내에 정의된 모든 커넥터를 검색합니다.
Connect가 실행 중인 각 시스템에 대해 이 단계를 반복합니다. 각 커넥터는 각 작업자에서 사용할 수 있어야 합니다.
Worker 구성 및 실행
독립 실행형 모드 또는 분산 모드에서 worker를 실행하는 방법에 대해 알아봅니다.
독립 실행형 모드
독립 실행형 모드는 일반적으로 개발 및 테스트 또는 경량의 단일 에이전트 환경(예: 웹 서버 로그를 Kafka로 전송)에 사용됩니다. 다음은 독립 실행형 모드에서 worker를 시작하는 예제 입니다. kafka bin 디렉토리에 connect-standalone.sh 파일이 존재합니다.
첫 번째 매개변수( connect-standalone.properties)는 worker 구성 속성 파일 입니다. Kafka 클러스터 및 직렬화 형식과 같은 설정을 제어할 수 있습니다.
두 번째 매개변수( connector1.properties)는 커넥터 구성 속성 파일입니다. 모든 커넥터에는 worker와 함께 로드되는 구성 속성이 있습니다. 예제와 같이 이 명령어로 여러 커넥터를 시작할 수 있습니다.
동일한 호스트 시스템에서 여러 독립 실행형 작업자를 실행하는 경우 다음 두 구성 속성은 각 worker에 대해 고유해야 합니다.
offset.storage.file.filename: 커넥터 오프셋의 저장 파일 이름입니다. 이 파일은 독립 실행형 모드에서 로컬 파일 시스템에 저장됩니다. 두 작업자에 대해 동일한 파일 이름을 사용하면 오프셋 데이터가 삭제되거나 다른 값으로 덮어쓰여집니다.
listeners: REST API가 수신하는 URI 목록 형식 protocol://host:port,protocol2://host2:port- 프로토콜은 HTTP 또는 HTTPS입니다. 0.0.0.0모든 인터페이스에 바인딩하려면 호스트 이름을 지정하거나 기본 인터페이스에 바인딩하려면 호스트 이름을 비워 둘 수 있습니다.
첫 번째 매개변수는 작업자에 대한 구성입니다. 여기에는 Kafka 연결 매개변수, 직렬화 형식 및 오프셋 커밋 빈도와 같은 설정이 포함됩니다. 모든 worker(독립 실행형 및 분산형 모두)에는 몇 가지 구성이 필요합니다.
bootstrap.servers - Kafka에 대한 연결을 부트스트랩하는 데 사용되는 Kafka 서버 목록
key.converter- Kafka Connect 형식과 Kafka에 작성된 직렬화된 형식 간에 변환하는 데 사용되는 변환기 클래스입니다. 이것은 Kafka에서 읽거나 쓰는 메시지의 키 형식을 제어하며 이는 커넥터와 독립적이므로 모든 커넥터가 모든 직렬화 형식과 함께 작동할 수 있습니다. 일반적인 형식의 예로는 JSON 및 Avro가 있습니다.
value.converter- Kafka Connect 형식과 Kafka에 작성된 직렬화된 형식 간에 변환하는 데 사용되는 변환기 클래스입니다. 이것은 Kafka에서 읽거나 쓰는 메시지의 값 형식을 제어하며 이것은 커넥터와 독립적이므로 모든 커넥터가 모든 직렬화 형식과 함께 작동할 수 있습니다. 일반적인 형식의 예로는 JSON 및 Avro가 있습니다.
독립 실행형 모드와 관련된 중요한 구성 옵션은 다음과 같습니다.
offset.storage.file.filename - 오프셋 데이터를 저장할 파일
여기에 구성된 매개변수는 구성, 오프셋 및 topic에 액세스하기 위해 Kafka Connect에서 사용하는 생산자 및 소비자를 위한 것입니다. Kafka 소스 작업에서 사용하는 생산자와 Kafka 싱크 작업에서 사용하는 소비자 구성의 경우 동일한 매개변수를 사용할 수 있지만 접두사 producer.및 consumer. 각각 구성하는 것이 좋습니다.
worker 구성에서 접두사 없이 상속되는 경우는 Kafka 클라이언트 매개변수는 bootstrap.servers 입니다. 예외는 연결을 허용하기 위해 추가 매개변수가 필요한 보안 클러스터입니다. 이러한 매개변수는 작업자 구성에서 관리 액세스에 대해 한 번, Kafka 소스에 대해 한 번, Kafka 싱크에 대해 한 번 등 최대 세 번 설정해야 합니다.
kafka connector에는 connector1.properties sample 파일이 없습니다. 아래 파일을 참고하여 접속하고자 하는 환경을 설정해줘야 합니다.
database.hostname=MySQL 서버의 IP address 또는 hostname
database.port=MySQL 서버 port
database.user=데이터베이스 계정
database.password=데이터베이스 계정의 패스워드
database.server.name=모니터링되는 특정 MySQL 데이터베이스 서버/클러스터에 대한 네임스페이스를 식별하는 논리적 이름입니다. 논리적 이름은 이 커넥터에서 나오는 모든 Kafka topic 이름에 대한 접두사로 사용되므로 고유해야 합니다. 기본값은 host:port입니다. 여기서 host는 database.hostname 속성 값이고 port는 database.port 속성 값입니다. Confluent는 기본값을 의미 있는 이름으로 변경할 것을 권장합니다.
database.include.list : 모니터링할 데이터베이스 이름(쉼표로 추가) 화이트리스트에 포함되지 않은 데이터베이스 이름은 모니터링에서 제외됩니다. 기본적으로 모든 데이터베이스가 모니터링됩니다.
database.history.kafka.topic=커넥터가 데이터베이스 스키마 기록을 저장할 Kafka topic 이름입니다.
include.schema.changes:커넥터가 데이터베이스 서버 ID와 이름이 같은 Kafka topic에 데이터베이스 스키마의 변경 사항을 게시해야 하는지의 여부를 지정하는 참/거짓 값입니다. 각 스키마 변경은 데이터베이스 이름을 포함하고 값에 DDL 문을 포함하는 키를 사용하여 기록됩니다. .
2.3.0부터 클라이언트 구성 재정의는 접두사를 사용하여 커넥터별로 개별적으로 구성할 수 있으며 producer.override.Kafka consumer.override.소스 또는 Kafka 싱크에 대해 각각 구성할 수 있습니다. 이러한 재정의는 나머지 커넥터 구성 속성에 포함됩니다.
나머지 매개변수는 커넥터 구성 파일입니다.
분산 모드는 작업의 균형을 조정하고 동적으로 확장(또는 축소)할 수 있으며 활성 작업과 구성 및 오프셋 커밋 데이터에 내결함성을 제공합니다. 실행은 독립 실행형 모드와 매우 유사합니다.
차이점은 시작되는 클래스와 Kafka Connect 프로세스가 구성을 저장할 위치, 작업을 할당하는 방법, 오프셋 및 작업 상태를 저장할 위치를 결정하는 방법을 변경하는 구성 매개변수에 있습니다. 분산 모드에서 Kafka Connect는 오프셋, 구성 및 작업 상태를 Kafka topic에 저장합니다. 원하는 수의 파티션 및 복제 요소를 달성하기 위해 오프셋, 구성 및 상태에 대한 주제를 수동으로 생성하는 것이 좋습니다. Kafka Connect를 시작할 때 토픽이 아직 생성되지 않은 경우 기본 파티션 수와 복제 팩터를 사용하여 토픽이 자동 생성되며 이는 용도에 가장 적합하지 않을 수 있습니다.
특히 위에서 언급한 공통 설정 외에 다음 구성 매개변수는 클러스터를 시작하기 전에 설정하는 것이 중요합니다.
group.id(기본값 connect-cluster) - Connect 클러스터 그룹을 형성하는 데 사용되는 클러스터의 고유 이름. 소비자 그룹 ID와 충돌해서는 안 됩니다.
config.storage.topic(기본값 connect-configs) - 커넥터 및 작업 구성을 저장하는 데 사용할 주제입니다. 이것은 단일 파티션, 고도로 복제되고 압축된 주제여야 합니다. 자동 생성된 주제가 여러 파티션을 가질 수 있거나 압축이 아닌 삭제를 위해 자동으로 구성될 수 있으므로 올바른 구성을 보장하기 위해 주제를 수동으로 생성해야 할 수도 있습니다.
offset.storage.topic(기본값 connect-offsets) - 오프셋을 저장하는 데 사용할 주제입니다. 이 항목에는 많은 파티션이 있어야 하고 복제되어야 하며 압축을 위해 구성되어야 합니다.
status.storage.topic(기본값 connect-status) - 상태를 저장하는 데 사용할 주제입니다. 이 항목에는 여러 파티션이 있을 수 있으며 압축을 위해 복제 및 구성해야 합니다.
분산 모드에서 커넥터 구성은 명령줄에서 전달되지 않습니다. 대신 아래에 설명된 REST API를 사용하여 커넥터를 생성, 수정 및 파괴하십시오.