최근들어 streaming 관련 데이터 파이프라인 기술에 흥미가 생길 시점에 마침, 회사에서 Confluent Cloud를 활용해보는 신규 프로젝트를 해볼 수 있는 기회를 얻게되었습니다.
이번 포스트는 CDC 등의 기초 이론만 알고 있던 제가 약 2개월 가량의 프로젝트를 어떤 이유에서, 어떤 결과를 통해서 진행했는지에 대한, 경험 위주의 포스트로 정리 됩니다.
[Change Data Capture]
간단하게 RDBMS에서 특정 시점을 기준으로 변경된 데이터를 캡처해 처리하기 위한 방법입니다.
특히 대량의 정형 데이터를 정기적으로 추출하고 이동해야 하는 DL(DataLake) DW(DataWarehouse)
기반의 파이프라인에서는 ETL/ELT보다는 CDC를 이용하면
많은 양의 데이터를 이동하는 시간을 크게 줄여, 보다 효율적인 파이프라인을 구성할 수 있다고 합니다.
실제로 현재 많이 사용되고 있는 RDB들은 SQL Server, Oracle
같이, RDB 자체에 내장된 CDC 기능들이 존재하는데, 대부분이 설명하는 로그 기반의 CDC 기능입니다.
저는 저 로그들을 사용해서 DB 내부에서 어떠한 작업을 하는게 DB 자체에 부하를 주지 않을까? 라고 생각을 했었습니다만…
로그 기반의 CDC는 Replication된 Transaction System의 Log Table을 수집하기 때문에, DB에 추가적인 부하를 주지 않는다고 합니다.
때문에 DB의 변경 사항을 기록하는 트랜잭션 로그
를 사용하는 것이라고 합니다.
결론적으론 CDC도 Redo나 아카이브로그를 읽어 System Table에 한번 더 저장하는 거니까, 용량적으로나 부하가 더 가는게 아닌가?? 라는 생각이 있었지만
데이터 동기화가 필요한 시스템 별로 사용용도가 상이하기에, Replication과 비교해봐야겠다. -> 이건 뒤에서 이야기 해보겠습니다.
위에서 말한 내용을 그림과 같이 설명하면, 테이블에 특정 시점 기준으로, 아래와 같이 Transaction이 발생하면, System에 영향이나 부하를 주지 않고 로그 파일에 기록됩니다.
그리고 Log Miner나 다른 CDC Tool (이번에는 Confluent)를 통해서 아래 해당 Transaction 변경사항을 읽어, 확인하는 것이 가능합니다.
저는 Azure Cloud 환경에서 Confleunt Cloud를 사용한 Real-Time Pipeline을 다음과 같이 구축했습니다.
여기서 SQL Server은 이미 고객의 On-Prem 환경에 구축되어 있는 Server로 해당 MSSQL과 Azure Cloud 간의 네트워크 연동을 위해 Azure의 VPN G/W를 사용했습니다.
보통 Target RDB를 지정해서, 직접적으로 저장하는 형태의 CDC 구성이 많았는데, 이번 경우에는 증분 데이터를 Parquet File 형태로 저장하는 DataLake 방식으로 구성했습니다.
실제 진행한 프로젝트에서는 정형/비정형 데이터들을 유동적으로 수집하고, 주기적으로 테이블의 스키마가 가변적으로 바뀔 수 있는 상황이라는 특성에 걸맞게, DataLake로 데이터를 적재한다면, Batch/Streaming 구성에서 발생하는 데이터를 하나의 스토리지에서 관리가 가능해, 유동적으로 스키마나 데이터를 처리할 수 있는 이유로 선택했습니다. (확장성)
실질적으로 증분 데이터를 수집하고, 저장하는데 핵심적인 역할을 하는 Kafka Source/Sinkg Connector의 경우, Kubernetes 환경에서 Debezium을 배포해서 사용하는 방법도 있었지만, 결국 Kubernetes 환경을 유지/관리하기 위해선 추가적으로 Helm을 사용한다던가, 특정 어플리케이션을 Build 해야하는 과정이 파이프라인 안에 추가되는 것이기 때문에 파이프라인의 기술적 고도화/모던화만 고려해선 안되고, 앞으로 해당 아키텍처를 기반으로 실제 서비스를 운영할 고객의 러닝커브와 운영 효율적인 부분의 강점을 생각해, 동일한 Debezium Release를 UI로 쉽게 배포할 수 있는 Confluent Cloud를 선택했습니다. (아래 그림은 실제 프 로젝트에서 CDC와 관련된 부분만 발최한 아키텍처 입니다.)
위의 아키텍처에서의 CDC 파이프라인의 흐름을 요약하면, Confluent를 사용해 초기적재 이후 실시간 변경분까지 수집하고, ADLS에 File형태로 적재되도록 설계했습니다.
- 초기데이터도 Confluent를 사용해서 수집 (Snapshot mode) - 테이블 별 초기적재가 완료되면, 새로 들어오는 Change Event가 바로 반영될 수 있도록 설정 - 실제 수집된 데이터는 평면화 및 분류가 된 이후에 ADLS에 Parquet 파일 형태로 적재
설계 및 아키텍처를 기반으로 설명하면 아래와 같은 단계의 흐름으로 Change Event가 반영되는 파이프라인 입니다.
우선적으로는 설명하자면, Source Connector의 경우에는 실제 Debezium에서도 지원하고 있는 SQL Server Source Connector를 사용했는데요
첨언을 하자면 기본적으로 Debezium Source Connector가 정상적으로 동작하기 위해서는, 필수적인 요소들이 몆가지 존재합니다.
- DataBase Level의 CDC Option 활성화
- Table Level의 CDC Option 활성화 (실제 MSSQL의 경우 테이블 별로 system table이 추가로 생겨 해당 테이블에 데이터를 저장합니다.)
- 각 테이블 별 PK Key의 여부
- Debezium에서 사용할 계정에 권한이 부여되어야 합니다.
우선 데이터를 수집하는 과정을 진행하기 전, 위에 명시되어 있는 + 네트워크 작업들을 선제적으로 진행한 뒤, Source Connector는 아래와 같이 구성하였습니다.
기본적으로 Confluent Cloud에서도 Debezium과 동일한 형태의 JSON 파일형식으로 Metadata를 만들어서 적용시키는 형태의 Connector가 사용됩니다.
실제로 프로젝트를 진행함에 있어서, 중요하다고 생각하는 설정과 관련해서는 정리하겠지만, 나머지 설정의 경우 Confluent Connector 문서를 참고해주세요.
{ "name": "sqlserver_cdc_connector_001", "config": { "connector.class": "SqlServerCdcSource", "tasks.max": "1", "database.port": "1433", "database.server.name": "**", [암호화] "database.hostname": "**", [암호화] "database.user": "**", [암호화] "database.password": "**", [암호화] "database.dbname": "**", [암호화] "auto.create.topics.enable": "true", "cleanup.policy":"delete", "decimal.handling.mode": "double", "database.history.skip.unparseable.ddl": "true", "snapshot.mode": "initial", "snapshot.isolation.mode": "read_uncommitted", "signal.data.collection": "nasa1515.dbo.nasa1515_db", [예시] "after.state.only" : "false", "time.precision.mode": "connect", "table.include.list": "nasa1515.dbo.nasa1515_db", [예시] "transforms":"unwrap_after, masknickname", "transforms.unwrap_after.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap_after.add.fields": "db,schema,table,op,change_lsn,commit_lsn,source.ts_ms", "transforms.unwrap_after.delete.handling.mode": "rewrite", "predicates": "if_topics", "predicates.if_topics.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches", "predicates.if_topics.pattern": "nasa1515.dbo.nasa1515_db", [예시] "transforms.masknickname.predicate": "if_topics", "transforms.masknickname.type": "org.apache.kafka.connect.transforms.MaskField$Value", "transforms.masknickname.fields": "user_id,user_name,user_phone", "transforms.masknickname.replacement": "****" "output.data.format": "AVRO", "output.key.format": "AVRO", } }
이번 프로젝트에서 제일 당황스러웠던 것은, 고객의 Source DB에는 Repliation이나 Backup이 없어, 운용되고 있는 Source에서 데이터를 수집해야하는 것이었습니다.
이에 DB에서 발생하는 트랜잭션을 최대한 줄이기위해 "snapshot.isolation.mode"
를 read_uncommitted으로 설정해 초기 데이터부터 적재하며 수집했습니다.
사실 DB 부하의 최소화를 위해서는, SQL Server DB의 Full-backup File로 초기데이터를 적재한 뒤에 증분 데이터만 Confluent로 수집을 하는게 맞다고 생각합니다.
다만, 프로젝트의 제한된 일정에는100TB
이상의 Full-backup File 데이터를 이동/백업/복원하기에는 시간이 부족해 어쩔수 없이 내린 결정이라 아쉽습니다.
수집대상의 특정 테이블의 컬럼 데이터들의 경우 보안에 민감한 고객들의 주요 개인정보가 담겨져 있는 데이터가 노출이 되어 있었습니다 (이름, 휴대폰번호, 이메일.. 등등)
해당 데이터들을 마스킹하고, 뒷단의 전처리 작업의 효율 증가를 위해 (토픽에 쌓인 메세지의 크기를 줄여 성능 최적화) ("after.state.only"
)로 “Before/After/Source” 형태의 Payload로 수집되는 데이터들을 After State
의 데이터만 수집하도록 1차적으로 데이터를 평면화하였습니다. (아래 그림은 After.state.only 설정전의 Structure 설명입니다.)
1차적으로 분류되어진 데이터를, 실제로 필요한 컬럼들만 추출하여 분류하는 ExtractNewRecordState
를 사용하여 추가적으로 Topic에 쌓여질 데이터를 분류하였습니다.
"transforms":"unwrap_after, masknickname", "transforms.unwrap_after.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap_after.add.fields": "db,schema,table,op,change_lsn,commit_lsn,source.ts_ms", "transforms.unwrap_after.delete.handling.mode": "rewrite",
add.fields 에서 제일 중요한 것은 op 컬럼으로, 실제로Update
, Insert
등 해당 이벤트 메세지가 실제로 어떤 트랜잭션인지 구분할 수 있습니다.
Delete는 delete.handling.mode
설정으로 “__deleted” Key가 추가되어 value 값으로 구분되어 처리됩니다. (해당 SMT로 아래와 같은 형태의 데이터가 Meta에 추가됩니다)
{ ... "__db": "nasadb", "__schema": "dbo", "__table": "nasa1515_test_table", "__op": "u", "__change_lsn": "00:00000970:0003", "__commit_lsn": "00:00000970:0004", "__source_ts_ms": 12222227667, "__deleted": "false" ... }
SMT를 사용한 데이터 마스킹의 경우, ExtractNewRecordState
과정으로 평면화된 데이터 중, 특정 Topic (특정 테이블)의 특정 Columns이 마스킹의 대상임으로
해당 조건을 정의한 뒤에, 조건에 맞는 필드들만 마스킹이되어야 하는 로직으로 구성되어야 하기에 TopicNameMatches
와 MaskField$Value
Type의 SMT를 사용했습니다.
"predicates": "if_topics", "predicates.if_topics.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches", "predicates.if_topics.pattern": "nasa1515.dbo.nasa1515_db", [예시] "transforms.masknickname.predicate": "if_topics", "transforms.masknickname.type": "org.apache.kafka.connect.transforms.MaskField$Value", "transforms.masknickname.fields": "user_id,user_name,user_phone", "transforms.masknickname.replacement": "****"
TopicNameMatches
의 경우 Source Connector에서 수집되는 테이블 별 데이터에서, 실제로 Transforms이 발생해야 할 테이블을 구분 IF 절의 기능으로 사용했습니다.MaskField$Value
는 "transforms.masknickname.predicate": "if_topics"
의 pattern에 맞는, Topic일 경우에만 실행되며, 미리 정의되어 있는 fields의 값을 치환하는데 사용했습니다. MaskField$Value의 경우 문자열을 단순히 Replacement 해주는 용도로만 사용할 수 있습니다, 실제로 패턴과 조건이 있는 마스킹이 필요하면 K-sql을 사용해야합니다.
기본적으로 Kafka Connect의 구성요소에는 Connector와 Broker 사이에서 데이터를 Serialization/Deserialization
하는 Converter가 존재합니다.
요약하자면, Connector와 Broker 사이의 주고받는 데이터의 형식을 정의하는 역할을 한다고 이해하시면 되겠습니다.
일반적으론 Json/Avro Converter가 주로 사용되는데요.
두 Data Type의 가장 큰 차이는, Json의 경우 메세지에 스키마를 포함하고 있다
는 부분입니다, 때문에 스키마 변경에 유연한 파이프라인이 목적이었긴 하지만, 실제 RDB의 특성상 운용 중인 Table들의 스키마 변경은 비번히 발생하지 않고, 트랜잭션은 매초 발생할텐데, JSON을 사용한다면 매번 트랜잭션마다 스키마를 포함한 데이터가 전달되기 때문에 RDB Source를 수집하는 CDC 파이프라인에는 비효율적이라고 판단했고,
반면에 Avro의 경우 Schema Registry
라는 별도의 스키마 저장 공간에 스키마를 저장하기에 실제로 주고받는 데이터의 양이 적고, Confluent에서 지원되는 Schema Registry의 경우 Schema의 Version이나 관리 측면의 장점도 존재하여 Avro Converter를 쓰기로 결정했습니다.
이제 Source Connector는 Kafka brocker(Confluent)에 연결된 각 테이블의 이름 별로 Topic을 생성한 뒤, 변환한 이벤트 메세지들을 Partition안에 데이터를 적재합니다.
이제 Topic에 저장되어 있는 이벤트 메세지들을, Sink Connector를 사용해 Parquet 파일형태로 변환하여, ADLS(Storage)에 적재합니다.
Kakfa Topic 안에 쌓여있는 메세지를, kafka Bootstrap을 사용해서 바로 끌고올 수 있는 방법이 있지만, 뒷단의 전처리 과정에서, 여러가지 Mart 구조를 갖게될 수 있으므로
보다 유연하게 (여러 Source의 데이터를 하나의 스토리지의 파일로 관리할 수 있도록)하기 위해, 파일로 저장하는 방식의 과정을 진행하게 됩니다.
{ "name": "adls_gen2_sink_connector_001", "config": { "connector.class": "AzureDataLakeGen2Sink", "tasks.max":"2", "time.interval": "HOURLY", "kafka.endpoint": "**:9092", [암호화] "kafka.service.account.id": "**", [암호화] "kafka.region": "koreacentral", "azure.datalake.gen2.access.key": "****************", [암호화] "azure.datalake.gen2.account.name": "***", [암호화] "azure.datalake.gen2.client.key": "", [암호화] "kafka.auth.mode": "SERVICE_ACCOUNT" "topics": "nasa.dbo.nasa_student", [예시] "input.data.format": "AVRO", "output.data.format": "PARQUET", "topics.dir": "topics", "Path format": "year=YYYY/month=MM/day=dd/hour'=HH" "schema.context.name": "default" "flush.size": "10000", "rotate.schedule.interval.ms" : "60000", "rotate.interval.ms": "${time.interval}", "max.poll.records": "500", "max.poll.interval.ms": "300000" } }
Sink Connector에서는 topics
설정에 있는 Topic을 대상으로, "input.data.format"
, "output.data.format"
두개의 설정으로, 실제 Topic에 쌓여있는AVRO 데이터를
Parquet으로 변환한 뒤,
Path format`으로 실제 파일이 저장되는 디렉토리의 Partition을 지정해 저장하는 동작이 실행됩니다.
이외의 다른 Config 설정들은, 실제 Kakfa와, ADLS에 연결하기 위한 접속정보나, 암호 정보들이 대부분 입니다.
SinkConnector는 기존의 Kafka Consumer와 동일한 Polling Method를 사용합니다.
저장되어 있는 Topic의 Partition 데이터를 poll()
요청 후, 해당 Method의 Request
값에 대해 SinkConnector에 내장되어 있는 후처리 동작을 하는 방식입니다.
Max Poll Records
설정으로 Batch로 가져올 Message의 양을 조절하고 Max Poll Interval
로 Message Batch의 Time interval을 조정해서 해당 조건에 맞는 Record를 가져옵니다. Uploder Obj
(해당 Connector의 경우 ADLS obj)로 보내고, 데이터를 조건에 맞는 Partitioner로 분할합니다. Flush File
설정의 Flush Size
로 저장될 Record의 양을 조절하고 offset.flush.interval.ms
로 File Upload Interval을 조정하여, Parquet 파일 형태로 저장한 뒤 스토리지에 업로드 하게 됩니다. (Confluent의 Standard Cluster의 경우 설정 적용에 제한적이었음.)
따라서 rotate_interval.ms
, rotate.schedule.interval.ms
두개의 설정을 이용해, 앞선 조건이 만족하지 않더라도 강제적으로 Upload Obj의 Session을 종료하고 File을 Upload 하도록 만들었습니다. 따라서 무조건 설정된 Rotate 주기마다 파일이 생성되게 되고, Time Based Partitioner
로 분할 저장되게 됩니다. (파일 갯수가 많아 짐에 따라 후에 Spark의 Read I/O에 성능에 영향이 있을 수 있으니 전략을 잘 수립해야함)
해당 전략을 사용한다면, 강제로 Session을 닫는 동작이 일어나기에 excactly-once
을 만족할 수 없습니다.프로젝트의 주요 목표는 LakeStoage로 적재된 CDC/Batch에 대한 동시 처리 파이프라인을 구축하는 것에 있어서 DataBricks를 사용해서 진행했습니다.
DataBricks에서 Spark Streaming의 Microbatch를 사용해 증분 데이터를 반영하는 로직을 구현했고, 프로젝트에서 작업한 적재 대상 데이터는 다음과 같은 특징이 있었습니다.
1. 3TB 2. 14억건 3. Daily 증분 량 : 10GB 4. 테이블은 1000개 이상
총 초기적재될 용량이 많은 것에 이어서, 일 별 증분량, 거기다가 한번에 증분 및 실시간 처리해야 할 테이블이 1000개 이상이었습니다.
때문에 파이프라인의 관리가 조금 더 유연하게 설계되어야하고, Streaming Source Table의 증분 반영 로직은 병렬처리
가 필수적이었습니다.
처음에는 asyncio나 feature를 사용해서 비동기 방식으로 처리하려고 했으나, 단 한번 Streaming Session만 실행시켜주면 되는 상황으로 판단해, 병렬처리를 하는 방식으로 우회했습니다.
전체적인 코드는 공개할 수 없지만,
각 테이블마다 상이한 PK 조건을 저장한 Table을 토대로 적합한 SparkSeesion을 만들어 할당해주는 Class를 만들어서 해결했습니다.
from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.window import Window from delta.tables import * from datetime import datetime # pk 조건 만드는 함수 (upsertToDelta에서 조건문 만드는데 사용) class cls_upsert: def __init__(self, tableName, tablePks, targetSchema, checkpoint_location, source_data_location, schema): self.tableName = tableName # read tableName self.tablePKs = tablePks # read tablePK self.targetSchema = targetSchema # merge 대상 table schema name self.checkpoint_location = checkpoint_location # checkpoint 경로 self.source_data_location = source_data_location # self.schema = schema # source self.tbl_target = DeltaTable.forName(spark, f"{self.targetSchema}.streaming_{self.tableName}") # PK 메소드 def create_condition(pk_array:str) -> dict[str]: search=0 return_string = '' if len(pk_array) == 1: return_string = return_string + 'target.'+pk_array[0]+' = using.'+pk_array[0] else: for i in pk_array: return_string = return_string + 'target.' + i + ' = using.' + i if len(pk_array)-1 == search: break; search = search + 1 return_string = return_string + ' and ' return return_string # microBatch merge processing 함수 정의 () def upsertToDelta(self, microBatchDF, batchId) -> str: window_spec = Window.partitionBy(*self.tablePKs).orderBy(desc("__source_ts_ms")) max_ts_ms = row_number().over(window_spec) using_df = microBatchDF.filter( col("__op") != 'r' ) \ .withColumn("rank", max_ts_ms) \ .filter(col("rank") == 1) \ .drop('__db','__schema','__table','__change_lsn' ,'__commit_lsn','__source_ts_ms' , '__deleted', 'year', 'month', 'day', 'hour', "rank") # pk 조건문 만들기 condition = create_condition(self.tablePKs) self.tbl_target.alias("target") \ .merge(using_df.alias("using"), condition) \ .whenMatchedDelete( condition="using.__op = 'd'", ) \ .whenMatchedUpdateAll( condition="using.__op = 'u'", ) \ .whenNotMatchedInsertAll( condition="using.__op = 'c' or using.__op = 'u'" ) \ .execute() # Streming object를 생성하는 메소드 def make_streamingObject(self): return spark.readStream.format("cloudFiles") \ .option("cloudFiles.format", "parquet") \ .schema(self.schema) \ .load(self.source_data_location) \ .writeStream \ .format("delta") \ .trigger(processingTime="10 minute") \ .foreachBatch(lambda microBatchDF, batchId: self.upsertToDelta(microBatchDF, batchId)) \ .outputMode("update") \ .option("checkpointLocation", self.checkpoint_location)
도대체 왜??? 갑자기…?? 이슈사항을 재현해서 확인해봤습니다.
Spark Streaming의 foreachMicroBatch()중의 특정 CheckPoint 사용하게 되면, 지정한 Data Path를 지속적으로 확인해서 파일리스트를 가져오게되고, 해당 파일을 증분 파일과 필터링할 비교 리스트를 갱신하는데 사용합니다. 즉, processingTime/Trigger 조회 주기를 설정하지 않으면 기본 값인 500ms마다 hierarchical 구조의 디렉토리의 All-Depth를 조회하고 업데이트하기 때문에 발생했습니다…
최대한 많은 양을 담으려고 노력했지만, 대외비인 내용들이 많이 포함되어 있어서, Batch 로직이던, 전처리 로직이던 담을 수 없는 부분들이 많았습니다.
최종적으로 작업 기간은 전체 데이터의 약 15TB의 이상의 초기 데이터를 적재하는데 15일 이상이 소요되었는데 (Confluent의 Standard Cluster의 제한 사항 때문에, 중간 중간에 문제가 많아서, 결국 용량이 높았던 Table은 따로 Backup File을 Restore 하는 방식을 사용하긴 했습니다..), 한정된 전체 작업 시간 중에 초기 적재 기간이 길었다 보니 중간 전처리 과정이나 설계 과정에서 예상치 못한 문제가 발생하지는 않을지, 혹은 서비스에 영향을 미치지 않을지에 대한 우려가 있었지만 다행히 큰 이슈 없이 무사히 잘 마무리되었습니다. 그리고 프로젝트 종료 후 현재까지 정상적으로 잘 운영되고 있습니다. 결과적으로 아래와 같은 개선? 사항을 얻을 수 있었습니다.
(사실 뒷단 Mart 작업이나, PowerBI 작업이 뚜렷한 인사이트인데…대외비..)
추후에 CDC나 Confleunt, Streaming 관련해서 운영적이나, 설계 측 관점에 대한 업데이트가 있다면 새로운 포스트를 통해서 찾아뵙도록 하겠습니다. 감사합니다.