HomeAbout
[STREMING PIPELINE] 실시간 스트리밍 데이터 파이프라인 (Real-Time) 구축: Confluent와 CDC를 활용한 실시간 데이터 처리 완벽 가이드
Data Engineering
[STREMING PIPELINE] 실시간 스트리밍 데이터 파이프라인 (Real-Time) 구축: Confluent와 CDC를 활용한 실시간 데이터 처리 완벽 가이드
NASA1515
NASA1515
February 10, 2024
7 min

시작하며

최근들어 streaming 관련 데이터 파이프라인 기술에 흥미가 생길 시점에 마침, 회사에서 Confluent Cloud를 활용해보는 신규 프로젝트를 해볼 수 있는 기회를 얻게되었습니다.
이번 포스트는 CDC 등의 기초 이론만 알고 있던 제가 약 2개월 가량의 프로젝트를 어떤 이유에서, 어떤 결과를 통해서 진행했는지에 대한, 경험 위주의 포스트로 정리 됩니다.


😡 그래서 CDC가 뭐야?

[Change Data Capture] 간단하게 RDBMS에서 특정 시점을 기준으로 변경된 데이터를 캡처해 처리하기 위한 방법입니다.
특히 대량의 정형 데이터를 정기적으로 추출하고 이동해야 하는 DL(DataLake) DW(DataWarehouse) 기반의 파이프라인에서는 ETL/ELT보다는 CDC를 이용하면
많은 양의 데이터를 이동하는 시간을 크게 줄여, 보다 효율적인 파이프라인을 구성할 수 있다고 합니다.

🙁 그래서 CDC는 어떻게 동작하는데?

실제로 현재 많이 사용되고 있는 RDB들은 SQL Server, Oracle 같이, RDB 자체에 내장된 CDC 기능들이 존재하는데, 대부분이 설명하는 로그 기반의 CDC 기능입니다.
저는 저 로그들을 사용해서 DB 내부에서 어떠한 작업을 하는게 DB 자체에 부하를 주지 않을까? 라고 생각을 했었습니다만…
로그 기반의 CDC는 Replication된 Transaction System의 Log Table을 수집하기 때문에, DB에 추가적인 부하를 주지 않는다고 합니다.
때문에 DB의 변경 사항을 기록하는 트랜잭션 로그를 사용하는 것이라고 합니다.

결론적으론 CDC도 Redo나 아카이브로그를 읽어 System Table에 한번 더 저장하는 거니까, 용량적으로나 부하가 더 가는게 아닌가?? 라는 생각이 있었지만
데이터 동기화가 필요한 시스템 별로 사용용도가 상이하기에, Replication과 비교해봐야겠다. -> 이건 뒤에서 이야기 해보겠습니다.

Transaction Log CDC

위에서 말한 내용을 그림과 같이 설명하면, 테이블에 특정 시점 기준으로, 아래와 같이 Transaction이 발생하면, System에 영향이나 부하를 주지 않고 로그 파일에 기록됩니다.
그리고 Log Miner나 다른 CDC Tool (이번에는 Confluent)를 통해서 아래 해당 Transaction 변경사항을 읽어, 확인하는 것이 가능합니다.

image

CDC Project Architecture

저는 Azure Cloud 환경에서 Confleunt Cloud를 사용한 Real-Time Pipeline을 다음과 같이 구축했습니다.
여기서 SQL Server은 이미 고객의 On-Prem 환경에 구축되어 있는 Server로 해당 MSSQL과 Azure Cloud 간의 네트워크 연동을 위해 Azure의 VPN G/W를 사용했습니다.
보통 Target RDB를 지정해서, 직접적으로 저장하는 형태의 CDC 구성이 많았는데, 이번 경우에는 증분 데이터를 Parquet File 형태로 저장하는 DataLake 방식으로 구성했습니다. 실제 진행한 프로젝트에서는 정형/비정형 데이터들을 유동적으로 수집하고, 주기적으로 테이블의 스키마가 가변적으로 바뀔 수 있는 상황이라는 특성에 걸맞게, DataLake로 데이터를 적재한다면, Batch/Streaming 구성에서 발생하는 데이터를 하나의 스토리지에서 관리가 가능해, 유동적으로 스키마나 데이터를 처리할 수 있는 이유로 선택했습니다. (확장성)

실질적으로 증분 데이터를 수집하고, 저장하는데 핵심적인 역할을 하는 Kafka Source/Sinkg Connector의 경우, Kubernetes 환경에서 Debezium을 배포해서 사용하는 방법도 있었지만, 결국 Kubernetes 환경을 유지/관리하기 위해선 추가적으로 Helm을 사용한다던가, 특정 어플리케이션을 Build 해야하는 과정이 파이프라인 안에 추가되는 것이기 때문에 파이프라인의 기술적 고도화/모던화만 고려해선 안되고, 앞으로 해당 아키텍처를 기반으로 실제 서비스를 운영할 고객의 러닝커브와 운영 효율적인 부분의 강점을 생각해, 동일한 Debezium Release를 UI로 쉽게 배포할 수 있는 Confluent Cloud를 선택했습니다. (아래 그림은 실제 프로젝트에서 CDC와 관련된 부분만 발최한 아키텍처 입니다.)

스크린샷 2024-02-13 오후 4 26 41

CDC 파이프라인 설계

위의 아키텍처에서의 CDC 파이프라인의 흐름을 요약하면, Confluent를 사용해 초기적재 이후 실시간 변경분까지 수집하고, ADLS에 File형태로 적재되도록 설계했습니다.

- 초기데이터도 Confluent를 사용해서 수집 (Snapshot mode)
- 테이블 별 초기적재가 완료되면, 새로 들어오는 Change Event가 바로 반영될 수 있도록 설정
- 실제 수집된 데이터는 평면화 및 분류가 된 이후에 ADLS에 Parquet 파일 형태로 적재

설계 및 아키텍처를 기반으로 설명하면 아래와 같은 단계의 흐름으로 Change Event가 반영되는 파이프라인 입니다.

  • ① On-prem <-> Azure의 암호화 네트워크를 위해서 VPN G/W를 통해서 통신되는 구조.
  • ② Source Connector에서 조회한 결과의 데이터가 VPN G/W를 지나, Azure의 Proxy Server의 Nginx에서 Confleunt로 호스팅되어 수집되는 구조.
  • ③ Confleunt Broker에 쌓인 Message는 Sink Connector로 ADLS에 실시간으로 적재되는 구조.
  • ④ ADLS에 적재되어 있는 데이터를, DataBricks 환경에서 Spark Streaming을 사용해, 실시간으로 Delta Table 화 및 전처리, 시각화 모델링되는 구조
  • ⑤ DataBricks에서 생성된 Mart Table을 Source로 PowerBI에서 시각화 대시보드 구현.

우선적으로는 설명하자면, Source Connector의 경우에는 실제 Debezium에서도 지원하고 있는 SQL Server Source Connector를 사용했는데요
첨언을 하자면 기본적으로 Debezium Source Connector가 정상적으로 동작하기 위해서는, 필수적인 요소들이 몆가지 존재합니다.

  1. DataBase Level의 CDC Option 활성화
  2. Table Level의 CDC Option 활성화 (실제 MSSQL의 경우 테이블 별로 system table이 추가로 생겨 해당 테이블에 데이터를 저장합니다.)
  3. 각 테이블 별 PK Key의 여부
  4. Debezium에서 사용할 계정에 권한이 부여되어야 합니다.

우선 데이터를 수집하는 과정을 진행하기 전, 위에 명시되어 있는 + 네트워크 작업들을 선제적으로 진행한 뒤, Source Connector는 아래와 같이 구성하였습니다.
기본적으로 Confluent Cloud에서도 Debezium과 동일한 형태의 JSON 파일형식으로 Metadata를 만들어서 적용시키는 형태의 Connector가 사용됩니다.
실제로 프로젝트를 진행함에 있어서, 중요하다고 생각하는 설정과 관련해서는 정리하겠지만, 나머지 설정의 경우 Confluent Connector 문서를 참고해주세요.

SQLServer Source Connector Json

{
    "name": "sqlserver_cdc_connector_001",
    "config": {
        "connector.class": "SqlServerCdcSource",
        "tasks.max": "1",
        "database.port": "1433",
        "database.server.name": "**", [암호화]
        "database.hostname": "**", [암호화]
        "database.user": "**", [암호화]
        "database.password": "**", [암호화]
        "database.dbname": "**", [암호화]
        "auto.create.topics.enable": "true",
        "cleanup.policy":"delete",
        "decimal.handling.mode": "double",
        "database.history.skip.unparseable.ddl": "true",
        "snapshot.mode": "initial",
        "snapshot.isolation.mode": "read_uncommitted",
        "signal.data.collection": "nasa1515.dbo.nasa1515_db", [예시]
        "after.state.only" : "false",
        "time.precision.mode": "connect",
        "table.include.list": "nasa1515.dbo.nasa1515_db", [예시]
        "transforms":"unwrap_after, masknickname",
        "transforms.unwrap_after.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap_after.add.fields": "db,schema,table,op,change_lsn,commit_lsn,source.ts_ms",
        "transforms.unwrap_after.delete.handling.mode": "rewrite",
        "predicates": "if_topics",
        "predicates.if_topics.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
        "predicates.if_topics.pattern": "nasa1515.dbo.nasa1515_db", [예시]
        "transforms.masknickname.predicate":  "if_topics",
        "transforms.masknickname.type": "org.apache.kafka.connect.transforms.MaskField$Value",
        "transforms.masknickname.fields": "user_id,user_name,user_phone",
        "transforms.masknickname.replacement": "****"
        "output.data.format": "AVRO",
        "output.key.format": "AVRO",
        }
    }      

이번 프로젝트에서 제일 당황스러웠던 것은, 고객의 Source DB에는 Repliation이나 Backup이 없어, 운용되고 있는 Source에서 데이터를 수집해야하는 것이었습니다.
이에 DB에서 발생하는 트랜잭션을 최대한 줄이기위해 "snapshot.isolation.mode"를 read_uncommitted으로 설정해 초기 데이터부터 적재하며 수집했습니다.

사실 DB 부하의 최소화를 위해서는, SQL Server DB의 Full-backup File로 초기데이터를 적재한 뒤에 증분 데이터만 Confluent로 수집을 하는게 맞다고 생각합니다.
다만, 프로젝트의 제한된 일정에는 100TB 이상의 Full-backup File 데이터를 이동/백업/복원하기에는 시간이 부족해 어쩔수 없이 내린 결정이라 아쉽습니다.


SMT (Single Message Transforms)

수집대상의 특정 테이블의 컬럼 데이터들의 경우 보안에 민감한 고객들의 주요 개인정보가 담겨져 있는 데이터가 노출이 되어 있었습니다 (이름, 휴대폰번호, 이메일.. 등등)
해당 데이터들을 마스킹하고, 뒷단의 전처리 작업의 효율 증가를 위해 (토픽에 쌓인 메세지의 크기를 줄여 성능 최적화) ("after.state.only")로 “Before/After/Source” 형태의 Payload로 수집되는 데이터들을 After State의 데이터만 수집하도록 1차적으로 데이터를 평면화하였습니다. (아래 그림은 After.state.only 설정전의 Structure 설명입니다.)

이미지 설명

1차적으로 분류되어진 데이터를, 실제로 필요한 컬럼들만 추출하여 분류하는 ExtractNewRecordState를 사용하여 추가적으로 Topic에 쌓여질 데이터를 분류하였습니다.

"transforms":"unwrap_after, masknickname",
"transforms.unwrap_after.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap_after.add.fields": "db,schema,table,op,change_lsn,commit_lsn,source.ts_ms",
"transforms.unwrap_after.delete.handling.mode": "rewrite",
  • ExtractNewRecordState은 1차적으로 평면화된 데이터에서 Source info Meta에서 필요한 값을 추출해 After-State Meta에 추가해주기 위한 용도로 (add.fields)
    뒷단의 전처리에서 필요한 Source의 정보 (db,schema,table,op,change_lsn,commit_lsn)를 얻기 위해서 사용했습니다.

add.fields 에서 제일 중요한 것은 op 컬럼으로, 실제로Update, Insert 등 해당 이벤트 메세지가 실제로 어떤 트랜잭션인지 구분할 수 있습니다.
Delete는 delete.handling.mode 설정으로 “__deleted” Key가 추가되어 value 값으로 구분되어 처리됩니다. (해당 SMT로 아래와 같은 형태의 데이터가 Meta에 추가됩니다)

{
  ...
  "__db": "nasadb",
  "__schema": "dbo",
  "__table": "nasa1515_test_table",
  "__op": "u",
  "__change_lsn": "00:00000970:0003",
  "__commit_lsn": "00:00000970:0004",
  "__source_ts_ms": 12222227667,
  "__deleted": "false"
  ...
}

SMT를 사용한 데이터 마스킹의 경우, ExtractNewRecordState 과정으로 평면화된 데이터 중, 특정 Topic (특정 테이블)의 특정 Columns이 마스킹의 대상임으로
해당 조건을 정의한 뒤에, 조건에 맞는 필드들만 마스킹이되어야 하는 로직으로 구성되어야 하기에 TopicNameMatchesMaskField$Value Type의 SMT를 사용했습니다.

"predicates": "if_topics",
"predicates.if_topics.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.if_topics.pattern": "nasa1515.dbo.nasa1515_db", [예시]
"transforms.masknickname.predicate":  "if_topics",
"transforms.masknickname.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.masknickname.fields": "user_id,user_name,user_phone",
"transforms.masknickname.replacement": "****"
  • TopicNameMatches의 경우 Source Connector에서 수집되는 테이블 별 데이터에서, 실제로 Transforms이 발생해야 할 테이블을 구분 IF 절의 기능으로 사용했습니다.
  • MaskField$Value"transforms.masknickname.predicate": "if_topics"의 pattern에 맞는, Topic일 경우에만 실행되며, 미리 정의되어 있는 fields의 값을 치환하는데 사용했습니다.

MaskField$Value의 경우 문자열을 단순히 Replacement 해주는 용도로만 사용할 수 있습니다, 실제로 패턴과 조건이 있는 마스킹이 필요하면 K-sql을 사용해야합니다.


Json Converter VS Avro Converter

기본적으로 Kafka Connect의 구성요소에는 Connector와 Broker 사이에서 데이터를 Serialization/Deserialization 하는 Converter가 존재합니다.
요약하자면, Connector와 Broker 사이의 주고받는 데이터의 형식을 정의하는 역할을 한다고 이해하시면 되겠습니다. 일반적으론 Json/Avro Converter가 주로 사용되는데요.
두 Data Type의 가장 큰 차이는, Json의 경우 메세지에 스키마를 포함하고 있다는 부분입니다, 때문에 스키마 변경에 유연한 파이프라인이 목적이었긴 하지만, 실제 RDB의 특성상 운용 중인 Table들의 스키마 변경은 비번히 발생하지 않고, 트랜잭션은 매초 발생할텐데, JSON을 사용한다면 매번 트랜잭션마다 스키마를 포함한 데이터가 전달되기 때문에 RDB Source를 수집하는 CDC 파이프라인에는 비효율적이라고 판단했고, 반면에 Avro의 경우 Schema Registry라는 별도의 스키마 저장 공간에 스키마를 저장하기에 실제로 주고받는 데이터의 양이 적고, Confluent에서 지원되는 Schema Registry의 경우 Schema의 Version이나 관리 측면의 장점도 존재하여 Avro Converter를 쓰기로 결정했습니다.

image

Topic 까지의 Source Connector의 흐름

이제 Source Connector는 Kafka brocker(Confluent)에 연결된 각 테이블의 이름 별로 Topic을 생성한 뒤, 변환한 이벤트 메세지들을 Partition안에 데이터를 적재합니다.
image

  • 위의 그림과 같은 예시로, 실제 Topic Naming이 Source Table에서 받아온 Naming 그대로 생성이되고, 각 Partition에 메세지를 적재하게 됩니다.

Azure DataLake Sink Connector Json

이제 Topic에 저장되어 있는 이벤트 메세지들을, Sink Connector를 사용해 Parquet 파일형태로 변환하여, ADLS(Storage)에 적재합니다.
Kakfa Topic 안에 쌓여있는 메세지를, kafka Bootstrap을 사용해서 바로 끌고올 수 있는 방법이 있지만, 뒷단의 전처리 과정에서, 여러가지 Mart 구조를 갖게될 수 있으므로
보다 유연하게 (여러 Source의 데이터를 하나의 스토리지의 파일로 관리할 수 있도록)하기 위해, 파일로 저장하는 방식의 과정을 진행하게 됩니다.

{
 "name": "adls_gen2_sink_connector_001",
 "config": {
    "connector.class": "AzureDataLakeGen2Sink",
    "tasks.max":"2",
    "time.interval": "HOURLY",
    "kafka.endpoint": "**:9092", [암호화]
    "kafka.service.account.id": "**", [암호화]
    "kafka.region": "koreacentral",
    "azure.datalake.gen2.access.key": "****************", [암호화]
    "azure.datalake.gen2.account.name": "***", [암호화]
    "azure.datalake.gen2.client.key": "", [암호화]
    "kafka.auth.mode": "SERVICE_ACCOUNT"
    "topics": "nasa.dbo.nasa_student", [예시]
    "input.data.format": "AVRO",
    "output.data.format": "PARQUET",
    "topics.dir": "topics",
    "Path format": "year=YYYY/month=MM/day=dd/hour'=HH"
    "schema.context.name": "default"
    "flush.size": "10000",
    "rotate.schedule.interval.ms" : "60000",
    "rotate.interval.ms": "${time.interval}",
    "max.poll.records": "500",
    "max.poll.interval.ms": "300000"
    }
}

Sink Connector에서는 topics 설정에 있는 Topic을 대상으로, "input.data.format", "output.data.format" 두개의 설정으로, 실제 Topic에 쌓여있는AVRO 데이터를 Parquet으로 변환한 뒤, Path format`으로 실제 파일이 저장되는 디렉토리의 Partition을 지정해 저장하는 동작이 실행됩니다.
이외의 다른 Config 설정들은, 실제 Kakfa와, ADLS에 연결하기 위한 접속정보나, 암호 정보들이 대부분 입니다.

Sink Connector의 동작

이미지 설명

SinkConnector는 기존의 Kafka Consumer와 동일한 Polling Method를 사용합니다.
저장되어 있는 Topic의 Partition 데이터를 poll() 요청 후, 해당 Method의 Request값에 대해 SinkConnector에 내장되어 있는 후처리 동작을 하는 방식입니다.

  • Consumer 동일한 Message Batch를 사용하게 되고, Max Poll Records설정으로 Batch로 가져올 Message의 양을 조절하고 Max Poll Interval로 Message Batch의 Time interval을 조정해서 해당 조건에 맞는 Record를 가져옵니다.
  • Consumer는 Microbatch 형식으로 Record를 Uploder Obj (해당 Connector의 경우 ADLS obj)로 보내고, 데이터를 조건에 맞는 Partitioner로 분할합니다.
  • Upload Obj는 해당 파일이 Flush File 설정의 Flush Size로 저장될 Record의 양을 조절하고 offset.flush.interval.ms로 File Upload Interval을 조정하여, Parquet 파일 형태로 저장한 뒤 스토리지에 업로드 하게 됩니다.
  • 다만 이번 프로젝트를 진행하면서 생각보다 특정 Table에 적은 Transaction이 발생해서, Flush Size 만큼의 데이터가 생기지 않고, Time Interval 설정이 정상적을 동작하지 않는 Confluent의 이슈가 발생했습니다. (Confluent의 Standard Cluster의 경우 설정 적용에 제한적이었음.) 따라서 rotate_interval.ms, rotate.schedule.interval.ms 두개의 설정을 이용해, 앞선 조건이 만족하지 않더라도 강제적으로 Upload Obj의 Session을 종료하고 File을 Upload 하도록 만들었습니다. 따라서 무조건 설정된 Rotate 주기마다 파일이 생성되게 되고, Time Based Partitioner로 분할 저장되게 됩니다. (파일 갯수가 많아 짐에 따라 후에 Spark의 Read I/O에 성능에 영향이 있을 수 있으니 전략을 잘 수립해야함) 해당 전략을 사용한다면, 강제로 Session을 닫는 동작이 일어나기에 excactly-once을 만족할 수 없습니다.

DataBricks의 Spark Streaming 처리

프로젝트의 주요 목표는 LakeStoage로 적재된 CDC/Batch에 대한 동시 처리 파이프라인을 구축하는 것에 있어서 DataBricks를 사용해서 진행했습니다.
DataBricks에서 Spark Streaming의 Microbatch를 사용해 증분 데이터를 반영하는 로직을 구현했고, 프로젝트에서 작업한 적재 대상 데이터는 다음과 같은 특징이 있었습니다.

1. 3TB
2. 14억건
3. Daily 증분 량 : 10GB
4. 테이블은 1000개 이상

총 초기적재될 용량이 많은 것에 이어서, 일 별 증분량, 거기다가 한번에 증분 및 실시간 처리해야 할 테이블이 1000개 이상이었습니다.
때문에 파이프라인의 관리가 조금 더 유연하게 설계되어야하고, Streaming Source Table의 증분 반영 로직은 병렬처리가 필수적이었습니다. 처음에는 asyncio나 feature를 사용해서 비동기 방식으로 처리하려고 했으나, 단 한번 Streaming Session만 실행시켜주면 되는 상황으로 판단해, 병렬처리를 하는 방식으로 우회했습니다.

Steaming Source Code

전체적인 코드는 공개할 수 없지만,
각 테이블마다 상이한 PK 조건을 저장한 Table을 토대로 적합한 SparkSeesion을 만들어 할당해주는 Class를 만들어서 해결했습니다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import *
from datetime import datetime

# pk 조건 만드는 함수 (upsertToDelta에서 조건문 만드는데 사용)

class cls_upsert:
    def __init__(self, tableName, tablePks, targetSchema, checkpoint_location, source_data_location, schema):
        self.tableName = tableName # read tableName
        self.tablePKs = tablePks # read tablePK
        self.targetSchema = targetSchema # merge 대상 table schema name
        self.checkpoint_location = checkpoint_location # checkpoint 경로
        self.source_data_location = source_data_location # 
        self.schema = schema # source 
        self.tbl_target = DeltaTable.forName(spark, f"{self.targetSchema}.streaming_{self.tableName}")

    # PK 메소드
    def create_condition(pk_array:str) -> dict[str]:
        search=0
        return_string = ''
        if len(pk_array) == 1:
            return_string = return_string + 'target.'+pk_array[0]+' = using.'+pk_array[0]
        else:
            for i in pk_array:
                return_string = return_string + 'target.' + i + ' = using.' + i
                if len(pk_array)-1 == search:
                    break;
                search = search + 1
                return_string = return_string + ' and '
        return return_string


    # microBatch merge processing 함수 정의 ()
    def upsertToDelta(self, microBatchDF, batchId) -> str:
        window_spec = Window.partitionBy(*self.tablePKs).orderBy(desc("__source_ts_ms"))
        max_ts_ms = row_number().over(window_spec)

        using_df = microBatchDF.filter( col("__op") != 'r'  ) \
                            .withColumn("rank", max_ts_ms) \
                            .filter(col("rank") == 1) \
                            .drop('__db','__schema','__table','__change_lsn'
                                    ,'__commit_lsn','__source_ts_ms'
                                    , '__deleted', 'year', 'month', 'day', 'hour', "rank")

        # pk 조건문 만들기
        condition = create_condition(self.tablePKs)
        self.tbl_target.alias("target") \
                .merge(using_df.alias("using"), condition) \
                    .whenMatchedDelete(
                        condition="using.__op = 'd'",
                    ) \
                    .whenMatchedUpdateAll(
                        condition="using.__op = 'u'",
                    ) \
                    .whenNotMatchedInsertAll(
                        condition="using.__op = 'c' or using.__op = 'u'"
                    ) \
                    .execute()

    # Streming object를 생성하는 메소드
    def make_streamingObject(self):
        return spark.readStream.format("cloudFiles") \
                    .option("cloudFiles.format", "parquet") \
                    .schema(self.schema) \
                    .load(self.source_data_location) \
                    .writeStream \
                    .format("delta") \
                    .trigger(processingTime="10 minute") \
                    .foreachBatch(lambda microBatchDF, batchId: self.upsertToDelta(microBatchDF, batchId)) \
                    .outputMode("update") \
                    .option("checkpointLocation", self.checkpoint_location)

  • [PK 정제 코드 로직 요약] : PK 값이 테이블 별로 상이해, PK 값을 저장하고 있는 Info Table을 생성해서 처리해야하는 과제.

이미지 설명
  1. Info-Table Collect() : 각 테이블의 리스트와 PK 정보를 수집.
  2. Python Class: cdc_upsert: 테이블 별 PK 조건을 생성하고, DataFrame Upsert 로직 객체를 생성한 후, Streaming MicroBatch 객체를 생성.
  3. Streaming MicroBatch 실행: 각 테이블 별 Storage Data Path 기준 Event Trigger와 CheckPoint 증분 파일을 필터링, 데이터를 전처리 한 뒤 Delta Table에 변경 분을 Upsert.


큰일 났다 비상사태!!…문제 발생!!…

  • Streaming Object의 증분 데이터의 전처리 작업 도중, 갑자기 Blob Storage의 Read operator가 급증하면서 Transaction 비용이 치솟는 것을 확인했습니다!!

    이미지 설명

도대체 왜??? 갑자기…?? 이슈사항을 재현해서 확인해봤습니다.

이미지 설명

Spark Streaming의 foreachMicroBatch()중의 특정 CheckPoint 사용하게 되면, 지정한 Data Path를 지속적으로 확인해서 파일리스트를 가져오게되고, 해당 파일을 증분 파일과 필터링할 비교 리스트를 갱신하는데 사용합니다. 즉, processingTime/Trigger 조회 주기를 설정하지 않으면 기본 값인 500ms마다 hierarchical 구조의 디렉토리의 All-Depth를 조회하고 업데이트하기 때문에 발생했습니다…

  • 해결 : Class에서 Spark 객체를 생성할 때 Trigger 옵션 수정(10분)으로 1차 문제 해결했습니다
    다만, 비즈니스 로직의 구현에 따라, Transaction 빈도 발생 테이블을 구분하도록 커뮤니케이션해서, 시각화 용도의 테이블 주기만 수정해서, 동기화 주기 문제 해소했습니다.

[향후 개선 로직]

  1. [Partition 최적화 및 평면화] : Confluentd에서 저장되는 증분 데이터 DataPath의 Partition을 평면화해서 저장, CheckPoint 갱신 List의 Depth를 1/3로 줄일 수 있었습니다.
    (ex: dt=20240112/hour=01). 결과적으로 10분 단위의 대시보드 → 따른 로직이 없는, 실시간 대시보드의 구현이 가능.
  2. [파일 수 최소화] : Consumer(Sink Connector)의 max.poll record, flush.size의 설정을 최적화해서 저장되는 Data 파일 수의 최소화가 가능, 현재는 작은 파일로 저장 중으로, DataBirkcs에서 압축하는 Optimize 처리 로직을 구현해서 해소, 다만 불필요한 로직을 제거하고, Load 파일 개수를 줄여 성능 개선이 가능했었음.

마치며..

최대한 많은 양을 담으려고 노력했지만, 대외비인 내용들이 많이 포함되어 있어서, Batch 로직이던, 전처리 로직이던 담을 수 없는 부분들이 많았습니다.
최종적으로 작업 기간은 전체 데이터의 약 15TB의 이상의 초기 데이터를 적재하는데 15일 이상이 소요되었는데 (Confluent의 Standard Cluster의 제한 사항 때문에, 중간 중간에 문제가 많아서, 결국 용량이 높았던 Table은 따로 Backup File을 Restore 하는 방식을 사용하긴 했습니다..), 한정된 전체 작업 시간 중에 초기 적재 기간이 길었다 보니 중간 전처리 과정이나 설계 과정에서 예상치 못한 문제가 발생하지는 않을지, 혹은 서비스에 영향을 미치지 않을지에 대한 우려가 있었지만 다행히 큰 이슈 없이 무사히 잘 마무리되었습니다. 그리고 프로젝트 종료 후 현재까지 정상적으로 잘 운영되고 있습니다. 결과적으로 아래와 같은 개선? 사항을 얻을 수 있었습니다.
(사실 뒷단 Mart 작업이나, PowerBI 작업이 뚜렷한 인사이트인데…대외비..)

    1. [CDC 데이터 수집 로직 개발] : Confluent(Kafka Connector) CDC Stream to Storage 수집 로직 개발, Spark Streaming(Micro Batch)로 10분 단위 EtLT Real-Time 로직 개발, 데이터 접근 주기 24시간 → 10분으로 고도화했습니다.
    1. [Data Lake Platform 구축] : 고객 데이터를 Cloud 환경으로 통합해 분석 용도의 Delta Lake Medallion 아키텍처 기반 DataBricks Platform을 구축했습니다, 데이터 분석 프레임 구조를 Batch → Batch & Streaming으로 확장했습니다.
    1. [시각화 고도화] : Full-Table Scan ERP 구조의 데이터 조회 방식의 대시보드 → Demension 기반 Mart로 모델링 한 Power BI 대시보드로 고도화하였습니다. 평균 시각화 데이터 접근 성능 3분 → 20초로 단축했습니다.

추후에 CDC나 Confleunt, Streaming 관련해서 운영적이나, 설계 측 관점에 대한 업데이트가 있다면 새로운 포스트를 통해서 찾아뵙도록 하겠습니다. 감사합니다.


Tags

#Data Engineering#CDC#Confluent
NASA1515

NASA1515

Data Engineer

Hello I'M Wonseok aka NASA1515

Expertise

Public Cloud
k8s/Docker
Python

Social Media

instagramwebsitelinkedingithub

Related Posts

[KAFKA] Apache Kafka Consumer Group 심층 분석: 효율적인 데이터 처리를 위한 Rebalancing 메커니즘과 최적화 기법
[KAFKA] Apache Kafka Consumer Group 심층 분석: 효율적인 데이터 처리를 위한 Rebalancing 메커니즘과 최적화 기법
2022-12-21
9 min

Topics

CloudDevelop

Social Media