ksqlDB

ksqlDB - Stream Processing

필유아사 2022. 1. 12. 21:44

스트림 처리

 Apache Kafka® 주제에 대해 스키마를 사용하여 컬렉션을 만드는 것은 유용하지만, 애플리케이션을 만드는 것 자체로는 유용성이 제한됩니다. 컬렉션을 선언하면 현재 형식의 이벤트만 사용할 수 있습니다. 그러나 스트리밍 응용 프로그램을 만드는 데 중요한 부분은 이벤트를 변환, 필터링, 결합 및 집계하는 것입니다.

ksqlDB에서는 기존 컬렉션에서 새 컬렉션을 파생하고 컬렉션 간의 변경 사항을 설명하여 이벤트를 조작합니다. 컬렉션이 새 이벤트로 업데이트되면 ksqlDB는 컬렉션에서 파생된 컬렉션을 실시간으로 업데이트합니다. 

 

ksqlDB에서 스트림 처리 SELECT은 기존 컬렉션에 대한 명령문을 사용하여 새 컬렉션을 생성하는 것 입니다. 내부의 결과는 SELECT선언된 외부 컬렉션으로 전달됩니다. ksqlDB는 내부 SELECT문 에서 열 이름과 유형을 유추하기 때문에 새 컬렉션을 파생할 때 스키마를 선언할 필요가 없습니다 . ROWTIME 컬럼은 카프카에 기록된 데이터의 타임 스탬프를 정의하고, ROWPARTITION및 ROWOFFSET 컬럼은 각각의 파티션을 정의하고, 소스 레코드의 오프셋을 정의합니다.

다음은 서로 다른 컬렉션 유형 간에 파생되는 몇 가지 예입니다.

 

기존 스트림에서 새 스트림 파생

아래와 같은 스트림이 있다고 가정합니다.

CREATE STREAM rock_songs (artist VARCHAR, title VARCHAR)
    WITH (kafka_topic='rock_songs', partitions=2, value_format='json');

 

컬럼의 모든 스트림을 대문자로 변환된 새 스트림으로 파생할 수 있습니다.

CREATE STREAM title_cased_songs AS
    SELECT artist, UCASE(title) AS capitalized_song
    FROM rock_songs
    EMIT CHANGES;

새로운 스트림이 rock_songs topic에 삽입될 때마다 대문자로 변환된 스트림이 title_cased_songs 스트림에 추가됩니다 .

 

기존 스트림에서 새 테이블 파생

다음과 같은 테이블과 스트림이 있다고 가정합니다.

CREATE TABLE products (product_name VARCHAR PRIMARY KEY, cost DOUBLE)
    WITH (kafka_topic='products', partitions=1, value_format='json');

CREATE STREAM orders (product_name VARCHAR KEY)
    WITH (kafka_topic='orders', partitions=1, value_format='json');


orders 스트림과 products 테이블을 조인하여 데이터를 집계하는 orders 테이블을 생성할 수 있습니다.

CREATE TABLE order_metrics AS
    SELECT p.product_name, COUNT(*) AS count, SUM(p.cost) AS revenue
    FROM orders o JOIN products p ON p.product_name = o.product_name
    GROUP BY p.product_name EMIT CHANGES;

이 집계 테이블은 각 제품의 수익금액과 주문수를 저장합니다.

 

기존 테이블에서 새 테이블 파생

다음의 집계 테이블이 있다고 가정합니다.

CREATE TABLE page_view_metrics AS
    SELECT url, location_id, COUNT(*) AS count
    FROM page_views GROUP BY url EMIT CHANGES;


테이블에서 행을 필터링하여 다른 테이블을 생성할 수 있습니다.

CREATE TABLE page_view_metrics_mountain_view AS
    SELECT url, count FROM page_view_metrics
    WHERE location_id = 42 EMIT CHANGES;

 

여러 스트림에서 새 스트림 파생

다음 두 스트림이 있습니다.

CREATE STREAM impressions (user VARCHAR KEY, impression_id BIGINT, url VARCHAR)
    WITH (kafka_topic='impressions', partitions=1, value_format='json');

CREATE STREAM clicks (user VARCHAR KEY, url VARCHAR)
    WITH (kafka_topic='clicks', partitions=1, value_format='json');

 

초기 광고 노출 후 1분 이내에 주어진 url이 클릭되었음을 나타내는 파생 스트림을 만들 수 있습니다 .

CREATE STREAM clicked_impressions AS
    SELECT * FROM impressions i JOIN clicks c WITHIN 1 minute ON i.user = c.user
    WHERE i.url = c.url
    EMIT CHANGES;

impressions 에 스트림이 수신되고 1분 이내에 동일한 행이 clicks에 수신 될 때마다 clicked_impressions 스트림으로 전송됩니다.