도커로 ksqlDB 설치하기
1. 독립 실행형 ksqlDB 가져오기
ksqlDB는 Apache Kafka®에서 실행되므로 Kafka가 실행되고 있어야 합니다.
본 문서에서는 zookeeper, kafka broker는 설치되었다고 가정하고 ksqlDB 설치에 대해서만 설명합니다.
아래 내용으로 로컬 파일 시스템에 docker-compose.yml 파일을 생성합니다.
아래 docker-compose.yml 내용 중 KSQL_BOOTSTRAP_SERVERS: broker:9092 부분에서 broker는 각자의 bootstrap server의 IP로 변경합니다.
version은 링크 문서를 참조하여 각자 설치된 docker 엔진 버전에 따라서 설정합니다.(https://docs.docker.com/compose/compose-file/compose-file-v3/)
version: '3.8'
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.23.1
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: 172.24.121.239:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.23.1
container_name: ksqldb-cli
entrypoint: /bin/sh
tty: true
참고 : 아래 docker-compose.yml 파일은 zookeeper, broker, ksqldb-server, ksqldb-cli 도커 서비스 전체를 생성하는 내용입니다. zookeeper와 broker가 설치되어 있지 않은 경우 아래 파일로 서비스를 생성하고 테스트를 하면 됩니다.
## 서비스 전체생성 (zookeeper, borker, ksqldb-server, ksqldb-cli 도커 생성)
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
ksqldb-server:
image: confluentinc/ksqldb-server:0.23.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.23.1
container_name: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true
2. ksqlDB 서버 시작
위에서 만든 docker-compose.yml 파일이 있는 디렉터리에서 아래 명령을 실행하여 서비스(ksqldb-server, ksqldb-cli)를 시작합니다. 서비스가 성공적으로 시작되면 ksqlDB 를 사용할 준비가 된 것입니다.
docker-compose 명령어는 기본적으로 docker-compose.yml 파일이나 docker-compose.yaml 파일을 읽습니다.
만일, 다른 파일명을 사용하려면 -f 옵션으로 파일명을 지정하면 됩니다.
아래 명령어는 foreground로 서비스를 실행합니다. 백그라운드로 실행하려면 -d 옵션을 추가하면 됩니다.
$ docker-compose up
$ docker-compose up -d
[+] Running 2/2
⠿ Container ksqldb-server Started
⠿ Container ksqldb-cli Started
docker 프로세스가 실행되었는지 docker ps 명령어로 확인합니다. STATUS 를 보면 'Up' 인 것을 확인할 수 있습니다.
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f6ffaf8b5195 confluentinc/ksqldb-cli:0.23.1 "/bin/sh" 6 minutes ago Up 15 seconds
ksqldb-cli
da8fe83bbfd4 confluentinc/ksqldb-server:0.23.1 "/usr/bin/docker/run" 32 minutes ago Up 6 minutes 0.0.0.0:8088->8088/tcp ksqldb-server
3. ksqlDB의 대화형 CLI 시작
이 명령을 실행하여 ksqlDB 서버에 연결하고 대화형 CLI(명령줄 인터페이스) 세션을 시작합니다.
오라클 DB 기준으로 sqlplus 명령어로 오라클 DB에 접속한 것과 동일합니다.
$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
위 명령어를 실행하면 ksqldb-server에 연결되고 CLI를 통해 sql을 수행할 수 있습니다. ksql> 프롬프트가 나타납니다.
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0
and will likely be removed in a future release.
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= The Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2021 Confluent Inc.
CLI v0.23.1-rc9, Server v0.23.1-rc9 located at http://ksqldb-server:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql> version
Version: 0.23.1-rc9
snow topics 명령어로 kafka에 생성되어 있는 topic을 조회합니다.
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
default_ksql_processing_log | 1 | 1
---------------------------------------------------------------
kafka에서 my-kafka-topic 이름의 topic을 생성 및 조회해 봅니다.
## topic 생성
$ bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server 172.24.121.239:9092 --partitions 3 --replication-factor 2
## topic 조회
$ bin/kafka-topics.sh --list --bootstrap-server 172.24.121.239:9092
## 조회결과(kafka 내부 topic이 같이 조회될 수 있음)
default_ksql_processing_log
my-kafka-topic
ksql cli에서 topic을 조회해 봅니다. kafka에서 생성한 my-kafka-topic 이 조회됩니다.
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
default_ksql_processing_log | 1 | 1
my-kafka-topic | 3 | 2
---------------------------------------------------------------
4. 스트림 생성
스트림은 기본적으로 기본 Kafka topic과 스키마를 연결합니다. CREATE STREAM 문의 각 매개변수는 다음과 같습니다.
- kafka_topic - 스트림의 기반이 되는 Kafka topic의 이름입니다. topic이 존재하지 않는 경우에는 자동으로 생성하고, 이미 존재하는 주제에 대해서는 스트림이 생성됩니다.
- value_format - Kafka topic에 저장된 메시지의 인코딩입니다. JSON 인코딩의 경우 각 행은 키/값 형태인 JSON 객체로 저장됩니다. 예: {"profileId": "c2309eec", "위도": 37.7877, "경도": -122.4205}
- partitions - topic에 대해 생성할 파티션 수입니다 . 이미 존재하는 주제에는 이 매개변수가 필요하지 않습니다.
스트림
스트림은 일련의 사실을 나타내는 분할하여 관리할 수 있고, 변경이 불가능한 컬렉션입니다.
예를 들어, 스트림의 행은 "갑이 을에게 $100를 보냈습니다", "병이 을에게 $50를 보냈습니다"와 같은 일련의 금융 거래를 모델링할 수 있습니다.
행이 스트림에 삽입되면 절대 변경할 수 없습니다. 스트림 끝에 새 행을 추가할 수 있지만 기존 행은 업데이트하거나 삭제할 수 없습니다. 각 행은 특정 파티션에 저장됩니다. 모든 행에는 암시적이든 명시적이든 해당 ID를 나타내는 키가 있습니다. 동일한 키를 가진 모든 행은 동일한 파티션에 저장됩니다.
스트림을 생성하기 위해, 대화형 ksql CLI 세션에서 아래 명령문을 실행합니다.
ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
WITH (kafka_topic='locations', value_format='json', partitions=1);
Message
----------------
Stream created
----------------
5. 구체화된 뷰(Materialized View) 생성
구체화된 뷰를 사용하여 라이더의 최신 위치를 추적할 수도 있습니다. 이를 위해 이전에 생성된 스트림에 대해 SELECT 문을 실행하여 currentLocation 테이블을 생성합니다. 위치 데이터가 수신되면 테이블이 업데이트됩니다. 최신 위치를 표시하기 위해 집계 함수(LATEST_BY_OFFSET)를 사용할 수 있습니다. 구체화된 뷰도 하나의 topic로 볼 수 있습니다.
CREATE TABLE currentLocation AS
SELECT profileId,
LATEST_BY_OFFSET(latitude) AS la,
LATEST_BY_OFFSET(longitude) AS lo
FROM riderlocations
GROUP BY profileId
EMIT CHANGES;
Message
----------------------------------------------
Created query with ID CTAS_CURRENTLOCATION_3
----------------------------------------------
주어진 위치나 도시에서 라이더가 얼마나 멀리 떨어져 있는지를 캡처하는 파생 테이블(Table ridersNearMountainView)도 구체화된 뷰를 만들어 보겠습니다.
아래 SWL문을 복사하여 대화형 CLI 세션에 붙여넣고 Enter 키를 눌러 실행합니다.
CREATE TABLE ridersNearMountainView AS
SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
COLLECT_LIST(profileId) AS riders,
COUNT(*) AS count
FROM currentLocation
GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
Message
-----------------------------------------------------
Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------
6. 스트림을 통해 푸시 쿼리 실행
이제 스트림에 대해 푸시 쿼리를 실행해 보겠습니다. 대화형 CLI 세션을 사용하여 주어진 쿼리를 실행합니다.
이 쿼리는 좌표가 Mountain View에서 5마일 이내에 있는 riderLocations 스트림의 모든 행을 출력합니다 .
ctrl + C 키를 누를 때까지 출력을 위해 대기 상태로 있습니다.
이벤트가 riderLocations 스트림에 기록될 때 데이터를 클라이언트로 푸시합니다. 즉 화면에 출력됩니다.
현재 쿼리를 실행 중인 CLI 세션을 그대로 둡니다. 쿼리가 데이터를 출력하도록 데이터를 riderLocations 스트림에 쓸 것입니다.
-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
7. 다른 CLI 세션 시작
위에서의 SELECT 쿼리 세션은 푸시 쿼리의 출력을 기다리는 중이므로, 다른 세션에서 데이터를 ksqlDB에 데이터를 기록해 보겠습니다. 다른 터미널 창을 열어 아래 명령어를 수행하여 ksqlDB에 접속합니다.
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
8. 이벤트로 스트림 채우기
새 CLI 세션에서 아래 INSERT 문을 실행하면서 (6)의 CLI SELECT 세션을 확인합니다.
푸시 쿼리는 riderLocations 스트림에 작성되는 즉시 행을 실시간으로 출력합니다. 조회 조건에 의해 3건이 출력됩니다.
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
(6)에서 수행한 화면에 출력되는 스트림 데이터입니다.
+--------------------------------------+--------------------------------------+--------------------------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+--------------------------------------+--------------------------------------+--------------------------------------+
|4ab5cbad |37.3952 |-122.0813 |
|8b6eae59 |37.3944 |-122.0813 |
|4a7c7b41 |37.4049 |-122.0822 |
9. 구체화된 뷰에 대해 pull 쿼리 실행
마지막으로 구체화된 뷰에 대해 풀 쿼리를 실행하여 현재 마운틴 뷰에서 10마일 이내에 있는 모든 라이더를 검색합니다.
지속적으로 실행되는 이전 푸시 쿼리와 달리 풀 쿼리는 구체화된 뷰에서 최신 결과를 보여줍니다.
ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;
-- 실행결과
+--------------------------------------+--------------------------------------+--------------------------------------+
|DISTANCEINMILES |RIDERS |COUNT |
+--------------------------------------+--------------------------------------+--------------------------------------+
|0.0 |[4ab5cbad, 8b6eae59, 4a7c7b41] |3 |
|10.0 |[18f4ea86] |1 |
Query terminated
10. ksql 종료하기
10.1 KSQL CLI session 종료
KSQL CLI session 을 종료하려면 CTRL+D 를 누릅니다.
10.2 KSQL CLI push query 종료
KSQL CLI push query를 종료하려면 CTRL+C 를 누릅니다.
10.3 ksqlDB 및 ksql CLI 종료
docker-compose down 명령어로 ksqlDB 및 ksql CLI 를 종료하고 docker container를 삭제합니다.