구체화된 뷰

모든 데이터베이스에서 테이블의 주요 목적 중 하나는 데이터에 대한 효율적인 쿼리를 가능하게 하는 것입니다.

ksqlDB는 Key-Value 모델을 사용하여 Apache Kafka®에 이벤트를 변경 불가능하게 저장합니다.

이 모델에서 쿼리를 좀 더 효율적으로 활용할 수 있도록 하는 것이 구체화된 뷰(Materialized Views)입니다.

Oracle 또는 MS SQLServer DBMS에서 제공하는 Materialized Views 와 유사합니다.

스트림/테이블 이중성

스트림과 테이블은 밀접하게 관련되어 있습니다. 스트림은 테이블을 파생시키는 일련의 이벤트입니다. 

예를 들어, 대출 신청자의 신용 점수는 시간이 지남에 따라 변경될 수 있습니다. 변경된 일련의 신용 점수는 스트림입니다. 이 데이터는 신청자의 현재 신용 점수를 설명하는 데이터로 활용할 수 있습니다.

현재 신용 점수를 나타내는 데이터는 신용 점수 변경 데이터의 결과입니다. 

기존 데이터베이스에는 redo log, bin-log, transaction log 등이 있지만 변경사항을 따로 저장하지 않는 경우 변경 사항을 조회하는 것은 쉽지 않습니다. 리두 로그는 Kafka 변경 로그보다 보존 기간이 짧을 수 있습니다. 압축된 Kafka 변경 로그는 데이터베이스 스냅샷과 동일합니다. 쿼리로 변경 사항만 조회할 수 있습니다.

구체화된 뷰

구체화된 뷰 의 이점은 전체 테이블 데이터를 조회하는 대신 변경 사항의 최종 결과만 조회할 수 있습니다.

집계 함수를 적용할 수 있습니다. 새 이벤트 생성되면 구체화된 뷰는 집계 기능으로 최종 집계 데이터를 만듭니다. 이런 식으로 새 이벤트가 도착할 때 구체화된 뷰는 "전체를 다시 계산"하지 않고 새로운 이벤트만을 계산합니다.

즉, 구체화된 뷰를 쿼리하면 효율적으로 집계된 데이터를 조회할 수 있습니다.

 

ksqlDB에서 테이블은 구체화된 뷰로 구현되지 않을 수 있습니다. Kafka 토픽에 바로 테이블을 생성하면 구체화되지 않습니다. 구체화되지 않은 테이블은 매우 비효율적입니다. 반면에 테이블이 다른 컬렉션에서 파생된 경우 ksqlDB는 결과를 구체화하고 이에 대해 쿼리를 만들 수 있습니다.

 

ksqlDB는 각 테이블의 두 구성 요소를 모두 저장하여 스트림/테이블 이중성 개념을 활용합니다. 테이블의 현재 상태는 RocksDB 를 사용하여 특정 서버에 로컬로 임시 저장됩니다 . 테이블에 적용된 일련의 변경 사항은 Kafka 주제에 영구적으로 저장되며 Kafka 브로커 전체에 복제됩니다. 테이블 구체화가 있는 ksqlDB 서버가 실패하면 새 서버가 Kafka 변경 로그에서 테이블을 다시 구체화합니다.

 

 

'ksqlDB' 카테고리의 다른 글

ksql - 테이블  (0) 2022.01.13
ksql - Streams  (0) 2022.01.12
ksqlDB - stream vs table  (0) 2022.01.12
ksqlDB - Stream Processing  (0) 2022.01.12
도커로 ksqlDB 설치하기  (0) 2022.01.11

스트림과 테이블이란 무엇입니까?

스트림과 테이블은 모두 연속 데이터가 있는 Kafka topic의 래퍼입니다. 

 

스트림은 세상에서 발생하는 이벤트를 캡처한 데이터를 나타내며 다음과 같은 특징이 있습니다.

  • 무제한 : 데이터의 끝없는 연속 흐름을 저장하므로 스트림은 제한이 없습니다.
  • Immutable : 들어오는 모든 새 데이터는 현재 스트림에 추가하고 기존 레코드를 수정하지 않으며, 변경할 수 없습니다.

테이블은 키를 사용하여 데이터 저장 또는 해당 이벤트 스트림의 구체화된 뷰를 나타내며 다음과 같은 특징을 가지고 있습니다.

  • 제한 : 스트림의 스냅샷을 나타냅니다.
  • 변경 가능 :
    테이블에 동일한 키를 가진 데이터가 들어오는 경우 새 데이터(<Key, Value> 쌍)를 테이블에 추가됩니다. 동일한 키의 데이터가 존재하면 해당 키에 대한 최신 값으로 변경됩니다.

참고: Kafka 주제의 모든 레코드는 키와 값의 쌍으로 표시됩니다. 따라서 테이블에는 항상 주어진 키의 최신값이 표시됩니다.

 

[ 스트림과 테이블에 대한 설명  예제  ]

거래 및 계정잔액을 활용하여 자금 이체를 관리하려는 은행 시스템을 가정합니다.

 

Alice와 Bob 두 사용자의 초기 잔액이 각각 200$와 100$라고 가정합니다. 

다음은 이 두 사용자 사이에서 발생하는 일련의 자금이체 트랜잭션입니다.

  • 거래 1: 앨리스는 밥에게 100$를 줍니다.
  • 거래 2: 밥은 앨리스에게 50$를 줍니다.
  • 거래 3: 밥은 앨리스에게 100$를 줍니다.

 

여기서 스트림은 한 계정에서 다른 계정으로 이체된 돈을 기록하는 거래 이벤트의 변경 불가능한 데이터를 나타냅니다.

반면에 테이블은 사용자당 계정의 최신 상태를 반영합니다. 예를 들어 테이블은 현재 잔액을 저장합니다.

테이블은 계정의 최신 상태를 저장하고, 스트림은 트랜잭션 레코드를 저장합니다.

 

스트림과 테이블의 이중성

스트림 및 테이블은 밀접한 관계를 가지고 있습니다.

 

스트리밍

스트림은 시간 경과에 따른 스트림의 변경된 데이터를 테이블에 적용하므로 테이블의 트랜잭션 로그로 생각할 수 있습니다.

 

테이블

테이블은 특정 시점에서 스트림의 각 키에 대한 최신 값의 스냅샷으로 간주될 수 있습니다. 시간 경과에 따른 테이블 변경 사항을 추적하면  스트림이 생성됩니다.

 

데이터의 논리적 순서를 Stream 으로 표현할 수 있습니다.

 

 

[ 테이블 vs 스트림 데이터 처리 비교 ]

Events Stream Table
First event with key KEYA arrives Insert Insert
Another event with key KEYA arrives Insert Update
Event with key KEYA and value null arrives Insert Delete
Event with key null arrives Insert <ignored>

 

 

'ksqlDB' 카테고리의 다른 글

ksql - Streams  (0) 2022.01.12
ksql - Materialized Views  (0) 2022.01.12
ksqlDB - Stream Processing  (0) 2022.01.12
도커로 ksqlDB 설치하기  (0) 2022.01.11
ksqlDB - Events  (0) 2022.01.11

스트림 처리

 Apache Kafka® 주제에 대해 스키마를 사용하여 컬렉션을 만드는 것은 유용하지만, 애플리케이션을 만드는 것 자체로는 유용성이 제한됩니다. 컬렉션을 선언하면 현재 형식의 이벤트만 사용할 수 있습니다. 그러나 스트리밍 응용 프로그램을 만드는 데 중요한 부분은 이벤트를 변환, 필터링, 결합 및 집계하는 것입니다.

ksqlDB에서는 기존 컬렉션에서 새 컬렉션을 파생하고 컬렉션 간의 변경 사항을 설명하여 이벤트를 조작합니다. 컬렉션이 새 이벤트로 업데이트되면 ksqlDB는 컬렉션에서 파생된 컬렉션을 실시간으로 업데이트합니다. 

 

ksqlDB에서 스트림 처리 SELECT은 기존 컬렉션에 대한 명령문을 사용하여 새 컬렉션을 생성하는 것 입니다. 내부의 결과는 SELECT선언된 외부 컬렉션으로 전달됩니다. ksqlDB는 내부 SELECT문 에서 열 이름과 유형을 유추하기 때문에 새 컬렉션을 파생할 때 스키마를 선언할 필요가 없습니다 . ROWTIME 컬럼은 카프카에 기록된 데이터의 타임 스탬프를 정의하고, ROWPARTITION및 ROWOFFSET 컬럼은 각각의 파티션을 정의하고, 소스 레코드의 오프셋을 정의합니다.

다음은 서로 다른 컬렉션 유형 간에 파생되는 몇 가지 예입니다.

 

기존 스트림에서 새 스트림 파생

아래와 같은 스트림이 있다고 가정합니다.

CREATE STREAM rock_songs (artist VARCHAR, title VARCHAR)
    WITH (kafka_topic='rock_songs', partitions=2, value_format='json');

 

컬럼의 모든 스트림을 대문자로 변환된 새 스트림으로 파생할 수 있습니다.

CREATE STREAM title_cased_songs AS
    SELECT artist, UCASE(title) AS capitalized_song
    FROM rock_songs
    EMIT CHANGES;

새로운 스트림이 rock_songs topic에 삽입될 때마다 대문자로 변환된 스트림이 title_cased_songs 스트림에 추가됩니다 .

 

기존 스트림에서 새 테이블 파생

다음과 같은 테이블과 스트림이 있다고 가정합니다.

CREATE TABLE products (product_name VARCHAR PRIMARY KEY, cost DOUBLE)
    WITH (kafka_topic='products', partitions=1, value_format='json');

CREATE STREAM orders (product_name VARCHAR KEY)
    WITH (kafka_topic='orders', partitions=1, value_format='json');


orders 스트림과 products 테이블을 조인하여 데이터를 집계하는 orders 테이블을 생성할 수 있습니다.

CREATE TABLE order_metrics AS
    SELECT p.product_name, COUNT(*) AS count, SUM(p.cost) AS revenue
    FROM orders o JOIN products p ON p.product_name = o.product_name
    GROUP BY p.product_name EMIT CHANGES;

이 집계 테이블은 각 제품의 수익금액과 주문수를 저장합니다.

 

기존 테이블에서 새 테이블 파생

다음의 집계 테이블이 있다고 가정합니다.

CREATE TABLE page_view_metrics AS
    SELECT url, location_id, COUNT(*) AS count
    FROM page_views GROUP BY url EMIT CHANGES;


테이블에서 행을 필터링하여 다른 테이블을 생성할 수 있습니다.

CREATE TABLE page_view_metrics_mountain_view AS
    SELECT url, count FROM page_view_metrics
    WHERE location_id = 42 EMIT CHANGES;

 

여러 스트림에서 새 스트림 파생

다음 두 스트림이 있습니다.

CREATE STREAM impressions (user VARCHAR KEY, impression_id BIGINT, url VARCHAR)
    WITH (kafka_topic='impressions', partitions=1, value_format='json');

CREATE STREAM clicks (user VARCHAR KEY, url VARCHAR)
    WITH (kafka_topic='clicks', partitions=1, value_format='json');

 

초기 광고 노출 후 1분 이내에 주어진 url이 클릭되었음을 나타내는 파생 스트림을 만들 수 있습니다 .

CREATE STREAM clicked_impressions AS
    SELECT * FROM impressions i JOIN clicks c WITHIN 1 minute ON i.user = c.user
    WHERE i.url = c.url
    EMIT CHANGES;

impressions 에 스트림이 수신되고 1분 이내에 동일한 행이 clicks에 수신 될 때마다 clicked_impressions 스트림으로 전송됩니다.

'ksqlDB' 카테고리의 다른 글

ksql - Materialized Views  (0) 2022.01.12
ksqlDB - stream vs table  (0) 2022.01.12
도커로 ksqlDB 설치하기  (0) 2022.01.11
ksqlDB - Events  (0) 2022.01.11
ksqlDB  (0) 2022.01.11

 

1. 독립 실행형 ksqlDB 가져오기

ksqlDB는 Apache Kafka®에서 실행되므로  Kafka가 실행되고 있어야 합니다. 

본 문서에서는 zookeeper, kafka broker는 설치되었다고 가정하고 ksqlDB 설치에 대해서만 설명합니다.

아래 내용으로 로컬 파일 시스템에 docker-compose.yml 파일을 생성합니다.

아래 docker-compose.yml 내용 중 KSQL_BOOTSTRAP_SERVERS: broker:9092 부분에서 broker는 각자의 bootstrap server의 IP로 변경합니다.

version은 링크 문서를 참조하여 각자 설치된 docker 엔진 버전에 따라서 설정합니다.(https://docs.docker.com/compose/compose-file/compose-file-v3/)

version: '3.8'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.23.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: 172.24.121.239:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.23.1
    container_name: ksqldb-cli
    entrypoint: /bin/sh
    tty: true

 

참고 : 아래 docker-compose.yml 파일은 zookeeper, broker, ksqldb-server, ksqldb-cli 도커 서비스 전체를 생성하는 내용입니다. zookeeper와 broker가 설치되어 있지 않은 경우 아래 파일로 서비스를 생성하고 테스트를 하면 됩니다.

## 서비스 전체생성 (zookeeper, borker, ksqldb-server, ksqldb-cli 도커 생성)
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.23.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.23.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

 

 

2. ksqlDB 서버 시작

위에서 만든 docker-compose.yml 파일이 있는 디렉터리에서 아래 명령을 실행하여 서비스(ksqldb-server, ksqldb-cli)를 시작합니다. 서비스가 성공적으로 시작되면 ksqlDB 를 사용할 준비가 된 것입니다.

docker-compose 명령어는 기본적으로 docker-compose.yml 파일이나 docker-compose.yaml 파일을 읽습니다.

만일, 다른 파일명을 사용하려면 -f 옵션으로 파일명을 지정하면 됩니다.

아래 명령어는 foreground로 서비스를 실행합니다. 백그라운드로 실행하려면 -d 옵션을 추가하면 됩니다.

$ docker-compose up
백그라운드로 실행하려면 아래와 같이 -d 옵션을 추가하면 됩니다. 본 문서에서는 백그라운드로 실행합니다.
$ docker-compose up -d

[+] Running 2/2
 ⠿ Container ksqldb-server  Started    
 ⠿ Container ksqldb-cli     Started

docker 프로세스가 실행되었는지 docker ps 명령어로 확인합니다. STATUS 를 보면 'Up' 인 것을 확인할 수 있습니다.

$ docker ps
CONTAINER ID   IMAGE                               COMMAND                  CREATED          STATUS             PORTS                    NAMES
f6ffaf8b5195   confluentinc/ksqldb-cli:0.23.1      "/bin/sh"                6 minutes ago    Up 15 seconds
                 ksqldb-cli
da8fe83bbfd4   confluentinc/ksqldb-server:0.23.1   "/usr/bin/docker/run"    32 minutes ago   Up 6 minutes       0.0.0.0:8088->8088/tcp   ksqldb-server

 

3. ksqlDB의 대화형 CLI 시작

이 명령을 실행하여 ksqlDB 서버에 연결하고 대화형 CLI(명령줄 인터페이스) 세션을 시작합니다. 

오라클 DB 기준으로 sqlplus 명령어로 오라클 DB에 접속한 것과 동일합니다.

$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

위 명령어를 실행하면 ksqldb-server에 연결되고 CLI를 통해 sql을 수행할 수 있습니다. ksql> 프롬프트가 나타납니다.

OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 
and will likely be removed in a future release.
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================
Copyright 2017-2021 Confluent Inc.

CLI v0.23.1-rc9, Server v0.23.1-rc9 located at http://ksqldb-server:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> version
Version: 0.23.1-rc9

 

snow topics 명령어로 kafka에 생성되어 있는 topic을 조회합니다. 
ksql> show topics;

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 default_ksql_processing_log | 1          | 1
---------------------------------------------------------------

 

kafka에서 my-kafka-topic 이름의 topic을 생성 및 조회해 봅니다.

## topic 생성
$ bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server 172.24.121.239:9092 --partitions 3 --replication-factor 2

## topic 조회 
$ bin/kafka-topics.sh --list --bootstrap-server 172.24.121.239:9092

## 조회결과(kafka 내부 topic이 같이 조회될 수 있음)
default_ksql_processing_log
my-kafka-topic

 

ksql cli에서 topic을 조회해 봅니다. kafka에서 생성한 my-kafka-topic 이 조회됩니다.

ksql> show topics;

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 default_ksql_processing_log | 1          | 1
 my-kafka-topic              | 3          | 2
---------------------------------------------------------------

 

  4. 스트림 생성

 스트림은 기본적으로 기본 Kafka topic과 스키마를 연결합니다. CREATE STREAM 문의 각 매개변수는 다음과 같습니다.

  • kafka_topic - 스트림의 기반이 되는 Kafka topic의 이름입니다. topic이 존재하지 않는 경우에는 자동으로 생성하고, 이미 존재하는 주제에 대해서는 스트림이 생성됩니다.
  • value_format - Kafka topic에 저장된 메시지의 인코딩입니다. JSON 인코딩의 경우 각 행은 키/값 형태인 JSON 객체로 저장됩니다. 예: {"profileId": "c2309eec", "위도": 37.7877, "경도": -122.4205}
  • partitions - topic에 대해 생성할 파티션 수입니다 . 이미 존재하는 주제에는 이 매개변수가 필요하지 않습니다.

 

스트림

스트림은 일련의 사실을 나타내는 분할하여 관리할 수 있고, 변경이 불가능한 컬렉션입니다. 

예를 들어, 스트림의 행은 "갑이 을에게 $100를 보냈습니다", "병이 을에게 $50를 보냈습니다"와 같은 일련의 금융 거래를 모델링할 수 있습니다.

행이 스트림에 삽입되면 절대 변경할 수 없습니다. 스트림 끝에 새 행을 추가할 수 있지만 기존 행은 업데이트하거나 삭제할 수 없습니다. 각 행은 특정 파티션에 저장됩니다. 모든 행에는 암시적이든 명시적이든 해당 ID를 나타내는 키가 있습니다. 동일한 키를 가진 모든 행은 동일한 파티션에 저장됩니다.



스트림을 생성하기 위해, 대화형  ksql CLI 세션에서 아래 명령문을 실행합니다.

ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='locations', value_format='json', partitions=1);
  
   Message
----------------
 Stream created
----------------

 

5. 구체화된 뷰(Materialized View) 생성

구체화된 뷰를 사용하여 라이더의 최신 위치를 추적할 수도 있습니다. 이를 위해 이전에 생성된 스트림에 대해 SELECT 문을 실행하여 currentLocation 테이블을 생성합니다. 위치 데이터가 수신되면 테이블이 업데이트됩니다. 최신 위치를 표시하기 위해 집계 함수(LATEST_BY_OFFSET)를 사용할 수 있습니다. 구체화된 뷰도 하나의 topic로 볼 수 있습니다.

 

테이블 및 구체화된 뷰에 대한 자세한 내용은 ksql 문서를 확인하세요.

CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId
  EMIT CHANGES;
  
   Message
----------------------------------------------
 Created query with ID CTAS_CURRENTLOCATION_3
----------------------------------------------

 

주어진 위치나 도시에서 라이더가 얼마나 멀리 떨어져 있는지를 캡처하는 파생 테이블(Table ridersNearMountainView)도 구체화된 뷰를 만들어 보겠습니다.

아래 SWL문을 복사하여 대화형 CLI 세션에 붙여넣고 Enter 키를 눌러 실행합니다.

CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
  
   Message
-----------------------------------------------------
 Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------

 

6. 스트림을 통해 푸시 쿼리 실행

이제 스트림에 대해 푸시 쿼리를 실행해 보겠습니다. 대화형 CLI 세션을 사용하여 주어진 쿼리를 실행합니다.
이 쿼리는 좌표가 Mountain View에서 5마일 이내에 있는 riderLocations 스트림의 모든 행을 출력합니다 .

ctrl + C 키를 누를 때까지 출력을 위해 대기 상태로 있습니다.

이벤트가 riderLocations 스트림에 기록될 때 데이터를 클라이언트로 푸시합니다. 즉 화면에 출력됩니다.

현재 쿼리를 실행 중인 CLI 세션을 그대로 둡니다. 쿼리가 데이터를 출력하도록 데이터를 riderLocations 스트림에 쓸 것입니다.

-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;

 

7. 다른 CLI 세션 시작

위에서의 SELECT 쿼리 세션은 푸시 쿼리의 출력을 기다리는 중이므로, 다른 세션에서 데이터를 ksqlDB에 데이터를 기록해 보겠습니다. 다른 터미널 창을 열어 아래 명령어를 수행하여 ksqlDB에 접속합니다.

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

 

8. 이벤트로 스트림 채우기

새 CLI 세션에서 아래 INSERT 문을 실행하면서 (6)의 CLI SELECT 세션을 확인합니다.
푸시 쿼리는 riderLocations 스트림에 작성되는 즉시 행을 실시간으로 출력합니다. 조회 조건에 의해 3건이 출력됩니다.

INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);

(6)에서 수행한 화면에 출력되는 스트림 데이터입니다.

+--------------------------------------+--------------------------------------+--------------------------------------+
|PROFILEID                             |LATITUDE                              |LONGITUDE                             |
+--------------------------------------+--------------------------------------+--------------------------------------+
|4ab5cbad                              |37.3952                               |-122.0813                             |
|8b6eae59                              |37.3944                               |-122.0813                             |
|4a7c7b41                              |37.4049                               |-122.0822                             |

 

9. 구체화된 뷰에 대해  pull 쿼리 실행

마지막으로 구체화된 뷰에 대해 풀 쿼리를 실행하여 현재 마운틴 뷰에서 10마일 이내에 있는 모든 라이더를 검색합니다.
지속적으로 실행되는 이전 푸시 쿼리와 달리  쿼리는 구체화된 뷰에서 최신 결과를 보여줍니다.

ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;

-- 실행결과
+--------------------------------------+--------------------------------------+--------------------------------------+
|DISTANCEINMILES                       |RIDERS                                |COUNT                                 |
+--------------------------------------+--------------------------------------+--------------------------------------+
|0.0                                   |[4ab5cbad, 8b6eae59, 4a7c7b41]        |3                                     |
|10.0                                  |[18f4ea86]                            |1                                     |
Query terminated
 

10. ksql 종료하기

10.1 KSQL CLI session 종료

KSQL CLI session 을 종료하려면 CTRL+D 를 누릅니다.

 

10.2 KSQL CLI push query 종료

KSQL CLI push query를 종료하려면 CTRL+C 를 누릅니다.

 

10.3 ksqlDB 및  ksql CLI 종료

docker-compose down 명령어로 ksqlDB 및  ksql CLI 를 종료하고 docker container를 삭제합니다.

 

'ksqlDB' 카테고리의 다른 글

ksql - Materialized Views  (0) 2022.01.12
ksqlDB - stream vs table  (0) 2022.01.12
ksqlDB - Stream Processing  (0) 2022.01.12
ksqlDB - Events  (0) 2022.01.11
ksqlDB  (0) 2022.01.11

이벤트란?

ksqlDB는 스트림 처리 애플리케이션을 위해 만들어진 데이터베이스입니다. 스트림 처리의 주요 초점은 이벤트의 무제한 스트림에 대한 계산을 모델링하는 것입니다.

발생하고 기록된 모든 것이 이벤트입니다. 품목 판매 또는 송장 제출과 같이 비즈니스에서 발생하는 일이 될 수 있습니다. 또는 요청이 수신될 때 웹 서버에서 내보내는 로그일 수 있습니다. 특정 시점에서 발생하는 모든 것이 이벤트입니다.

이벤트는 스트림 처리에 기본적이기 때문에 ksqlDB의 핵심 데이터 단위입니다. ksqlDB의 모든 기능은 이벤트를 사용하여 문제를 쉽게 해결할 수 있도록 하는 데 중점을 두고 있습니다. 개별 이벤트에 대해 생각하기는 쉽지만 관련 이벤트를 함께 저장하는 방법을 알아내는 것은 조금 더 어렵습니다. Apache Kafka® 위에 ksqlDB가 구축됩니다.

Kafka는 이벤트 작업을 위한 분산 스트리밍 플랫폼입니다. 수평으로 확장 가능하고 내결함성이 있으며 매우 빠릅니다. 직접 작업하는 것은 낮은 수준일 수 있지만 개별 이벤트와 저장된 이벤트를 모두 모델링하기 위한 강력하고 독단적인 접근 방식이 있습니다.

 

"바퀴를 다시 발명하지 마라 - Don't reinvent the wheel"

 

ksqlDB는 Kafka의 레코드 개념과 매우 유사한 키/값 모델을 사용하여 이벤트를 나타냅니다. 키는 이벤트에 대한  ID를 나타냅니다. 값은 발생한 이벤트에 대한 정보를 나타냅니다. 키와 값의 이러한 조합을 이용하면 저장된 이벤트를 모델링할 수 있습니다. 동일한 키를 가진 여러 이벤트는 값에 관계없이 동일한 ID를 나타내기 때문입니다.

 

ksqlDB의 이벤트는 키와 값보다 더 많은 정보를 전달합니다. Kafka와 유사하게 이벤트가 발생한 시간도 설명합니다.

ksqlDB는 하위 수준의 스트림 프로세서로 작업하는 추상화를 높이는 것을 목표로 합니다. 일반적으로 이벤트는 마치 관계형 데이터베이스의 행인 것처럼 "행"이라고 합니다. 각 행은 일련의 열로 구성됩니다. 열은 이벤트의 키 또는 값에서 데이터를 읽습니다.

 

ksqlDB은 몇 개의 의사 컬럼을 지원합니다.

  • ROWTIME : 이벤트의 시간을 나타냅니다.
  • ROWPARTITION및 : 소스 레코드의 파티션을 나타냅니다.
  • ROWOFFSET : 소스 레코드의오프셋을 나타냅니다.

windowed sources 에는  WINDOWSTART및 WINDOWEND 시스템 열이 있습니다.

'ksqlDB' 카테고리의 다른 글

ksql - Materialized Views  (0) 2022.01.12
ksqlDB - stream vs table  (0) 2022.01.12
ksqlDB - Stream Processing  (0) 2022.01.12
도커로 ksqlDB 설치하기  (0) 2022.01.11
ksqlDB  (0) 2022.01.11
SQL 문으로 실시간 데이터 스트림을 즉시 처리

 

스트림 처리를 사용하면 데이터 스트림에서 즉각적인 통찰력을 얻을 수 있지만 이를 지원하기 위한 인프라를 설정하는 것은 복잡할 수 있습니다. 이것이 Confluent 가 스트림 처리 애플리케이션을 위해 특별히 구축된 데이터베이스인 ksqlDB를 개발한 이유 입니다.

 

KSQL은 streaming SQL 엔진으로서 SQL문법으로 real time application을 작성할 수 있다.

Kafka용 SQL엔진인 KSQL을 사용하면 실시간으로 데이터 스트림을 분석하는 SQL 쿼리를 작성할 수 있다.

KSQL은 Kafka Streams 기반이라 쿼리를 제출하면이 쿼리가 구문 분석되고 Kafka Streams 토폴로지가 빌드되고 실행된다. 이는 KSQL이 Kafka Streams가 제공하는 것과 유사한 개념을 제공하지만 모두 SQL 언어 (스트림 ( KStreams ), 테이블 ( KTables ), 조인, 윈도우 기능 등)로 제공한다. 

 

 

스트림 처리 아키텍처 간소화

ksqlDB는 데이터 스트림을 수집하고 강화하며 새로운 파생 스트림 및 테이블에 대한 쿼리를 제공하기 위한 단일 솔루션을 제공합니다. 즉, 배포, 유지 관리, 확장 및 보안을 위한 인프라가 줄어듭니다. 데이터 아키텍처에서 덜 움직이는 부분을 사용하면 정말 중요한 것인 혁신에 집중할 수 있습니다.

 

간단한 SQL 구문으로 실시간 애플리케이션 구축

친숙하고 가벼운 SQL 구문을 통해 관계형 데이터베이스에서 기존 앱을 구축할 때와 마찬가지로 쉽고 친숙하게 실시간 애플리케이션을 구축할 수 있습니다. Kafka Streams는 ksqlDB와 어떻게 비교됩니까? 잘. ksqlDB는 실시간 데이터 스트림을 강화, 변환 및 처리하기 위한 가볍고 강력한 Java 라이브러리인 Kafka Streams를 기반으로 구축되었습니다. 핵심 에 Kafka Streams 가 있다는 것은 ksqlDB가 잘 설계되고 쉽게 이해할 수 있는 추상화 계층을 기반으로 구축되었음을 의미합니다. 이제 초보자와 전문가 모두 재미있고 접근 가능한 방식으로 Kafka의 힘을 쉽게 잠금 해제하고 완전히 활용할 수 있습니다.

 

도커로 ksqlDB 설치하기

 

도커로 ksqlDB 설치하기

1. 독립 실행형 ksqlDB 가져오기 ksqlDB는 Apache Kafka®에서 실행되므로 Kafka가 실행되고 있어야 합니다. 본 문서에서는 zookeeper, kafka broker는 설치되었다고 가정하고 ksqlDB 설치에 대해서만 설명합니다..

yooloo.tistory.com

ksqlDB - Events

 

ksqlDB - Events

이벤트란? ksqlDB는 스트림 처리 애플리케이션을 위해 만들어진 데이터베이스입니다. 스트림 처리의 주요 초점은 이벤트의 무제한 스트림에 대한 계산을 모델링하는 것입니다. 발생하고 기록된

yooloo.tistory.com

ksqlDB - stream vs table

 

ksqlDB - stream vs table

스트림과 테이블이란 무엇입니까? 스트림과 테이블은 모두 연속 데이터가 있는 Kafka topic의 래퍼입니다. 스트림은 세상에서 발생하는 이벤트를 캡처한 데이터를 나타내며 다음과 같은 특징이 있

yooloo.tistory.com

ksqlDB - Stream Processing

 

ksqlDB - Stream Processing

스트림 처리  Apache Kafka® 주제에 대해 스키마를 사용하여 컬렉션을 만드는 것은 유용하지만, 애플리케이션을 만드는 것 자체로는 유용성이 제한됩니다. 컬렉션을 선언하면 현재 형식의 이벤트

yooloo.tistory.com

 

 

'ksqlDB' 카테고리의 다른 글

ksql - Materialized Views  (0) 2022.01.12
ksqlDB - stream vs table  (0) 2022.01.12
ksqlDB - Stream Processing  (0) 2022.01.12
도커로 ksqlDB 설치하기  (0) 2022.01.11
ksqlDB - Events  (0) 2022.01.11

+ Recent posts