위 그림과 같이, 프로그램이 실행되기 위해서는 운영체제(OS)가 프로그램의 정보를 메모리에 로드 해야 한다.

또한 프로그램이 실행되는 동안 CPU가 코드를 처리하기 위해서는, 메모리가 명령어와 데이터들을 저장해야 한다.

 

메모리 구조

프로그램이 실행되기 위해서는 먼저 프로그램이 메모리에 로드(load)되어야 합니다.

또한, 프로그램에서 사용되는 변수들을 저장할 메모리도 필요합니다.

따라서 컴퓨터의 운영체제는 프로그램의 실행을 위해 다양한 메모리 공간을 제공하고 있습니다.

프로그램이 운영체제로부터 할당받는 대표적인 메모리 공간은 4가지 있습니다.

  1. 코드(code) 영역
  2. 데이터(data) 영역
  3. 스택(stack) 영역
  4. 힙(heap) 영역

코드(code) 영역

메모리의 코드(code) 영역은 실행할 프로그램의 코드가 저장되는 영역으로 텍스트(code) 영역이라고도 부릅니다.

CPU는 코드 영역에 저장된 명령어를 하나씩 가져가서 처리하게 됩니다.

데이터(data) 영역

메모리의 데이터(data) 영역은 프로그램의 전역 변수와 정적(static) 변수가 저장되는 영역입니다.

데이터 영역은 프로그램의 시작과 함께 할당되며, 프로그램이 종료되면 소멸합니다.

스택(stack) 영역

메모리의 스택(stack) 영역은 함수의 호출과 관계되는 지역 변수와 매개변수가 저장되는 영역입니다.

스택 영역은 함수의 호출과 함께 할당되며, 함수의 호출이 완료되면 소멸합니다.

이렇게 스택 영역에 저장되는 함수의 호출 정보를 스택 프레임(stack frame)이라고 합니다.

스택 영역은 푸시(push) 동작으로 데이터를 저장하고, 팝(pop) 동작으로 데이터를 인출합니다.

이러한 스택은 후입선출(LIFO, Last-In First-Out) 방식에 따라 동작하므로, 가장 늦게 저장된 데이터가 가장 먼저 인출됩니다.

스택 영역은 메모리의 높은 주소에서 낮은 주소의 방향으로 할당됩니다.

힙(heap) 영역

메모리의 힙(heap) 영역은 사용자가 직접 관리할 수 있는 ‘그리고 해야만 하는’ 메모리 영역입니다.

힙 영역은 사용자에 의해 메모리 공간이 동적으로 할당되고 해제됩니다.

힙 영역은 메모리의 낮은 주소에서 높은 주소의 방향으로 할당됩니다.

스택과 힙의 장단점

스택

매우 빠른 액세스

변수를 명시 적으로 할당 해제 할 필요가 없습니다.

공간은 CPU에 의해 효율적으로 관리되고 메모리는 단편화되지 않습니다.

지역 변수 만

스택 크기 제한 (OS에 따라 다름)

변수의 크기를 조정할 수 없습니다.

변수는 전역 적으로 액세스 할 수 있습니다.

메모리 크기 제한 없음

(상대적으로) 느린 액세스

효율적인 공간 사용을 보장하지 못하면 메모리 블록이 할당 된 후 시간이 지남에 따라 메모리가 조각화되어 해제 될 수 있습니다.

메모리를 관리해야합니다 (변수를 할당하고 해제하는 책임이 있습니다)

변수는 C언어 realloc() or 자바 new

 

원문 : https://junghyun100.github.io/힙-스택차이점/

 

'이것저것' 카테고리의 다른 글

curl  (0) 2022.01.18
Windows Powershell 항상 admin으로 실행하기  (0) 2022.01.02
전처리기  (0) 2021.12.31
apt vs apt-get 차이  (2) 2021.12.28
작업스케쥴러에 프로그램 등록후 자동시작이 안되는 문제  (0) 2021.12.17

Parent and Child Cursors

(상위 커서/하위 커서, 부모 커서/자식 커서)

 

"커서"는 사용자가 실행하는 SQL 문에 할당되는 라이브러리 캐시의 메모리 영역입니다. 이 메모리 영역은 SQL 텍스트, SQL 실행 계획, 통계 등과 같은 SQL 문에 대한 주요 정보를 저장합니다.

각 SQL 문에는 하나의 상위 커서와 하나 이상의 하위 커서가 있습니다. 상위 및 하위 커서가 무엇인지 설명합니다.
커서 = 메모리 영역을 염두에 두십시오.

 

왜 커서가 두 종류인가?

두 종류의 커서(부모 및 자식)가 있는 것은 Oracle 데이터베이스 설계에 따른 것입니다.

실행하는 각 SQL 문에 대해 Oracle 엔진은 상위 및 하위 커서라는 두 개의 커서를 생성합니다. 동일한 SQL 문에 대해 다른 바인드 값이나 두 개의 다른 스키마 또는 다른 리터럴 값 등이 있을 수 있는 것과 같은 다른 차이점이 있을 수 있기 때문에 두 개의 커서가 생성됩니다.

상위 커서는 SQL 문을 보유하고 하위 커서는 정보를 보유합니다. 차이점과 관련이 있습니다. 이것은 본질적으로 SQL 문이 하드 또는 소프트 구문 분석을 수행할 것인지 여부를 결정하는 요소로 자식 커서를 만듭니다.

PARENT CURSOR

  • 커서의 SQL 텍스트를 저장합니다. 두 문장이 동일한 경우 동일한 상위 커서를 공유합니다.
  • 모든 상위 커서는 그에 대해 생성된 하나 이상의 하위 커서와 함께 실행됩니다.
  • 상위 커서는 V$SQLAREA 뷰에 표시됩니다. v$sqlarea의 VERSION_COUNT 열은 이 부모 커서에 몇 개의 자식 커서가 있는지 알려줍니다.

 

CHILD CURSOR

  • 부모 커서는 적어도 하나의 자식 커서를 가진다.
  • 상위 커서는 SQL 텍스트를 저장하지만, 하위 커서는 환경 정보, 통계 정보, 바인드 변수 정보, 실행 세부 정보와 같은 SQL 문과 관련된 중요한 정보를 저장합니다.
  • SQL 텍스트가 자식 커서에 저장되지 않으므로 자식 커서는 메모리 공간을 덜 차지합니다.
  • 모든 자식 커서는 부모에 속해야 합니다.
  • 쿼리에 대해 자식 커서는 하드 파싱을 할지 또는 소프트 파싱을 할지를 결정합니다. SQL 쿼리가 동일해서 부모 커서는 동일하지만 자식  커서가 공유되지 않고 하드 파싱(재컴파일)을 하는 상황이 있을 수 있습니다.
  • 상위 커서는 V$SQL 보기에 표시됩니다.
  • V$SQL_SHARED_CURSOR는 옵티마이저가 커서를 비공유로 표시하기로 결정한 이유를 제공하므로 매우 유용한 보기입니다. 따라서 SQL 문이 동일하지만 하드파싱이 발생 때 이 뷰를 참조합니다.

 

V$SQL_SHARED_CURSOR

동일한 상위 커서에 대해 둘 이상의 하위 커서가 생성되되어었을 때 기존 하위 커서와 공유되지 않는 이유를 설명합니다. 뷰의 각 열은 커서를 공유할 수 없는 이유를 식별합니다. 공유되지 않은 사유의 열 값은 "Y" 로 표시됩니다.

특정 자식은 여러 가지 이유로 공유에 실패했을 수 있습니다. 이 이유는 다른 기존 자식 커서를 사용하려는 이유가 됩니다.

 

cursor_sharing 파라미터

CURSOR_SHARING은 어떤 SQL이 동일한 커서를 공유할 수 있는지를 결정합니다. 세 가지 값이 있습니다.

 

EXACT

동일한 텍스트가 있는 SQL 구문만 동일한 커서를 공유할 수 있습니다.

 

FORCE

일부 문자가 다를 경우 그 문자가 SQL문의 의미에 영향을 미치지 않는 한 강제로 커서를 공유합니다.

 

SIMILAR

문자가 SQL문의 의미나 실행계획 최적화에 영향을 미치지 않는 한 일부 문자가 달라도 동일한 커서로 공유하도록 합니다.
cursor_sharing의 default는 EXACT입니다.

 

예를 들어 아래 두 개의 서로 다른 SQL 문이 있습니다.

select * from EMP WHERE EMPNO=7369;
select * from EMP where EMPNO=7369;

둘 다 동일한 결과를 생성하지만 이들은 서로 다른 SQL입니다. "where"를 보면 첫 번째 문장에서는 대문자로, 두 번째 문장에서는 소문자로 작성되어 있는데 Oracle optimizer는 두 개를 상이한 SQL로 판단합니다.

 

V$SQLAREA 뷰를 조회합니다. 부모 커서에 대한 정보를 보여줍니다.

V$SQLAREA 에서는 SQL문에 대한 Parsing 정보 (sql text, sql_id, parse calls, executions 등)와 메모리 사용량, CPU 사용량, 수행시간, 모듈정보 등 외에도 많을 정보를 확인할 수 있습니다.

select *
from  v$sqlarea 
where upper(sql_text) like upper('%select * from EMP%7369%')
and   upper(sql_text) not like upper('%v$sqlarea%');

-- SQL 수행경과
SQL_TEXT	SQL_ID	SHARABLE_MEM	PERSISTENT_MEM	RUNTIME_MEM	SORTS	VERSION_COUNT	LOADED_VERSIONS	OPEN_VERSIONS	USERS_OPENING	FETCHES	EXECUTIONS	PX_SERVERS_EXECUTIONS	END_OF_FETCH_COUNT	USERS_EXECUTING	LOADS	FIRST_LOAD_TIME	INVALIDATIONS	PARSE_CALLS	DISK_READS	DIRECT_WRITES	DIRECT_READS	BUFFER_GETS	APPLICATION_WAIT_TIME	CONCURRENCY_WAIT_TIME	CLUSTER_WAIT_TIME	USER_IO_WAIT_TIME	PLSQL_EXEC_TIME	JAVA_EXEC_TIME	ROWS_PROCESSED	COMMAND_TYPE	OPTIMIZER_MODE	OPTIMIZER_COST	OPTIMIZER_ENV	OPTIMIZER_ENV_HASH_VALUE	PARSING_USER_ID	PARSING_SCHEMA_ID	PARSING_SCHEMA_NAME	KEPT_VERSIONS	ADDRESS	HASH_VALUE	OLD_HASH_VALUE	PLAN_HASH_VALUE	FULL_PLAN_HASH_VALUE	MODULE	MODULE_HASH	ACTION	ACTION_HASH	SERIALIZABLE_ABORTS	OUTLINE_CATEGORY	CPU_TIME	ELAPSED_TIME	OUTLINE_SID	LAST_ACTIVE_CHILD_ADDRESS	REMOTE	OBJECT_STATUS	LITERAL_HASH_VALUE	LAST_LOAD_TIME	IS_OBSOLETE	IS_BIND_SENSITIVE	IS_BIND_AWARE	CHILD_LATCH	SQL_PROFILE	SQL_PATCH	SQL_PLAN_BASELINE	PROGRAM_ID	PROGRAM_LINE#	EXACT_MATCHING_SIGNATURE	FORCE_MATCHING_SIGNATURE	LAST_ACTIVE_TIME	BIND_DATA	TYPECHECK_MEM	IO_CELL_OFFLOAD_ELIGIBLE_BYTES	IO_INTERCONNECT_BYTES	PHYSICAL_READ_REQUESTS	PHYSICAL_READ_BYTES	PHYSICAL_WRITE_REQUESTS	PHYSICAL_WRITE_BYTES	OPTIMIZED_PHY_READ_REQUESTS	LOCKED_TOTAL	PINNED_TOTAL	IO_CELL_UNCOMPRESSED_BYTES	IO_CELL_OFFLOAD_RETURNED_BYTES	CON_ID	IS_REOPTIMIZABLE	IS_RESOLVED_ADAPTIVE_PLAN
select * from EMP WHERE EMPNO=7369	2zp7d0gmunwfj	23539	8888	6752	0	1	1	0	0	1	1	0	1	0	1	2022-01-02/00:18:14	0	1	0	0	0	7	0	0	0	0	0	0	1	3	FIRST_ROWS	3	E289FB89D011220170020080AEF5C3E2CFFA332056414555519521105545551545545558591555449665851D5511058555555155515122555415A0EA0C5551454265415454449081566E001696C6A355451501025415404416FD157151551555550001550A16214545D1C31444A1011015595510250153355555155551E91F1411855B0501655D56456144551525245005F9A4160090505165551695504415957945000544000A5AD01122010502000002000000100000000100002000000210D007000000D00700002003000101000038F8000200000090010000E80300D0112201026564640202643202320000020003020A0A05050A1400020000F4010000640A0A0A0A64E803000064C010030200020000FFFF00000202324000400000E803000010270000002000000A1E03C80000080000010000102700000304810A80969800E80300004B64FFFF0000FFFF00000010000014A08601006440204E00000702C8C823F8FF0000FFFF00000204400A1450320A0A00001000	3703031112	95	95	SCOTT	0	000000008CA91018	3886707153	3891276153	3956160932	3634526668	TOAD 12.8.0.49	2077942562		0	0		835	1118		000000008CA908C0	N	VALID	0	2022-01-02 오전 12:18:14	N	N	N	0				0	0	3.83665796575199E18	1.6946033956547E19	2022-01-02 오전 12:18:14		0	0	0	0	0	0	0	0	1	2	0	0	0	N	
select * from EMP where EMPNO=7369	fah6htfu5fdjg	23539	8888	6752	0	1	1	1	0	1	1	0	1	0	1	2022-01-02/00:18:15	0	1	0	0	0	7	0	0	0	0	0	0	1	3	FIRST_ROWS	3	E289FB89D011220170020080AEF5C3E2CFFA332056414555519521105545551545545558591555449665851D5511058555555155515122555415A0EA0C5551454265415454449081566E001696C6A355451501025415404416FD157151551555550001550A16214545D1C31444A1011015595510250153355555155551E91F1411855B0501655D56456144551525245005F9A4160090505165551695504415957945000544000A5AD01122010502000002000000100000000100002000000210D007000000D00700002003000101000038F8000200000090010000E80300D0112201026564640202643202320000020003020A0A05050A1400020000F4010000640A0A0A0A64E803000064C010030200020000FFFF00000202324000400000E803000010270000002000000A1E03C80000080000010000102700000304810A80969800E80300004B64FFFF0000FFFF00000010000014A08601006440204E00000702C8C823F8FF0000FFFF00000204400A1450320A0A00001000	3703031112	95	95	SCOTT	0	000000008C9AD6E0	3025614383	4198020274	3956160932	3634526668	TOAD 12.8.0.49	2077942562		0	0		1245	1280		000000008A6EEB18	N	VALID	0	2022-01-02 오전 12:18:15	N	N	N	0				0	0	3.83665796575199E18	1.6946033956547E19	2022-01-02 오전 12:18:15		0	0	0	0	0	0	0	0	1	2	0	0	0	N

 

Oracle DB 에서 실행한 SQL 내역을 조회하고자 할때 v$sql 딕셔너리 뷰를 이용할 수 있습니다.

V$SQL을 조회해 봅니다. 아래와 같이 2개의 SQL 문이 조회됩니다.

v$sql 에는 실행한 SQL 문과 실행 시작시간, 종료시간, 경과시간 등의 다양한 정보가 들어있습니다.

select sql_text, sql_id, sharable_mem, loaded_versions, open_versions, executions, 
       loads, parse_calls, buffer_gets, rows_processed, elapsed_time,
       is_obsolete, is_bind_sensitive, is_bind_aware
from   v$sql 
where  upper(sql_text) like upper('%select * from EMP%7369%')
and    upper(sql_text) not like upper('%v$sql%');

-- SQL 수행결과
SQL_TEXT	SQL_ID	SHARABLE_MEM	LOADED_VERSIONS	OPEN_VERSIONS	EXECUTIONS	LOADS	PARSE_CALLS	BUFFER_GETS	ROWS_PROCESSED	ELAPSED_TIME	IS_OBSOLETE	IS_BIND_SENSITIVE	IS_BIND_AWARE
select * from EMP WHERE EMPNO=7369	2zp7d0gmunwfj	23539	1	0	1	1	1	7	1	1118	N	N	N
select * from EMP where EMPNO=7369	fah6htfu5fdjg	23539	1	1	1	1	1	7	1	1280	N	N	N

 

V$SQL_SHARED_CURSOR

로드된 각 자식 커서에 대한 정보를 보여줍니다.
커서를 공유할 수 없는 이유를 설명하는 컬럼을 가지고 있습니다.

select * from v$sql_shared_cursor
where  sql_id in('2zp7d0gmunwfj','fah6htfu5fdjg');

-- SQL 수행 결과
SQL_ID	ADDRESS	CHILD_ADDRESS	CHILD_NUMBER	UNBOUND_CURSOR	SQL_TYPE_MISMATCH	OPTIMIZER_MISMATCH	OUTLINE_MISMATCH	STATS_ROW_MISMATCH	LITERAL_MISMATCH	FORCE_HARD_PARSE	EXPLAIN_PLAN_CURSOR	BUFFERED_DML_MISMATCH	PDML_ENV_MISMATCH	INST_DRTLD_MISMATCH	SLAVE_QC_MISMATCH	TYPECHECK_MISMATCH	AUTH_CHECK_MISMATCH	BIND_MISMATCH	DESCRIBE_MISMATCH	LANGUAGE_MISMATCH	TRANSLATION_MISMATCH	BIND_EQUIV_FAILURE	INSUFF_PRIVS	INSUFF_PRIVS_REM	REMOTE_TRANS_MISMATCH	LOGMINER_SESSION_MISMATCH	INCOMP_LTRL_MISMATCH	OVERLAP_TIME_MISMATCH	EDITION_MISMATCH	MV_QUERY_GEN_MISMATCH	USER_BIND_PEEK_MISMATCH	TYPCHK_DEP_MISMATCH	NO_TRIGGER_MISMATCH	FLASHBACK_CURSOR	ANYDATA_TRANSFORMATION	PDDL_ENV_MISMATCH	TOP_LEVEL_RPI_CURSOR	DIFFERENT_LONG_LENGTH	LOGICAL_STANDBY_APPLY	DIFF_CALL_DURN	BIND_UACS_DIFF	PLSQL_CMP_SWITCHS_DIFF	CURSOR_PARTS_MISMATCH	STB_OBJECT_MISMATCH	CROSSEDITION_TRIGGER_MISMATCH	PQ_SLAVE_MISMATCH	TOP_LEVEL_DDL_MISMATCH	MULTI_PX_MISMATCH	BIND_PEEKED_PQ_MISMATCH	MV_REWRITE_MISMATCH	ROLL_INVALID_MISMATCH	OPTIMIZER_MODE_MISMATCH	PX_MISMATCH	MV_STALEOBJ_MISMATCH	FLASHBACK_TABLE_MISMATCH	LITREP_COMP_MISMATCH	PLSQL_DEBUG	LOAD_OPTIMIZER_STATS	ACL_MISMATCH	FLASHBACK_ARCHIVE_MISMATCH	LOCK_USER_SCHEMA_FAILED	REMOTE_MAPPING_MISMATCH	LOAD_RUNTIME_HEAP_FAILED	HASH_MATCH_FAILED	PURGED_CURSOR	BIND_LENGTH_UPGRADEABLE	USE_FEEDBACK_STATS	CON_ID
2zp7d0gmunwfj	000000008CA91018	000000008CA908C0	0	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	0
fah6htfu5fdjg	000000008C9AD6E0	000000008A6EEB18	0	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	N	0

 

V$SQLAREA 는 부모 커서에 대한 정보를 담고 있고, V$SQL 은 자식 커서에 대한 정보를 담고 있습니다.

V$SQL_SHARED_CURSOR 은 커서를 공유하지 못한 사유에 대한 정보를 담고 있습니다.



< 참조>

 

[ 설치환경 ]

- Microsoft Windows 10 Pro(10.0.19041 N/A 빌드 19041)

- WSL 2(Ubuntu 20.04.3 LTS)

- java :  11.0.3 ( 버전확인 : java -version)

- Kafka :  3.0.0 ( 버전확인 : kafka-topics.sh --version )

- Zookeeper : 

 

 

이 문서서는 로컬 머신에 Kafka 클러스터를 설치 및 실행하는 방법을 보여주고 Kafka 아키텍처에 대한 몇 가지 중요한 개념을 설명합니다.

Apache Kafka 는 분산 스트리밍 플랫폼입니다. 분산 메시지 브로커에서 데이터 스트림 처리를 위한 플랫폼으로 사용할 수 있습니다.

카프카 설치

먼저 Kafka 실행 파일을 실행하려면 Java가 설치되어 있어야 합니다. 본 문서에서는 openjdk-11 을 설치했습니다.

sudo apt-get install openjdk-11-jdk

 


공식 다운로드 페이지에서 Kafka의 바이너리를 다운로드하고 원하는 위치에 tar 파일의 압축을 풉니다.

tar -xvzf kafka_2.13-3.0.0.tgz

kafka_2.13-3.0.0이라는 폴더가 생성되며

kafka_2.13-3.0.0 하위 디렉토리에 bin, config, libs, license, site-docs 디렉토리가 생성됩니다.

 

시스템 아키텍처

클러스터를 실행하기 위해 아래와 같은 프로세스들을 시작해야 합니다.

  1. Zookeeper : 클러스터 노드 간의 상태를 유지하기 위해 Kafka에서 사용
  2. Kafka brokers : 데이터를 저장하고 내보내는 파이프라인의 "파이프"
  3. Producers : 클러스터에 데이터를 입력
  4. Consumers : 클러스터로부터 데이터를 조회

이 다이어그램의 각 블록은 네트워크의 다른 시스템에 구성할 수도 있습니다.

 

주키퍼 시작

Zookeeper 는 coordination을 위해 서버 상태를 저장하는 데, 키-값 형태로 저장하는 저장소입니다.

Kafka를 실행하려면 Zookeeper 서버가 필요하므로 먼저  Zookeeper 인스턴스를 시작해야 합니다.

kafka_2.13-3.0.0 안에 몇 가지 유용한 파일이 있습니다.

  • bin/zookeeper-server-start.sh : 서버 실행 쉘
  • config/zookeeper.properties : Zookeeper 서버가 실행할 구성환경 파일

 

아래 명령어로 주키퍼 서버를 실행합니다. kafka_2.13-3.0.0/ 디렉토리에서 실행해야 합니다.

bin/zookeeper-server-start.sh config/zookeeper.properties

 

주키퍼 서버를 백그라운드로 실행하려면 -daemon 옵션을 추가하여 기동합니다.

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

 

설치 중간에 Zookeeper 아스키아트 문자도 보입니다.

foreground로 실행되니 최종 "Creating new log file" 보이면 다음 단계를 진행합니다.

[2022-01-01 00:34:06,106] INFO   ______                  _                                           (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO  |___  /                 | |                                          (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,107] INFO     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __    (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,108] INFO   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |     (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_| (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,109] INFO                                               | |                      (org.apache.zookeeper.server.ZooKeeperServer)
[2022-01-01 00:34:06,110] INFO                                               |_|                      (org.apache.zoo
.....   .....
[2022-01-01 00:34:10,369] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
[2022-01-01 00:34:10,373] INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider)
[2022-01-01 01:08:12,522] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2022-01-01 01:08:14,923] WARN fsync-ing the write ahead log in SyncThread:0 took 2389ms which will adversely effect operation latency.File size is 67108880 bytes. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog)

config/zookeeper.properties 파일을 보면 clientPort 속성이 2181로 설정된 것을 볼 수 있습니다. 이 속성은 Zookeeper 서버가 현재 수신 대기 중인 포트입니다.

# File: kafka_2.13-3.0.0/config/zookeeper.properties

# the port at which the clients will connect
clientPort=2181

 

Kafka Broker 실행하기

Kafka 브로커는 클러스터의 핵심이며 데이터가 저장되고 배포되는 파이프라인 역할을 합니다.
Zookeeper를 시작한 방법과 유사하게,

브로커 서버를 시작하고(bin/kafka-server-start.sh) 구성(config/server.properties)하기 위한 두 개의 파일이 있습니다.
kafka의 분산환경 구성을 해야하기 때문에 다이어그램과 같이 3개의 브로커를 시작하겠습니다.
config/server.properties 파일을 열면 여러 구성 옵션이 표시됩니다(지금은 대부분 무시할 수 있음).
그러나 각 브로커 인스턴스에 대해 고유해야 하는 세 가지 속성이 있습니다.

File: kafka_2.13-3.0.0/config/server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# The address the socket server listens on. It will get the value returned from 
listeners=PLAINTEXT://:9092

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

* 주키퍼 서버들 멀티로 구성한 경우에는 zookeeper.connect 에 나열해주면 됩니다.

예시) zookeeper.connect=localhost:2181, localhost:2182, localhost:2183

서버가 3개이므로 각 서버에 대해 각각의 구성 파일을 만들겠습니다.

config/server.properties 파일을 복사하고 각 서버 인스턴스에 대해 각각의 property 파일을 만듭니다.

cp config/server.properties config/server.1.properties
cp config/server.properties config/server.2.properties
cp config/server.properties config/server.3.properties

 

파일의 각 복사본에 대해 3가지 속성을 각각 고유하게 변경합니다.

server.1.properties

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1

server.2.properties

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs2

server.3.properties

broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs3

log 디렉토리를 생성합니다.

mkdir /tmp/kafka-logs1
mkdir /tmp/kafka-logs2
mkdir /tmp/kafka-logs3

이제 broker instances를 실행할 수 있습니다. 

서로 다른 터미널 세션에서 아래 명령을 각각 실행합니다. 본 문서의 모든 명령어는 kafka_2.13-3.0.0/ 디렉토리에서 수행해야 합니다. foreground로 실행되니 프로세스를 중지하면 안됩니다.

bin/kafka-server-start.sh config/server.1.properties
bin/kafka-server-start.sh config/server.2.properties
bin/kafka-server-start.sh config/server.3.properties
 

백그라운드로 실행하려면 -daemon 옵션을 추가합니다.

bin/kafka-server-start.sh -daemon config/server.1.properties
bin/kafka-server-start.sh -daemon config/server.2.properties
bin/kafka-server-start.sh -daemon config/server.3.properties

 

브로커가 성공적으로 시작되면 'started' 메시지가 표시되어야 합니다.

.....
[2022-01-01 01:08:20,944] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,944] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,945] INFO Kafka startTimeMs: 1640966900932 (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 01:08:20,948] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
.....

Topics 생성

클러스터에 데이터를 입력하기 전에 데이터가 속할 topic을 생성해야 합니다. 새로운 세션창에서 다음 명령을 실행합니다.

'my-kafka-topic' Topic 이 생성되었다는 메시지가 출력됩니다.

bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 3
Created topic my-kafka-topic

여기에서 몇 가지 옵션을 설명하겠습니다.

  • 파티션을 사용하면 데이터를 분할할 브로커 수를 결정할 수 있습니다. 일반적으로 브로커 수와 동일하게 설정합니다. 3개의 브로커를 설정했으므로 이 옵션을 3으로 설정합니다.
  • replication-factor는 원하는 데이터 복사본의 수를 나타냅니다(브로커 중 하나가 다운되는 경우에도 다른 브로커에 데이터가 남아 있음). 이 값을 3로 설정했으므로 데이터는 브로커에 복사본 두 개를 더 갖습니다. 

그림으로 표시하면 아래와 같습니다. 색깔이 있는 파티션은 leader이고 색이없는 파티션은 follower입니다.

bootstrap-server는 활성 Kafka 브로커 중 하나의 주소를 가리킵니다. 모든 브로커는 Zookeeper를 통해 서로에 대해 알고 있으므로 어느 브로커를 선택하든 상관 없습니다.

 

Kafka 버전 2.x.x 이하를 사용하는 경우 --bootstrap-server localhost:9093 대신
--zookeeper localhost:2181(Zookeeper 인스턴스의 주소)을 사용해야 합니다.

 

Topics 조회

사용 가능한 모든 주제(Topics)를 나열하려면 bin/kafka-topics.sh 을 실행하면 됩니다. 이 경우 이전 섹션에서 만든 하나의 주제를 출력합니다.

bin/kafka-topics.sh --list --bootstrap-server localhost:9093

my-kafka-topic

 

주제에 대한 자세한 내용을 보려면 동일한 명령과 함께 --describe 인수를 사용할 수 있습니다.

bin/kafka-topics.sh --describe --topic my-kafka-topic --bootstrap-server localhost:9093

 

주제의 파티션 및 복제본에 대한 세부 정보를 출력합니다. Partition, Leader/follower, Replicas, Isr(In Sync Replica) 정보를 보여줍니다. 여기서 ISR은 kafka 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 나타냅니다. 만일, 브로커 중 1대의 서버가 중지된 상태라면 Isr 은 2개 만 표시됩니다. 3번 브로커 서버가 중지되었다면 Leader는 1 또는 2가 되고 Isr 은 1,2 가 됩니다.

Topic: my-kafka-topic   TopicId: 2c7cvTC1QGKy2bu18revoA PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: my-kafka-topic   Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: my-kafka-topic   Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: my-kafka-topic   Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

 

Topics 삭제

이전버전 : 

이전 버전에서는 kafka server.properties파일에 설정을 하고 kafka broker를 재기동해야 topic을 삭제할 수 있습니다.

# config/server.properties파일
delete.topic.enable = true

server.properties에 delete.topic.enable=true 설정을 하고 kafka borker를 재기동 한 다음, kafka-topics.sh로 삭제를 해주면 됩니다. 

$  bin/kafka-topics.sh --delete --topic my-kafka-topic --bootstrap-server localhost:9093

최신버전 : (3.0 version)

KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다. 
브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)

elete.topic.enable 기본 설정은 true입니다.

$  bin/kafka-topics.sh --delete --topic my-kafka-topic --bootstrap-server localhost:9093

 

동적 파라미터 설정

 

topic config 확인 :

bin/kafka-configs.sh --bootstrap-server localhost:9093  --entity-type topics --entity-name movie --describe


broker config 확인 :

bin/kafka-configs.sh --bootstrap-server localhost:9093  --entity-type brokers --entity-name 1 --describe

 

동적 파라미터 변경 및 topic 삭제 :

  1. bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-default --alter --add-config delete.topic.enable=true
  2. bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-kafka-topic
  3. bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-default --alter --delete-config delete.topic.enable

==> 오류(apache kafka 에는 동적으로 변경된다고 설명되어 있는데 오류 발생

1.

Caused by: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=BROKER, name=''): Cannot update these configs dynamically: Set(delete.topic.enable)

2.

Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)

3.

Invalid config(s): delete.topic.enable

 

아직 동적 config 설정이 정상작동 하지 않는 것 같다. 패치버전이나 더 상위 버전이 나오는 다시 확인해 본다.

 

Zookeeper 쉘을 이용한 방법

* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.

zookeeper-shell.sh 를 이용하여서 znode를 삭제할 수 있습니다.

rmr명령은 더이상 사용되진 않는다. delete나 deleteall로 삭제합니다. delete 를 사용할 경우 message가 남아있는 경우 삭제가 되지 않습니다. 이럴 경우, deleteall로 삭제할 수 있습니다.

$ bin/zookeeper-shell.sh localhost:2181
ls /brokers/topics 
## 실행결과
[__consumer_offsets, movie, music, my-kafka-topic]

rmr /brokers/topics/my-kafka-topic
## 실행결과. rmr은 사용할 수 없다.
Command not found: Command not found rmr 

## The zkCli provides the rmr (deprecated) or deleteall command for this purpose


delete /brokers/topics/my-kafka-topic
## 실행결과
Node not empty: /brokers/topics/my-kafka-topic

deleteall /brokers/topics/my-kafka-topic

ls /brokers/topics 
[__consumer_offsets, movie, music]

quit

## 삭제된 topic을 다시 생성해본다.

$ bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2
## 오류가 발생한다. topic을 kafka-topics.sh 로 조회하면 목록이 보인다.
## zookeeper와 kafka broker 간이 연동이 잘 안되는 것 같다.
Error while executing topic command : Topic 'my-kafka-topic' already exists

* 주키퍼로 삭제할지 말고 kafka-topics.sh 로 삭제하는 것을 권장합니다. 주키퍼는 znode 관리용입니다.

 

Producers: Topic에 메시지 보내기

"생산자(Producer)"는 데이터를 Kafka 클러스터에 넣는 프로세스입니다.

bin 디렉토리의 명령어는 콘솔에 텍스트를 입력할 때마다 클러스터에 데이터를 입력하는 콘솔 생성자(Producer)를 제공합니다.

콘솔 생산자(Producer)를 시작하려면 다음 명령을 실행합니다.

bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-kafka-topic
  • Broker-list는 생산자가 방금 프로비저닝한 브로커의 주소를 가리킵니다.
  • topic은 데이터가 입력하려는 주제(topic)를 지정합니다.

이제 데이터를 입력하고 Enter 키를 치면, Kafka 클러스터에 텍스트를 입력할 수 있는 명령 프롬프트가 표시됩니다.

  • 컨슈머가 데이터를 가져가도 Topic 데이터는 삭제되지 않습니다.
>first insert
>두번째 입력

프로듀서가 데이터를 보낼 때 '파티션키'를 지정해서 보낼 수 있습니다.

파티션키를 지정하지 않으면, 라운드로빈 방식으로 파티션에 저장하고,

파티션키를 지정하면, 파티셔너가 파티션키의 HASH 값을 구해서 특정 파티션에 할당합니다.

 

카프카에서 kafka-console-producer.sh로 Consumer에게 메세지를 보낼 때 기본적으로 key값이 null로 설정되어 있습니다. 이럴 때 설정에서 parse.key 및 key.separator 속성을 설정하면 key와 value가 포함된 메시지를 보낼 수 있습니다.

$ /bin/kafka-console-producer.sh \
  --broker-list localhost:9093 \
  --topic my-kafka-topic \
  --property "parse.key=true" \
  --property "key.separator=:"
  • parse.key : key와 value 파싱 활성화 여부 
  • key.separator : key와 value를 구분해주는 구분자
  • print.key : 화면에 key 를 보여줄 지 여부를 지정
  • print.value : 화면에 value 를 보여줄 지 여부를 지정

파일로 메시지 보내기

kafka-console-producer --broker-list localhost:9092 --topic test_topic < file.log

 

Consumers

Kafka consumers 를 사용하여 클러스터에서 데이터를 읽을 수 있습니다. 이 경우 이전 섹션에서 생성한 데이터를 읽습니다.
consumer를  시작하려면 다음 명령을 실행합니다. 아래와 같이 위에서 pruducer가 입력한 데이터가 출력됩니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning

first insert
두번째 입력
  • bootstrap-server는 클러스터의 브로커 중 하나일 수 있습니다.
  • Topic은 생산자가 클러스터에 데이터를 입력한 Topic(주제)과 동일해야 합니다.
  • from-beginning은 클러스터에 현재 가지고 있는 모든 메시지를 원한다고 클러스터에 알립니다.
  • 컨슈머 그룹이 다른 새로운 컨슈머가 auto.offset.reset=earliest 설정으로 데이터를 0번부터 가져갈 수 있습니다. 설정하지 않으면 새롭게 토픽에 생성된 메세지만 읽어옵니다.

위의 명령을 실행하면 콘솔에 로그온한 생산자가 입력한 모든 메시지가 즉시 표시됩니다.
소비자가 실행되는 동안 생산자가 메시지를 입력하면 실시간으로 콘솔에 출력되는 것을 볼 수 있습니다.

 

이런 식으로 Kafka는 지속적인 메시지 대기열처럼 작동하여 소비자가 아직 읽지 않은 메시지를 저장하고 소비자가 실행되는 동안 새 메시지가 오는 대로 전달합니다.

 

카프카에서는 consumer 그룹이라는 개념이 있는데 --consumer-property group.id=group-01 형식으로 consumer 그룹을 지정할 수 있습니다. 카프카 브로커는 컨슈머 그룹 중 하나의 컨슈머에게만 이벤트를 전달합니다. 동일한 이벤트 처리를 하는 컨슈머를 clustering 한 경우에 컨슈머 그룹으로 지정하면 클러스터링된 컨슈머 중 하나의 서버가 데이터를 수신합니다.

 

key와 value를 콘솔창에 표시하기 위해서는 --property print.key=true --property key.separator=: 를 설정합니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning \
   --property print.key=true --property key.separator=:

"key":"second key value"
"secone message":null
key3:third

 

Consumer Group

기존의 Message Queue 솔루션에서는 컨슈머가 메시지를 가져가면, 해당 메세지를 큐에서 삭제된다. 즉, 하나의 큐에 대하여 여러 컨슈머가 붙어서 같은 메세지를 컨슈밍할 수 없다. 하지만 Kafka는, 컨슈머가 메세지를 가져가도 큐에서 즉시 삭제되지 않으며, 하나의 토픽에 여러 컨슈머 그룹이 붙어 메세지를 가져갈 수 있다.

또한 각 consumer group마다 해당 topic의 partition에 대한 별도의 offset을 관리하고, group에 컨슈머가 추가/제거 될 때마다 rebalancing을 하여 group 내의 consumer에 partition을 할당하게 된다. 이는 컨슈머의 확장/축소를 용이하게 하고, 하나의 MQ(Message Queue)를 컨슈머 별로 다른 용도로 사용할 수 있는 확장성을 제공한다.

 

아래 명령어로 컨슈머 그룹을 지정합니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --group [그룹 이름]
  • 그룹에 속하는 컨슈머가 여러개이면 로드밸런싱을 통해 자동으로 메세지를 분배

2. 그룹 정보 확인

kafka-console-groups.sh --bootstrap-server localhost:9092 --describe --group [그룹 이름]
  • 위에 대한 명령어로 아래의 정보들을 얻을 수 있다.
    -
    • CURRENT-OFFSET : Consumer Group이 Kafka에서 읽은 offset
    • LOG-END-OFFSET : 해당 topic, partition의 마지막 offset
    • LAG : LOG-END-OFFSET 과 CURRENT-OFFSET의 차이
  • LAG의 경우 topic의 partition단위로 읽어야 할 남은 데이터 수를 의미한다

 

Offsets

오프셋 리셋

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [그룹 이름] --reset-offsets --to-earlist --execute --topic [토픽 이름]
  • 오프셋을 리셋하여 데이터를 다시 읽어드릴 때 사용
  • reset-offsets의 옵션으로 --to-earlist , --to-datetime, --from-file, --to-current, --shift-by 등이 있다
    --shift-by -2 를 하면 오프셋이 2칸 전으로 이동한다
  • execute의 옵션으로 --all-topics 와 --topic [그룹 이름] 이 있다

 

복제 테스트 : 브로커가 Offline 되면 어떤 일이?

이제 시스템에 Kafka 클러스터가 설정되었으므로 데이터 복제를 테스트해 보겠습니다.

실행한 3개의 브로커 중 첫 번째 브로커를 Control + C 키로 종료합니다. 

# 첫번째 브로커 중지

[2022-01-01 02:09:13,095] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-01-01 02:09:13,097] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
[2022-01-01 02:09:13,098] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2022-01-01 02:09:13,099] INFO [KafkaServer id=1] shut down completed (kafka.server.KafkaServer)


실행한 3개의 브로커 중 하나를 종료해도 클러스터가 클러스터가 제대로 실행되는지 아래 명령어를 실행합니다.
다른 터미널 창에서 다른 소비자를 시작해 보겠습니다.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic my-kafka-topic --from-beginning --group group2

real-time data
두번째 입력
awesome!!!
first insert

 

여기에 추가한 것은 이 소비자를 다른 소비자와 구별하는 그룹 옵션입니다. 명령을 실행하면 처음부터 입력된 모든 메시지가 표시됩니다. 동일한 소비자인 경우에는 이미 데이터를 읽었기 때문에 데이터가 보지이 않습니다.


중개인 중 하나가 중지되었지만 데이터는 손실되지 않았습니다. 이는 이전에 설정한 복제 계수(replication-factor)를 2로 설정하여 데이터 사본이 여러 브로커에 존재하도록 했기 때문입니다.

 

? 의문점 : 위에서 보는 것처럼 입력한 순서대로 출력되지 않았습니다. 한글때문인지 아니면 원래 순서가 없는 것인지 정확한 이유는 파악하지 못했습니다. 추후 확인이 필요한 사안입니다.

 

 

다른 브로커를 중단하면 어떻게 될까요? 

본 문서에서는 두번 째 브로커도 중지하고 테스트 해보겠습니다. 

이 경우에는 데이터 조회가 되지 않네요? 브로커를 3개로 구성한 경우 2개의 브로커가 장애가 발생한 경우 정상 작동하지 않습니다. 즉, 브로커의 과반수 이상에 장애가 발생하면 정상 작동하지 않습니다.

Kafka 생산자 및 소비자 구현

클러스터를 실행하고 나면 애플리케이션 코드에서 생산자와 소비자를 구현할 수 있습니다.
Golang 또는 Node.js에서 Kafka 생산자 및 소비자를 구현하는 방법은 링크된 문서를 참고하시면 됩니다.

 

<참고사이트>

https://www.sohamkamani.com/install-and-run-kafka-locally/

 

 

 

<참고사이

 

[ 참고 자료 ]

https://medium.com/@kiranps11/kafka-and-zookeeper-multinode-cluster-setup-3511aef4a505

https://www.instaclustr.com/blog/the-power-of-kafka-partitions-how-to-get-the-most-out-of-your-kafka-cluster/

 

#if, #endif는 전처리기 과정을 거치면 코드는 다음과 같이 바뀝니다.

 

gcc 컴파일 옵션

 

 

make 와 makefile의 관계 :

https://bowbowbow.tistory.com/12

 

 

 

 

1.3 Quick Start

STEP 1: GET KAFKA

최신 Kafka 릴리스를 다운로드하고 압축을 풉니다.

$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0

STEP 2: START THE KAFKA ENVIRONMENT

로컬 환경에 Java 8 이상이 설치되어 있어야 하고, zookeeper 가 설치 및 실행되고 있어야 합니다.
zookeeper가 설치되어 있을 경우, zookeeper 를 시작합니다.

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

다른 터미널 세션을 열고 kafka 서버를 실행합니다.

$ bin/kafka-server-start.sh config/server.properties

모든 서비스가 성공적으로 시작되면 기본 Kafka 환경이 실행되고 사용할 준비가 된 것입니다.

STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS

Kafka는 여러 시스템에서 이벤트(문서에서 레코드 또는 메시지라고도 함)를 읽고, 쓰고, 저장하고, 처리할 수 있는 분산 이벤트 스트리밍 플랫폼입니다.
이벤트의 예로는 결제 거래, 휴대폰의 지리적 위치 업데이트, 배송 주문, IoT 장치 또는 의료 장비의 센서 측정 등이 있습니다. 이러한 이벤트는 주제별로 구성 및 저장됩니다. 매우 단순화된 주제는 파일 시스템의 폴더와 유사하고 이벤트는 해당 폴더의 파일로 생각하면 이해가 쉽습니다.
이벤트를 작성하기 전에 topic을 작성해야 합니다. 다른 터미널 세션을 열고 다음을 실행하여 topic을 생성합니다.

$ bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092

Kafka의 모든 명령줄 도구에는 추가 옵션이 있습니다. 인수 없이 kafka-topics.sh 명령을 실행하여 사용 정보를 표시합니다. 예를 들어 topic의 파티션 수와 같은 세부 정보를 표시합니다.

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: 1ejyYwvJQF65aAk_5N-lgg PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: quickstart-events        Partition: 0    Leader: 3       Replicas: 3     Isr: 3

STEP 4: WRITE SOME EVENTS INTO THE TOPIC

Kafka 클라이언트는 이벤트 쓰기(또는 읽기)를 위해 네트워크를 통해 Kafka 브로커와 통신합니다. 일단 수신되면 브로커는 필요한 기간 동안(또는영원히) 내구성 및 내결함성 방식으로 이벤트를 저장합니다.
콘솔 생산자 클라이언트를 실행하여 주제에 이벤트를 작성합니다. 입력하는 각 행은 topic에 이벤트로 작성됩니다.

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

Ctrl-C를 사용하여 생산자 클라이언트를 중지할 수 있습니다.

STEP 5: READ THE EVENTS

다른 터미널 세션을 열고 콘솔 소비자 클라이언트를 실행하여 방금 생성한 이벤트를 읽습니다. 최초 수행 시 "internal topic __consumer_offsets" 이 자동 생성됩니다.

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl-C를 사용하여 언제든지 소비자 클라이언트를 중지할 수 있습니다.
이벤트는 Kafka에 영구적으로 저장되기 때문에 원하는 만큼 많은 소비자가 이벤트를 읽을 수 있습니다. 또 다른 터미널 세션을 열고 이전 명령을 다시 실행하여 이를 쉽게 확인할 수 있습니다.

 

Topics 삭제

이전버전 : 

이전 버전에서는 kafka server.properties파일에 설정을 하고 kafka broker를 재기동해야 topic을 삭제할 수 있습니다. 단, 이벤트를 읽고 있는 consumber는 에러(Error while fetching metadata with correlation)가 발생합니다.  

# config/server.properties파일
delete.topic.enable = true

server.properties에 delete.topic.enable=true 설정을 하고 kafka borker를 재기동 한 다음, kafka-topics.sh로 삭제를 해주면 됩니다. 

$  bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

최신버전 : (3.0 version)

KIP-226(Kafka Improvement Proposal)은 브로커 구성의 동적 업데이트에 대한 지원을 추가했습니다. 
브로커를 다시 시작하지 않고 클러스터에서 주제를 동적으로 삭제할 수 있도록 허용합니다.(동적 허용을 하지 않으려면 server.properties 구성에서 "delete.topic.enable" 구성을 "false"로 설정하면 됩니다.)

elete.topic.enable 기본 설정은 true입니다.

$  bin/kafka-topics.sh --delete --topic quickstart-events --bootstrap-server localhost:9092

 

동적 파라미터 설정

STEP 6: IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT

관계형 데이터베이스 또는 기존 메시징 시스템과 같은 기존 시스템과 이미 이러한 시스템을 사용하는 많은 애플리케이션에 많은 데이터가 있을 수 있습니다. Kafka Connect를 사용하면 외부 시스템에서 Kafka로 또는 그 반대로 데이터를 지속적으로 수집할 수 있습니다. 이 프로세스를 더욱 쉽게 만들기 위해 수백 개의 커넥터를 사용할 수 있습니다.
Kafka에서 데이터를 지속적으로 가져오거나 내보내는 방법에 대해 자세히 알아보려면 Kafka Connect section 섹션을 살펴보세요.

STEP 7: PROCESS YOUR EVENTS WITH KAFKA STREAMS

데이터가 Kafka에 이벤트로 저장되면 Java/Scala용 Kafka Streams 클라이언트 라이브러리를 사용하여 데이터를 처리할 수 있습니다. 이를 통해 입력 및/또는 출력 데이터가 Kafka 주제에 저장되는 실시간 애플리케이션 및 마이크로서비스를 구현할 수 있습니다. Kafka Streams는 클라이언트 측에서 표준 Java 및 Scala 애플리케이션을 작성하고 배포하는 단순성과 Kafka의 서버 측 클러스터 기술의 이점을 결합하여 이러한 애플리케이션을 확장성, 탄력성, 내결함성 및 분산성을 높입니다. 라이브러리는 정확히 한 번 처리, 상태 저장 작업 및 집계, 윈도우, 조인, 이벤트 시간 기반 처리 등을 지원합니다.

Kafka Streams WordCount 알고리즘을 구현하는 방법은 다음과 같습니다.

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

상세 내용은 Kafka Streams demoapp development tutorial 을 참고합니다.

STEP 8: TERMINATE THE KAFKA ENVIRONMENT

  1. Ctrl-C를 사용하여 생산자 및 소비자 클라이언트 중지
  2. Ctrl-C를 사용하여 Kafka 브로커 중지.
  3. Ctrl-C를 사용하여 Kafka 브로커 중지

생성한 이벤트를 포함하여 로컬 Kafka 환경의 데이터도 삭제하려면 다음 명령을 실행합니다.

$ rm -rf /tmp/kafka-logs /tmp/zookeeper

 

APACHE KAFKA

Apache Kafka란 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위한

오픈 소스 분산 이벤트 스트리밍 플랫폼입니다.

CORE CAPABILITIES

  • HIGH THROUGHPUT
  • SCALABLE
  • PERMANENT STORAGE
  • HIGH AVAILABILITY

ECOSYSTEM

  • BUILT-IN STREAM PROCESSING
  • CONNECT TO ALMOST ANYTHING
  • CLIENT LIBRARIES
  • LARGE ECOSYSTEM OPEN SOURCE TOOLS

TRUST & EASE OF USE

  • MISSION CRITICAL
  • TRUSTED BY THOUSANDS OF ORGS
  • VAST USER COMMUNITY
  • RICH ONLINE RESOURCES

WHY KAFKA?

이벤트 스트리밍이란?

이벤트 스트리밍은 인체의 디지털 중추 신경계라고 할 수 있으며  '상시 가동' 세상을 위한 기반 기술입니다.
기술적으로 말하면 이벤트 스트리밍은 데이터베이스, 센서, 모바일 장치, 클라우드 서비스 및 소프트웨어 애플리케이션과 같은 이벤트 소스에서 이벤트 스트림의 형태로 실시간으로 데이터를 캡처하는 방식입니다.

 

나중에 검색할 수 있도록 이러한 이벤트 스트림을 영구적으로 저장합니다. 소급적으로나 실시간으로 이벤트 스트림을 조작, 처리 및 반응합니다. 필요에 따라 이벤트 스트림을 다른 목적지 기술로 라우팅하는 단계를 포함합니다. 이벤트 스트리밍은 올바른 정보가 적시에 적절한 위치에 있도록 데이터의 지속적인 흐름과 해석을 보장합니다.

 

이벤트 스트리밍은 무엇에 사용할 수 있습니까?

이벤트 스트리밍은 수많은 산업 및 조직의 다양한 사용 사례에 적용됩니다.

  • 증권 거래소, 은행 및 보험과 같은 실시간으로 지불 및 금융 거래를 처리합니다.
  • 물류 및 자동차 산업과 같이 자동차, 트럭, 차량 및 선적을 실시간으로 추적하고 모니터링합니다.
  • 공장 및 풍력 발전 단지와 같은 IoT 장치 또는 기타 장비의 센서 데이터를 지속적으로 캡처하고 분석합니다.
  • 소매, 호텔 및 여행 산업, 모바일 애플리케이션과 같은 고객 상호 작용 및 주문을 수집하고 즉시 대응합니다.
  • 병원에서 치료 중인 환자를 모니터링하고 상태 변화를 예측하여 응급 상황에서 시기 적절한 치료를 보장합니다.
  • 회사의 여러 부서에서 생성된 데이터를 연결, 저장 및 사용 가능하게 만드는 것.
  • 데이터 플랫폼, 이벤트 중심 아키텍처 및 마이크로서비스의 기반 역할을 합니다.

 

Apache Kafka 는 이벤트 스트리밍 플랫폼

Kafka는 다음 세 가지 주요 기능의 단일 솔루션으로 엔드투엔드 이벤트 스트리밍 사용 사례를 구현할 수 있습니다.

  1. 다른 시스템에서 데이터를 import/export를 포함하여 이벤트 스트림을 게시(쓰기) 및 구독(읽기)
  2. 이벤트 스트림을 원하는 만큼 내구성 있고 안정적으로 저장
  3. 이벤트 스트림을 발생 시 또는 소급하여 처리

이 모든 기능은 분산, 확장성, 탄력성, 내결함성이 있으며 안전한 방식으로 제공됩니다.

Kafka는 베어메탈 하드웨어, 가상 머신, 컨테이너, 온프레미스 및 클라우드에 배포할 수 있습니다.

Kafka 환경을 자가 관리하거나 다양한 공급업체에서 제공하는 완전 관리형 서비스를 사용할 수 있습니다.

 

Kafka는 어떻게 작동하나?

Kafka는 고성능 TCP 네트워크 프로토콜을 통해 통신하는 서버와 클라이언트로 구성된 분산 시스템입니다. 

베어메탈 하드웨어, 가상 머신, 온프레미스 및 클라우드 환경의 컨테이너에 배포할 수 있습니다.

서버:

Kafka는 여러 데이터 센터 또는 클라우드 region들에 있는 다수의 서버 클러스터로 실행됩니다.

이러한 서버 중 일부는 브로커라고 하는 스토리지 계층을 구성합니다.

다른 서버는 Kafka Connect를 실행하여  Kafka를 관계형 데이터베이스 및 기타 Kafka 클러스터와 같은 기존 시스템과 통합하기 위해, 데이터를 이벤트 스트림으로 지속적으로 가져오고 내보냅니다.

미션 크리티컬한 사용 사례를 구현할 수 있도록 Kafka 클러스터는 확장성이 뛰어나고 내결함성이 있습니다. 서버 중 하나에 장애가 발생하면 다른 서버가 작업을 인계받아 데이터 손실 없이 지속적인 운영을 보장합니다.

 

클라이언트: 

네트워크 문제나 기계 장애가 발생한 경우에도 이벤트 스트림을 병렬로 대규모로 내결함성 방식으로 읽고, 쓰고, 처리하는 분산 애플리케이션 및 마이크로서비스를 만들 수 있습니다.

클라이언트는 고수준 Kafka Streams 라이브러리를 포함하여 Java 및 Scala에서 사용할 수 있으며 Go, Python, C/C++ 및 기타 여러 프로그래밍 언어 및 REST API 에서 사용할 수 있습니다.

 

Kafka 주요 개념과 용어

이벤트는 세상 또는 비즈니스에서 "무언가 발생"했다는 사실을 기록합니다.

문서에서는 기록 또는 메시지라고도 합니다. Kafka에서 데이터를 읽거나 쓸 때 이벤트 형식으로 이 작업을 수행합니다.

개념적으로 이벤트에는 키, 값, 타임스탬프 및 선택적 메타데이터 헤더가 있습니다.

 

다음은 이벤트의 예입니다.

  • Event key: "Alice"
  • Event value: "Made a payment of $200 to Bob"
  • Event timestamp: "Jun. 25, 2020 at 2:06 p.m."

생산자(Producers)는

Kafka에 이벤트를 게시(쓰기)하는 클라이언트 응용 프로그램이고

소비자(consumers)는

이러한 이벤트를 구독(읽기 및 처리)하는 응용 프로그램입니다.

Kafka에서 생산자와 소비자는 완전히 분리되고 서로 알 수 없으며, 이는 Kafka가 높은 확장성을 달성하기 위한 핵심 설계 요소입니다. 예를 들어 생산자는 소비자를 기다릴 필요가 없습니다. Kafka는 이벤트를 정확히 한 번 처리하는 기능과 같은 다양한 보장성을 제공합니다.

 

이벤트는 주제(topics)로 구성되고 영구적으로 저장됩니다. 간단하게 주제는 파일 시스템의 폴더와 유사하고 이벤트는 해당 폴더의 파일입니다. 예로써 주제 이름은 "지불"일 수 있습니다.

Kafka의 토픽은 항상 다중 생성자 및 다중 구독자입니다. 주제에는 이벤트를 작성하는 0개, 1개 또는 많은 생성자와 이러한 이벤트를 구독하는 0개, 1개 또는 많은 소비자가 있을 수 있습니다. 주제의 이벤트는 필요한 만큼 자주 읽을 수 있습니다. 기존 메시징 시스템과 달리 이벤트는 소비 후 삭제되지 않습니다. 대신 주제별 구성 설정을 통해 Kafka가 이벤트를 유지해야 하는 기간을 정의할 수 있고, 이보다 오래된 이벤트가 삭제됩니다.

Kafka의 성능은 데이터 크기와 관련하여 사실상 일정하므로 데이터를 장기간 저장하는 것이 좋습니다.

 

주제는 분할(partitioned)되어 있으며, 이는 다른 Kafka 브로커에 있는 여러 "버킷(buckets)"에 분산되어 있음을 의미합니다.

데이터의 분산 배치는 클라이언트 애플리케이션이 동시에 많은 브로커에서 데이터를 읽고 쓸 수 있도록 하기 때문에 확장성에 매우 중요합니다. 새 이벤트가 주제에 게시되면 실제로 해당 주제의 파티션 중 하나에 추가됩니다.

동일한 이벤트 키(예: 고객 또는 차량 ID)가 있는 이벤트는 동일한 파티션에 기록되고 Kafka는 지정된 파티션의 모든 소비자가 항상 기록된 것과 정확히 동일한 순서로 해당 파티션의 이벤트를 읽도록 보장합니다.

Figure: 

이 예제 주제에는 4개의 파티션 P1~P4가 있습니다. 두 개의 서로 다른 생산자가 네트워크를 통해 서로 독립적으로 새 이벤트를 주제에 게시하고 있습니다. 동일한 키를 사용하는 이벤트(그림에서 색상으로 표시)는 동일한 파티션에 기록됩니다. 두 생산자 모두 동일한 파티션에 쓸 수도 있습니다.

데이터 내결함성 및 고가용성을 만들기 위해 모든 주제를 여러 지리적 지역이나 데이터 센터에 걸쳐 복제할 수 있습니다. 일이 잘못될 경우에 대비하여 데이터 복사본이 있는 여러 브로커가 있도록 합니다. 일반적인 프로덕션 설정은 복제 3팩터입니다. 즉, 항상 3개의 데이터 복사본이 있습니다. 이 복제는 주제 파티션 수준에서 수행됩니다.

 

We build for multiple versions of Scala

: 여러 버전의 Scala로 빌드되었다. 즉 Kafka는 Scala로 만들었다.

 

What is Scala?

스칼라는 함수형 객체지향 프로그래밍 언어 입니다. 스칼라는 자바의 복잡한 단점을 해결하기 위해 만들어 졌습니다. 스칼라는 자바 바이트 코드를 사용하기 때문에 JVM위에서 실행 시킬 수 있습니다. 또한 자바의 클래스들을 바로 사용할 수도 있고, 자바에서도 스칼라 코드들을 호출할 수 있습니다. 

 

 

 

VSCode(Visual Studio Code)로 작성한 프로그램에 argument를 전달하려면 

  • terminal에서 수행 시 argument를 전달하거나
  • VSCode에서 수행 시 launch.json 파일에 정의하여 전달하면 된다.

본 문서에서는 argument를 launch.json 파일에 설정하는 방법을 설명한다.

launch.json(디버깅 환경설정 파일) 설정

 launch.json(디버깅 환경설정 파일) 파일이 없다면 VSCode의 좌측 메뉴에서 [Run and Debug] 을 클릭하고

아래와 같이 나타난 화면에서 "create a launch.json file"을 클릭합니다.

"Select a debug configuration" 에서 Python File 을 선택합니다.

아래와 같이 launch.json 파일이 생성되는데 

"args": ["60"], 을 각 개인의 argument에 맞게 추가해 줍니다. 본문에서는 ["60"] 이렇게 설정했는데 "60" 은 실행되는 python 파일에 전달하는 첫번째 argument 값입니다.

argument 가 여러 개일 경우 ["arg1", "arg2", "arg3"] 식으로 나열하면 됩니다. 단, 숫자는 안되고 문자형식만 됩니다. 즉, argument가 60인 경우, "60" 으로 입력해야 합니다. 

## launch.json

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "args": ["60"],
            "console": "integratedTerminal"
        }
    ]
}

이제 VSCode에서 작성한 argument가 필요한 python 파일을 실행해 보면, 위에서 설정한 argument 값을 받아서 처리하는 것을 확인할 수 있습니다.

 

 
아래는 launch.json 파일의 속성들에 대한 설명입니다.

launch.json

다음 속성은 실행구성에 필수 요소입니다.

  • type : 실행구성에 사용할 디버거 유형(node, php, python..등)
  • request : 실행구성의 요청유형 현재는 'launch', 'attach' 입니다.
  • name : 디버그 드롭다운에 나타나는 디버그 런치 이름

다음은 실행구성에서 사용할수 있는 선택적 속성입니다.

  • preLaunchTask : 디버그 세션이 시작되기 전에 작업을 시작하려면 이 속성을 tasks.json 에 지정된 이름으로 설정해야합니다.
  • internalConsoleOptions : 디버깅 세션동안 다버그 콘솔 패널의 가시성을 제어합니다.
  • debugServer : 디버그를 시작하는 대신 지정 포트에 연결합니다.

많은 디버거가 다음 속성 중 일부를 지원합니다.

  • program : 디버거를 시작할 때 실행할 실행 파일 또는 파일
  • args: 디버깅 할 프로그램에 전달된 인수
  • env: 환경 변수
  • cwd: 의존성 및 기타 파일을 찾기위한 현재 작업 디렉토리
  • port: 실행중인 프로세스에 연결할 때의 포트
  • stopOnEntry: 프로그램이 시작되면 즉시 중단
  • console: 어떤 종류를 콘솔을 사용할지 지정. 예를 들어 internalConsole, integratedTerminal, externalTerminal.

vscode는 문자열 내부의 변수 대체를 지원 launch.json하며 다음과 같이 미리 정의 된 변수가 있습니다.

  • ${workspaceFolder} : vscode에서 연 폴더의 경로
  • ${workspaceFolderBasename} : vscode에서 슬래시없이 열리는 폴더 이름 (/)
  • ${file} : 현재 열려있는 파일
  • ${relativeFile} : workspaceFolder 에서 현재 열린 파일
  • ${fileBasename} : 현재 열려있는 파일의 기본 이름
  • ${fileBasenameNoExtension} : 파일 확장명이 없는 현재 열린 파일의 기본 이름
  • ${fileDirname} : 현재 열려있는 파일의 디렉토리 이름
  • ${fileExtname} : 현재 열려있는 파일의 확장자
  • ${cwd} : 시작시 태스크 러너의 현재 작업 디렉토리
  • ${lineNumber} : 활성 파일에서 현재 선택된 행 번호

DB Browser for SQLite

SQLite DB를 관리할 수 있는 GUI 환경의 툴이다.

 

1. SQLite 브라우저 프로그램 다운로드

https://sqlitebrowser.org 에 접속하여 운영체제에 맞는 DB 브라우저를 다운로드하여 설치한다. 

 

2. DB Browser 실행

설치한 프로그램을 실행하거나 압축해제한 DB Browser for SQLite 실행파일을 수행하면 아래와 같은 화면이 나타난다.

우측 하단에 UTF-8 문자셋이 보인다. 기본적으로 문자를 UTF-8로 처리한다.

DB Browser

3. DB를 생성거나 기존 DB 열기

DB 생성  :

[새 데이터베이스(N)] 메뉴를 클릭하고 저장하고자 하는 폴더 위치와 파일명을 입력한다.

신규 DB 생성화면

그 다음에 테이블명을 입력하고, 필드명을 추가하여 신규 DB와 테이블을 생성한다.

table 생성 화면

[데이터 보기] 탭을 클릭한 후, 데이터를 입력해 본다.

 

기존 DB 열기

기존에 생성된 데이터베이스를 선택하여 DB를 오픈한다.

 

4. Table과 Field 생성

[편집] - [테이블생성] 메뉴를 클릭한 후 '테이블 정의 변경' 화면이 나오면 테이블명에 emp 를 입력하고, id(integer, PK, AI), name(text, NN), birth(text, NN), pic(blob) 필드를 추가하고 id는 primary key와 auto increment(AI)로 지정한다.

data 타입은 integer, text, blob, real, numeric이 지원된다.

Not null, Primary key. Auto Increment, Unique 제약 조건 및 default, check, collcation, FK 등도 지원된다.

5. 새 레코드 추가/삭제

[데이터 보기] 탭을 클릭한 후 새 레코드를 입력히거나 기준 레코드를 삭제할 수 있다.Oracle DB와는 다르게 Not Null 컬럼으로 지정한 컬럼에 아무 데이터도 입력하지 않았는데, 오류가 발생하지 않고 데이터가 저장되었다. 오라클을 제외한  일반 DB(mysql, mariaDB, MS Sql, informix 등)에서 처리하는 empty string이나 zero-byte string으로 처리한다. 

6. SQL 문은 일반 DBMS 사용법과 같다.

단, 데이터가 아무것도 없는 경우에는 empty string 으로 처리하고, 명시적으로 null을 입력할 경우에는 null로 처리한다.

 

 

*** 참고 ***

원격 DB 접속은 원천적으로 안된다. 서버리스 DB이므로 접속을 할 수 있는 데몬이나 리스너가 없다.

 

그래도 원격 접속을 할려면 아래 참조.

- NFS나 삼바 스토리지 share?

- 라이브러리?

https://forums.slimdevices.com/showthread.php?65898-Remote-access-of-SQLite-database

 

Maybe one of these alternatives would work ?
http://www.sqlite.org/cvstrac/wiki?p=SqliteNetwork

The easiest is probably to share the directory where the squeezecenter.db file is stored. I'm not sure if you'll get into trouble if you modify the squeezecenter.db while SqueezeCenter is connected to it, to be safe it's probably a good idea to shutdown SqueezeCenter before you do any modifications.

If you just want to be able to run a SQL statement from a remote machine, you can always try toe Database Query plugin. It's only available as a beta version for SQLite yet, see this thread for more information:
http://forums.slimdevices.com/showthread.php?t=65439

'SQLite' 카테고리의 다른 글

SQLite 설치 및 사용법  (0) 2021.12.30
SQLite 란  (0) 2021.12.28

+ Recent posts