람다 함수

람다 함수 또는 줄여서 "람다"를 사용하여 다양한 방식으로 입력 값에 적용할 수 있는 간단한 인라인 함수를 표현합니다. 

예를 들어 컬렉션의 각 요소에 람다 함수를 적용하여 결과 컬렉션을 변환할 수 있습니다. 또한 람다를 사용하여 컬렉션의 요소를 필터링하거나 컬렉션을 단일 값으로 줄일 수 있습니다. 

람다의 장점은 전체 UDF를 구현할 필요가 없는 방식으로 사용자 정의 기능을 표현할 수 있습니다.

가이드 에서 람다 함수를 사용하는 방법을 알아봅니다.

 

 

'ksqlDB' 카테고리의 다른 글

ksql - Connectors - 수정  (0) 2022.01.13
ksql - User-defined functions  (0) 2022.01.13
ksql - Time and Windows - 수정  (0) 2022.01.13
ksql - joins  (0) 2022.01.13
ksql - 쿼리  (0) 2022.01.13

 

Confluent 플랫폼용 JDBC 커넥터(소스 및 싱크)

Kafka Connect JDBC 소스 커넥터를 사용하여 JDBC 드라이버가 있는 모든 관계형 데이터베이스에서 Apache Kafka® topic으로 데이터를 가져올 수 있습니다. JDBC 싱크 커넥터를 사용하여 Kafka topic에서 JDBC 드라이버가 있는 모든 관계형 데이터베이스로 데이터를 내보낼 수 있습니다. JDBC 커넥터는 다양한 데이터베이스를 지원합니다.

JDBC 커넥터 설치

이 커넥터는 기본적으로 Confluent Platform과 함께 번들로 제공됩니다. Confluent Platform이 설치되어 실행 중인 경우 추가적으로 설치가 필요 없습니다.

Confluent Platform이 설치되어 실행되지 않은 경우 Confluent Hub 클라이언트 (권장)를 사용하여 커넥터를 설치하거나 ZIP 파일을 수동으로 다운로드할 수 있습니다.

다중 노드 Connect 클러스터를 실행 중인 경우 클러스터의 모든 Connect 작업자에 JDBC 커넥터 및 JDBC 드라이버 JAR를 설치해야 합니다.

Confluent Hub를 사용하여 커넥터 설치

전제 조건Confluent Hub 클라이언트 가 설치되어 있어야 합니다. 이것은 Confluent Enterprise와 함께 기본적으로 설치됩니다.

Confluent Platform 설치 디렉터리로 이동하고 다음 명령을 실행하여 최신( latest) 커넥터 버전을 설치합니다. Connect가 실행될 모든 시스템에 커넥터를 설치해야 합니다.

confluent-hub install confluentinc/kafka-connect-jdbc:latest
 

버전을 지정하여 특정 버전을 설치할 수 있습니다. 아래와 같이 latest를 5.5.1  로 변경하여 설치할 수 있습니다.

confluent-hub install confluentinc/kafka-connect-jdbc:5.5.1
 

커넥터를 수동으로 설치

커넥터용 ZIP 파일을 다운로드하여 압축을 푼 다음 수동 커넥터 설치 지침 을 따릅니다 .

License

이 커넥터는 Confluent Community License 하에 사용할 수 있습니다 .

구성 속성

소스 커넥터에 대한 구성 속성의 전체 목록은 JDBC 커넥터 소스 커넥터 구성 속성 을 참조하십시오 .

싱크 커넥터에 대한 구성 속성의 전체 목록은 JDBC 싱크 커넥터 구성 속성 을 참조하세요 .

메모

Kafka Connect를 Confluent Cloud 에 연결하는 방법의 예는 Connect Kafka Connect to Confluent Cloud 의 분산 클러스터 를 참조하십시오 .

JDBC 드라이버 설치

JDBC 소스 및 싱크 커넥터 는 애플리케이션이 광범위한 데이터베이스 시스템에 연결하고 사용할 수 있도록 하는 JDBC(Java Database Connectivity) API 를 사용합니다. 이것이 작동하려면 커넥터에 사용할 특정 데이터베이스 시스템에 대한 JDBC 드라이버 가 있어야 합니다.

커넥터는 일부 데이터베이스 시스템용 JDBC 드라이버와 함께 제공되지만 다른 데이터베이스 시스템에서 커넥터를 사용하기 전에 해당 데이터베이스 시스템에 대한 최신 JDBC 4.0 드라이버를 설치해야 합니다. 세부 정보는 JDBC 드라이버마다 다르지만 기본 단계는 다음과 같습니다.

  1. 사용할 각 데이터베이스 시스템에 대한 JDBC 4.0 드라이버 JAR 파일을 찾으십시오.
  2. 이 JAR 파일을 share/java/kafka-connect-jdbc각 Connect 작업자 노드의 Confluent Platform 설치 디렉터리에 넣습니다.
  3. 모든 Connect 작업자 노드를 다시 시작하십시오.

이 섹션의 나머지 부분에서는 보다 일반적인 데이터베이스 관리 시스템에 대한 특정 단계를 간략하게 설명합니다.

일반 지침

다음은 고려해야 할 추가 지침입니다.

  • 사용 가능한 최신 버전의 JDBC 4.0 드라이버를 사용하십시오. 최신 버전의 JDBC 드라이버는 대부분의 데이터베이스 관리 시스템 버전을 지원하며 더 많은 버그 수정이 포함되어 있습니다.
  • Connect 작업자를 실행하는 데 사용되는 Java 버전에 대해 올바른 JAR 파일을 사용하십시오. 일부 JDBC 드라이버에는 여러 Java 버전에서 작동하는 단일 JAR이 있습니다. 다른 드라이버에는 Java 8용 JAR과 Java 10 또는 11용 JAR이 있습니다. 사용 중인 Java 버전에 대해 올바른 JAR 파일을 사용하는지 확인하십시오. 잘못된 버전의 Java에 대해 JDBC 드라이버 JAR 파일을 설치하고 사용하려고 하면 JDBC 소스 커넥터 또는 JDBC 싱크 커넥터를 시작하는 데 실패할 가능성이 UnsupportedClassVersionError. 이 경우 설치한 JDBC 드라이버 JAR 파일을 제거하고 올바른 JAR 파일로 드라이버 설치 프로세스를 반복하십시오.
  • 위에서 share/java/kafka-connect-jdbc언급한 디렉토리는 Confluent Platform용입니다. 다른 설치를 사용하는 경우 Confluent JDBC 소스 및 싱크 커넥터 JAR ​​파일이 있는 위치를 찾고 대상 데이터베이스에 대한 JDBC 드라이버 JAR 파일을 동일한 디렉토리에 배치합니다.
  • 데이터베이스 관리 시스템에 특정한 JDBC 드라이버가 올바르게 설치되지 않은 경우 JDBC 소스 또는 싱크 커넥터는 시작 시 실패합니다. 일반적으로 시스템에서 오류가 발생 합니다. 이 경우 지침에 따라 JDBC 드라이버를 다시 설치하십시오.No suitable driver found

마이크로소프트 SQL 서버

JDBC 소스 및 싱크 커넥터에는 Microsoft SQL Server에서 읽고 쓸 수 있는 오픈 소스 jTDS JDBC 드라이버 가 포함되어 있습니다. JDBC 4.0 드라이버가 포함되어 있으므로 Microsoft SQL Server에 대한 커넥터를 실행하기 전에 추가 단계가 필요하지 않습니다.

또는 jTDS JDBC 드라이버를 제거하고 오픈 소스 Microsoft JDBC 드라이버 를 설치할 수 있습니다 . 먼저 최신 버전의 JDBC 드라이버 아카이브(예: sqljdbc_7.2.2.0_enu.tar.gz영어)를 다운로드하고 파일 내용을 임시 디렉토리에 추출한 다음 Java 버전에 맞는 올바른 JAR 파일을 찾습니다. 예를 들어, 드라이버의 7.2.2.0 버전을 다운로드하는 경우 Java 8에서 Connect를 실행 하는 경우 또는 Java 11에서 Connect를 실행 하는 경우를 찾으십시오 .mssql-jdbc-7.2.2.jre8.jarmssql-jdbc-7.2.2.jre11.jar

그런 다음 JDBC 소스 또는 싱크 커넥터를 배포하기 전에 각 Connect 작업자 노드에서 다음 단계를 수행하십시오.

  1. share/java/kafka-connect-jdbc/jtds-1.3.1.jarConfluent Platform 설치에서 기존 파일을 제거 합니다.
  2. share/java/kafka-connect-jdbc/Confluent Platform 설치 디렉터리에 JAR 파일을 설치합니다.
  3. Connect 작업자를 다시 시작하십시오.

잘못된 버전의 Java용 JDBC 드라이버 JAR 파일을 설치하고 SQL Server 데이터베이스를 사용하는 JDBC 소스 커넥터 또는 JDBC 싱크 커넥터를 시작하려고 하면 커넥터가 UnsupportedClassVersionError. 이 경우 JDBC 드라이버 JAR 파일을 제거하고 올바른 JAR 파일로 드라이버 설치 프로세스를 반복하십시오.

Kerberos 인증 예외

Kafka 클러스터가 Kerberos(SASL_SSL)로 보호되고 커넥터가 통합 Kerberos 인증으로 구성된 Microsoft SQL Server 데이터베이스에 액세스하는 경우 다음 예외가 발생합니다.

connect-distributed: Caused by: javax.security.auth.login.LoginException: Unable to obtain Principal Name for authentication
 

이 예외를 해결하려면 아래와 같이 Connect 속성 파일을 로드하기 전에 시스템 속성을 변경해야 합니다.

export KAFKA_OPTS="-Djavax.security.auth.useSubjectCredsOnly=false"

bin/connect-distributed etc/kafka/connect-distributed.properties
 

이 시스템 속성에 대한 자세한 내용은 Oracle 설명서 를 참조하십시오 .

PostgreSQL 데이터베이스

JDBC 소스 및 싱크 커넥터에는 PostgreSQL 데이터베이스 서버에서 읽고 쓸 수 있는 오픈 소스 PostgreSQL JDBC 4.0 드라이버 가 포함되어 있습니다. JDBC 4.0 드라이버가 포함되어 있으므로 PostgreSQL 데이터베이스에 대한 커넥터를 실행하기 전에 추가 단계가 필요하지 않습니다.

오라클 데이터베이스

Oracle은 Oracle 용 JDBC 드라이버를 제공합니다 . 최신 버전을 찾아 Java 8에서 Connect를 실행 중인 경우 또는 Java 11에서 Connect를 실행 중인 경우 다운로드하십시오. 그런 다음 이 JAR 파일 하나를 Confluent Platform 설치의 디렉토리에 배치하고 모든 Connect 작업자 노드를 다시 시작하십시오.ojdbc8.jar ojdbc10.jarshare/java/kafka-connect-jdbc

JDBC 드라이버 및 동반 JAR이 있는 파일 을 다운로드하는 경우 tar.gz의 파일 내용을 tar.gz 임시 디렉토리로 추출하고 readme 파일을 사용하여 필요한 JAR 파일을 판별하십시오. JDBC 드라이버 JAR 파일 및 기타 필수 컴패니언 JAR 파일을 share/java/kafka-connect-jdbc각 Connect 작업자 노드의 Confluent Platform 설치 디렉터리에 복사한 다음 모든 Connect 작업자 노드를 다시 시작합니다.

잘못된 버전의 Java용 JDBC 드라이버 JAR 파일을 설치하고 Oracle 데이터베이스를 사용하는 JDBC 소스 커넥터 또는 JDBC 싱크 커넥터를 시작하려고 하면 커넥터가 UnsupportedClassVersionError. 이 경우 JDBC 드라이버 JAR 파일을 제거하고 올바른 JAR 파일로 드라이버 설치 프로세스를 반복하십시오.

IBM DB2

IBM은 DB2 버전에 따라 달라지는 DB2용 JDBC 드라이버를 제공합니다. 일반적으로 최신 JDBC 4.0 드라이버를 선택하고 다운로드 옵션 중 하나를 선택합니다. 다운로드한 파일 db2jcc4.jar내에서 파일 의 압축을 풀고 찾아 파일 만 Confluent Platform 설치 디렉터리에 넣습니다 .tar.gzdb2jdcc4.jarshare/java/kafka-connect-jdbc

예를 들어, 압축 tar.gz파일(예: v10.5fp10_jdbc_sqlj.tar.gz)을 다운로드한 경우 다음 단계를 수행하십시오.

  1. tar.gz파일 내용을 임시 디렉토리에 추출합니다 .
  2. db2_db2driver_for_jdbc_sqlj압축을 푼 파일에서 ZIP 파일(예: )을 찾습니다.
  3. zip파일의 내용을 다른 임시 디렉토리에 추출하십시오 .
  4. 파일을 찾아 각 Connect 작업자 노드의 Confluent Platform 설치 디렉터리에 db2jdcc4.jar복사 share/java/kafka-connect-jdbc 한 다음 모든 Connect 작업자 노드를 다시 시작합니다.
  5. 두 개의 임시 디렉토리를 제거하십시오.

메모

share/java/kafka-connect-jdbcIBM 다운로드의 다른 파일을 Confluent Platform 설치 의 디렉토리에 두지 마십시오 . AS/400에서 실행되는 DB2용 UPSERT는 현재 Confluent JDBC Connector에서 지원되지 않습니다.

MySQL 서버

MySQL은 여러 플랫폼 에서 MySQL용 Connect/J JDBC 드라이버를 제공합니다. 플랫폼 독립 옵션을 선택 하고 압축 TAR 아카이브 를 다운로드하십시오 . 이 파일에는 JAR 파일과 소스 코드가 모두 포함되어 있습니다.

tar.gz이 파일의 내용을 임시 디렉토리 에 추출하십시오 . 추출된 파일 중 하나는 jar파일(예: mysql-connector-java-8.0.16.jar)이며 이 JAR 파일만 각 share/java/kafka-connect-jdbcConnect 작업자 노드의 Confluent Platform 설치 디렉터리에 복사한 다음 모든 Connect 작업자 노드를 다시 시작합니다.

SAP HANA

SAP는 SAP HANA JDBC 드라이버 를 제공하고 Maven Central 에서 사용할 수 있도록 합니다 . 최신 버전의 JAR 파일(예: ngdbc-2.4.56.jar)을 다운로드 share/java/kafka-connect-jdbc하여 각 Connect 작업자 노드의 Confluent Platform 설치 디렉터리에 넣은 다음 모든 Connect 작업자 노드를 다시 시작합니다.

SQLite 임베디드 데이터베이스

JDBC 소스 및 싱크 커넥터에는 로컬 SQLite 데이터베이스에서 읽고 쓸 수 있는 오픈 소스 SQLite JDBC 4.0 드라이버 가 포함되어 있습니다. SQLite는 내장 데이터베이스이기 때문에 이 구성은 데모용입니다.

기타 데이터베이스

다른 데이터베이스용 JDBC 4.0 드라이버 JAR 파일을 찾아 필요한 JAR 파일만 share/java/kafka-connect-jdbc각 Connect 작업자 노드의 Confluent Platform 설치 디렉터리에 넣은 다음 모든 Connect 작업자 노드를 다시 시작합니다.

 

'ksqlDB' 카테고리의 다른 글

ksql - Lambda Functions  (0) 2022.01.13
ksql - User-defined functions  (0) 2022.01.13
ksql - Time and Windows - 수정  (0) 2022.01.13
ksql - joins  (0) 2022.01.13
ksql - 쿼리  (0) 2022.01.13

사용자 정의 함수

ksqlDB는 내장 함수를 제공합니다.

ksqlDB에 없는 기능을 사용해야 하는 경우에 사용자 정의 함수를 사용하면 Java 후크를 사용하여 새 함수를 추가할 수 있습니다. 

방법 가이드 에서 사용법을 배우 거나 참조 문서 에서 중요한 정보를 찾아보십시오.

 

 

'ksqlDB' 카테고리의 다른 글

ksql - Lambda Functions  (0) 2022.01.13
ksql - Connectors - 수정  (0) 2022.01.13
ksql - Time and Windows - 수정  (0) 2022.01.13
ksql - joins  (0) 2022.01.13
ksql - 쿼리  (0) 2022.01.13

Time and Windows

Important

This page refers to timestamps as a field in records. For information on the TIMESTAMP data type, see Timestamp types.

In ksqlDB, a record is an immutable representation of an event in time. Each record carries a timestamp, which determines its position on the time axis.

This is the default timestamp that ksqlDB uses for processing the record. The timestamp is set either by the producer application or by the Apache Kafka® broker, depending on the topic's configuration. Records may be out-of-order within the stream.

Timestamps are used by time-dependent operations, like aggregations and joins.

Time semantics

Timestamps have different meanings, depending on the implementation. A record's timestamp can refer to the time when the event occurred, or when the record was ingested into Kafka, or when the record was processed. These times are event-time, ingestion-time, and processing-time.

Event-time

The time when a record is created by the data source. Achieving event-time semantics requires embedding timestamps in records when an event occurs and the record is produced.

For example, if the record is a geo-location change reported by a GPS sensor in a car, the associated event-time is the time when the GPS sensor captured the location change.

Ingestion-time

The time when a record is stored in a topic partition by a Kafka broker. Ingestion-time is similar to event-time, as a timestamp is embedded in the record, but the ingestion timestamp is generated when the Kafka broker appends the record to the target topic.

Ingestion-time can approximate event-time if the time difference between the creation of the record and its ingestion into Kafka is small.

For use cases where event-time semantics aren't possible, ingestion-time may be an alternative. Consider using ingestion-time when data producers don't embed timestamps in records, as in older versions of Kafka's Java producer client, or when the producer can't assign timestamps directly, like when it doesn't have access to a local clock.

Processing-time

The time when the record is consumed by a stream processing application. The processing-time can occur immediately after ingestion-time, or it may be delayed by milliseconds, hours, days, or longer.

For example, imagine an analytics application that reads and processes the geo-location data reported from car sensors, and presents it to a fleet-management dashboard. In this case, processing-time in the analytics application might be many minutes or hours after the event-time, as cars can move out of mobile reception for periods of time and have to buffer records locally.

Stream-time

The maximum timestamp seen over all processed records so far.

Important

Don't mix streams or tables that have different time semantics.

Timestamp assignment

A record's timestamp is set either by the record's producer or by the Kafka broker, depending on the topic's timestamp configuration. The topic's message.timestamp.type setting can be either CreateTime or LogAppendTime.

  • CreateTime: The broker uses the the record's timestamp as set by the producer. This setting enforces event-time semantics.
  • LogAppendTime: The broker overwrites the record's timestamp with the broker's local time when it appends the record to the topic's log. This setting enforces ingestion-time semantics. If LogAppendTime is configured, the producer has no control over the timestamp.

ksqlDB doesn't support processing-time operations directly, but you can implement user-defined functions (UDFs) that access the current time. For more information, see Functions.

By default, when ksqlDB imports a topic to create a stream, it uses the record's timestamp, but you can add the WITH(TIMESTAMP='some-field') clause to use a different field from the record's value as the timestamp. The optional TIMESTAMP_FORMAT property indicates how ksqlDB should parse the field. The field you specify can be an event-time or an ingestion-time. This approach implements payload-time semantics.

Important

If you use the WITH(TIMESTAMP=...) clause, this timestamp must be expressible as a Unix epoch time in milliseconds, which is the number of milliseconds that have elapsed since 1 January 1970 at midnight UTC/GMT. Also, you can specify the timestamp as a string when you provide a TIMESTAMP_FORMAT. For more information, see Timestamp Formats.

When working with time you should also make sure that additional aspects of time, like time zones and calendars, are correctly synchronized — or at least understood and traced — throughout your streaming data pipelines. It helps to agree on specifying time information in UTC or in Unix time, like seconds since the Unix epoch, everywhere in your system.

Timestamps of ksqlDB output streams

When a ksqlDB application writes new records to Kafka, timestamps are assigned to the records it creates. ksqlDB uses the underlying Kafka Streams implementation for computing timestamps. Timestamps are assigned based on context:

  • When new output records are generated by processing an input record directly, output record timestamps are inherited from input record timestamps.
  • When new output records are generated by a periodic function, the output record timestamp is defined as the current internal time of the stream task.
  • For stateless operations, the input record timestamp is passed through. For flatMap and siblings that emit multiple records, all output records inherit the timestamp from the corresponding input record.

Timestamps for aggregations and joins

For aggregations and joins, timestamps are computed by using the following rules.

  • For joins (stream-stream, table-table) that have left and right input records, the timestamp of the output record is assigned max(left.ts, right.ts).
  • For stream-table joins, the output record is assigned the timestamp from the stream record.
  • For aggregations, the max timestamp is computed over all records, per key, either globally (for non-windowed) or per-window.

Producers and timestamps

A producer application can set the timestamp on its records to any value, but usually, it choses a sensible event-time or the current wall-clock time.

If the topic's message.timestamp.type configuration is set to CreateTime, the following holds for the producer:

  • When a producer record is created, it contains no timestamp, by default.
  • The producer can set the timestamp on the record explicitly.
  • If the timestamp isn't set when the producer application calls the producer.send() method, the current wall-clock time is set automatically.

In all three cases, the time semantics are considered to be event-time.

Timestamp extractors

When ksqlDB imports a topic to create a stream, it gets the timestamp from the topic's messages by using a timestamp extractor class. Timestamp extractors implement the TimestampExtractor interface.

Concrete implementations of timestamp extractors may retrieve or compute timestamps based on the actual contents of data records, like an embedded timestamp field, to provide event-time or ingestion-time semantics, or they may use any other approach, like returning the current wall-clock time at the time of processing to implement processing-time semantics.

By creating a custom timestamp extractor class, you can enforce different notions or semantics of time, depending on the requirements of your business logic. For more information see default.timestamp.extractor.

Windows in SQL queries

Representing time consistently enables aggregation operations on streams and tables, like SUM, that have distinct time boundaries. In ksqlDB, these boundaries are named windows.

A window has a start time and an end time, which you access in your queries by using the WINDOWSTART and WINDOWEND system columns.

Important

ksqlDB is based on the Unix epoch time in the UTC timezone, and this can affect time windows. For example, if you define a 24-hour tumbling time window, it will be in the UTC timezone, which may not be appropriate if you want to have daily windows in your timezone.

Windowing lets you control how to group records that have the same key for stateful operations, like aggregations or joins, into time spans. ksqlDB tracks windows per record key.

Note

A related operation is grouping, which groups all records that have the same key to ensure that records are properly partitioned, or "keyed", for subsequent operations. When you use the GROUP BY clause in a query, windowing enables you to further sub-group the records of a key.

Windowing queries must group by the keys that are selected in the query.

When using windows in your SQL queries, aggregate functions are applied only to the records that occur within a specific time window. Records that arrive out-of-order are handled as you might expect: although the window end time has passed, the out-of-order records are still associated with the correct window.

Window types

There are three ways to define time windows in ksqlDB: hopping windows, tumbling windows, and session windows. Hopping and tumbling windows are time windows, because they're defined by fixed durations they you specify. Session windows are dynamically sized based on incoming data and defined by periods of activity separated by gaps of inactivity.

Window typeBehaviorDescription
Tumbling Window Time-based Fixed-duration, non-overlapping, gap-less windows
Hopping Window Time-based Fixed-duration, overlapping windows
Session Window Session-based Dynamically-sized, non-overlapping, data-driven windows

Hopping window

Hopping windows are based on time intervals. They model fixed-sized, possibly overlapping windows. A hopping window is defined by two properties: the window's duration and its advance, or "hop", interval. The advance interval specifies how far a window moves forward in time relative to the previous window. For example, you can configure a hopping window with a duration of five minutes and an advance interval of one minute. Because hopping windows can overlap, and usually they do, a record can belong to more than one such window.

All hopping windows have the same duration, but they might overlap, depending on the length of time specified in the ADVANCE BY property.

For example, if you want to count the pageviews for only Region_6 by female users for a hopping window of 30 seconds that advances by 10 seconds, you might run a query like this:

 
 

The hopping window's start time is inclusive, but the end time is exclusive. This is important for non-overlapping windows, in which each record must be contained in exactly one window.

Tumbling window

Tumbling windows are a special case of hopping windows. Like hopping windows, tumbling windows are based on time intervals. They model fixed-size, non-overlapping, gap-less windows. A tumbling window is defined by a single property: the window's duration. A tumbling window is a hopping window whose window duration is equal to its advance interval. Since tumbling windows never overlap, a record will belong to one and only one window.

All tumbling windows are the same size and adjacent to each other, which means that whenever a window ends, the next window starts.

For example, if you want to compute the the five highest-value orders per zip code per hour in an orders stream, you might run a query like this:

 
 

Here's another example: to detect potential credit card fraud in an authorization_attempts stream, you might run a query for the number of authorization attempts on a particular card that's greater than three, during a time interval of five seconds.

 
 

The tumbling window's start time is inclusive, but the end time is exclusive. This is important for non-overlapping windows, in which each record must be contained in exactly one window.

Session window

A session window aggregates records into a session, which represents a period of activity separated by a specified gap of inactivity, or "idleness". Any records with timestamps that occur within the inactivity gap of existing sessions are merged into the existing sessions. If a record's timestamp occurs outside of the session gap, a new session is created.

A new session window starts if the last record that arrived is further back in time than the specified inactivity gap.

Session windows are different from the other window types, because:

  • ksqlDB tracks all session windows independently across keys, so windows of different keys typically have different start and end times.
  • Session window durations vary. Even windows for the same key typically have different durations.

Session windows are especially useful for user behavior analysis. Session-based analyses range from simple metrics, like counting user visits on a news website or social platform, to more complex metrics, like customer-conversion funnel and event flows.

For example, to count the number of pageviews per region for session windows with a session inactivity gap of 60 seconds, you might run the following query, which sessionizes the input data and performs the counting/aggregation step per region:

 
 

The start and end times for a session window are both inclusive, in contrast to time windows.

A session window contains at least one record. It's not possible for a session window to have zero records.

If a session window contains exactly one record, the record's ROWTIME timestamp is identical to the window's own start and end times. Access these by using the WINDOWSTART and WINDOWEND system columns.

If a session window contains two or more records, then the earliest/oldest record's ROWTIME timestamp is identical to the window's start time, and the latest/newest record's ROWTIME timestamp is identical to the window's end time.

Windowed joins

ksqlDB supports using windows in JOIN queries by using the WITHIN clause.

For example, to find orders that have shipped within the last hour from an orders stream and a shipments stream, you might run a query like:

 
 

For more information on joins, see Join Event Streams with ksqlDB.

Out-of-order events

Frequently, events that belong to a window can arrive out-of-order, for example, over slow networks, and a grace period may be required to ensure the events are accepted into the window. ksqlDB enables configuring this behavior, for each of the window types.

For example, to allow events to be accepted for up to two hours after the window ends, you might run a query like:

 
 

Events that arrive after the grace period has passed are called late and aren't included in the aggregation result.

Window retention

For each window type, you can configure the number of windows in the past that ksqlDB retains. This capability is very useful for interactive applications that use ksqlDB as their primary serving data store.

For example, to retain the computed windowed aggregation results for a week, you might run the following query:

 
 

Note that the specified retention period should be larger than the sum of window size and any grace period.

'ksqlDB' 카테고리의 다른 글

ksql - Connectors - 수정  (0) 2022.01.13
ksql - User-defined functions  (0) 2022.01.13
ksql - joins  (0) 2022.01.13
ksql - 쿼리  (0) 2022.01.13
ksql - 테이블  (0) 2022.01.13

Join Index

 

Joining collections

 

Partitioning requirements

 

Synthetic key columns

 

 

 

 

'ksqlDB' 카테고리의 다른 글

ksql - User-defined functions  (0) 2022.01.13
ksql - Time and Windows - 수정  (0) 2022.01.13
ksql - 쿼리  (0) 2022.01.13
ksql - 테이블  (0) 2022.01.13
ksql - Streams  (0) 2022.01.12

쿼리

ksqlDB에는 영구(persistent), 푸시(push), 풀(pull)의 세 가지 쿼리가 있습니다. 

 

영구 쿼리

영구 쿼리는 이벤트 행을 무기한으로 실행하는 서버 측 쿼리입니다. 기존 스트림 또는 테이블에서 새 스트림 및 새 테이블 을 만들어 영구 쿼리를 실행합니다.

Push 쿼리

푸시 쿼리 는 실시간으로 변하는 이벤트를 구독하는  클라이언트에 의해 발행된 쿼리입니다.

푸시 쿼리의 예로 특정 사용자의 지리적 위치를 구독하는 것입니다. 쿼리는 지도 좌표를 요청하는 푸시 쿼리이기 때문에 위치 변경이 발생하는 즉시 클라이언트에 "푸시"됩니다. 이는 프로그래밍 방식으로 제어되는 마이크로서비스, 실시간 앱 또는 모든 종류의 비동기 제어 흐름을 빌드하는 데 유용합니다.

푸시 쿼리는 SQL과 유사한 언어를 사용하여 표현됩니다. 특정 키에 대한 스트림이나 테이블을 쿼리하는 데 사용할 수 있습니다. 또한 푸시 쿼리는 키 조회에만 국한되지 않습니다. 필터, 선택, 그룹화 기준, 파티션 기준 및 조인을 포함한 전체 SQL 세트를 지원합니다.

푸시 쿼리를 사용하면 결과에 대한 구독으로 스트림 또는 구체화된 테이블을 쿼리할 수 있습니다. 스트림을 반환하는 쿼리를 포함하여 모든 쿼리의 출력을 구독할 수 있습니다. 푸시 쿼리는 스트림 또는 구체화된 테이블로 실시간으로 새로운 정보에 조회할 수 있습니다. 비동기식 애플리케이션 흐름에 적합합니다. 요청/응답 흐름은 풀 쿼리를 참조하십시오 .

ksqlDB REST API에 HTTP 요청을 전송하여 푸시 쿼리를 실행하면 API가 무기한 길이의 청크 응답을 다시 보냅니다.

푸시 쿼리의 결과는 지원 Kafka topic에 유지되지 않습니다. Kafka 주제에 대한 쿼리 결과를 유지해야 하는 경우 CREATE TABLE AS SELECT 또는 CREATE STREAM AS SELECT 문을 사용합니다.

Pull 쿼리

풀 쿼리는 기존 RDBS에 대한 쿼리와 같이 "지금"의 결과를 검색하는 클라이언트가 수행한 쿼리입니다.

특정 사용자에 대한 지리적 위치에 대한 풀 쿼리는 현재 지도 좌표를 요청합니다. 풀 쿼리이기 때문에 현재까지의 데이터를 반환되고 연결이 닫힙니다. 이는 페이지 로드 시 사용자 인터페이스를 한 번 렌더링하는 데 이상적입니다. 일반적으로 모든 종류의 동기 제어 흐름에 적합합니다.

풀 쿼리를 사용하면 구체화된 뷰의 현재 상태를 가져올 수 있습니다. 구체화된 뷰는 새 이벤트가 도착하면 점진적으로 업데이트되기 때문에 풀 쿼리는 짧은 수행 시간으로 실행됩니다. 요청/응답 흐름에 매우 적합합니다. 비동기식 애플리케이션 흐름의 경우 푸시 쿼리를 참조하십시오 . 풀 쿼리는 ANSI SQL을 따릅니다.

ksqlDB REST API에 HTTP 요청을 전송하여 pull 쿼리를 실행하면 API가 단일 응답으로 응답합니다.

 

 

'ksqlDB' 카테고리의 다른 글

ksql - Time and Windows - 수정  (0) 2022.01.13
ksql - joins  (0) 2022.01.13
ksql - 테이블  (0) 2022.01.13
ksql - Streams  (0) 2022.01.12
ksql - Materialized Views  (0) 2022.01.12

https://noti.st/rmoff/NrxcrM/slides 참고

테이블

테이블은 시간이 지남에 따라 데이터가 변경되는 컬렉션입니다. 

이벤트의 시간적 순서를 나타내는 스트림과 달리 테이블은 "지금" 현재의 사실을 나타냅니다.

 

예를 들어, 테이블을 사용하여 누군가가 살았던 장소를 모델링 하면,

 살았던 장소가 "서울 -> 제주도 -> 대전" 순으로 변경된 경우 테이블에는 최종 데이터인 "대전" 데이터를 가집니다.

 

테이블은 각 행의 키를 활용하여 작동합니다. 일련의 행이 키가 같은 경우,  데이터는 해당 키의 최신 정보를 나타냅니다. 

 

 

EVENT - STREAM - TABLE 관계

 

테이블 생성

 

'ksqlDB' 카테고리의 다른 글

ksql - joins  (0) 2022.01.13
ksql - 쿼리  (0) 2022.01.13
ksql - Streams  (0) 2022.01.12
ksql - Materialized Views  (0) 2022.01.12
ksqlDB - stream vs table  (0) 2022.01.12

stream

스트림

스트림은 일련의 발생 사실을 나타내는 컬렉션입니다. 분할되고 변경 불가능하고 추가만 가능합니다.

이벤트의 집합으로 생각하면 됩니다.

 

예를 들어, 스트림의 행은 "Alice가 Bob에게 $100를 보냈습니다", "Charlie가 Bob에게 $50를 보냈습니다"와 같은 일련의 금융 거래를 모델링할 수 있습니다.

 

이벤트가 스트림에 입력되면 절대 변경할 수 없습니다. 스트림 끝에 새 행을 추가할 수 있지만 기존 행은 업데이트하거나 삭제할 수 없습니다.

 

각 행은 특정 파티션에 저장됩니다. 모든 행에는 암시적이든 명시적이든 해당 ID를 가르키는 키가 있습니다. 

동일한 키를 가진 모든 행은 동일한 파티션에 있습니다.

 

'ksqlDB' 카테고리의 다른 글

ksql - 쿼리  (0) 2022.01.13
ksql - 테이블  (0) 2022.01.13
ksql - Materialized Views  (0) 2022.01.12
ksqlDB - stream vs table  (0) 2022.01.12
ksqlDB - Stream Processing  (0) 2022.01.12

+ Recent posts